Update records storage importer to process all records

This commit is contained in:
Eugene Burmakin 2025-04-06 16:49:50 +02:00
parent 50144fddf2
commit 5ab8b17cdd
2 changed files with 151 additions and 60 deletions

View file

@ -5,6 +5,8 @@
class GoogleMaps::RecordsStorageImporter
BATCH_SIZE = 1000
MAX_RETRIES = 3
DOWNLOAD_TIMEOUT = 300 # 5 minutes timeout
def initialize(import, user_id)
@import = import
@ -23,55 +25,26 @@ class GoogleMaps::RecordsStorageImporter
attr_reader :import, :user
# Orchestrates one import run: download the blob, verify its integrity,
# parse out the locations array, and hand the records off in batches.
# Does nothing further when the file yields no locations.
def process_file_in_batches
  downloaded = download_file
  verify_file_integrity(downloaded)

  locations = parse_file(downloaded)
  return if locations.blank?

  process_locations_in_batches(locations)
end
def download_file
retries = 0
max_retries = 3
begin
file = Timeout.timeout(300) do # 5 minutes timeout
Timeout.timeout(DOWNLOAD_TIMEOUT) do
import.file.download
end
# Verify file size
expected_size = import.file.blob.byte_size
actual_size = file.size
if expected_size != actual_size
raise "Incomplete download: expected #{expected_size} bytes, got #{actual_size} bytes"
end
# Verify checksum
expected_checksum = import.file.blob.checksum
actual_checksum = Base64.strict_encode64(Digest::MD5.digest(file))
if expected_checksum != actual_checksum
raise "Checksum mismatch: expected #{expected_checksum}, got #{actual_checksum}"
end
parsed_file = Oj.load(file, mode: :compat)
return unless parsed_file.is_a?(Hash) && parsed_file['locations']
batch = []
index = 0
parsed_file['locations'].each do |location|
batch << location
next if batch.size < BATCH_SIZE
index += BATCH_SIZE
GoogleMaps::RecordsImporter.new(import, index).call(batch)
batch = []
end
rescue Timeout::Error => e
retries += 1
if retries <= max_retries
Rails.logger.warn("Download timeout, attempt #{retries} of #{max_retries}")
if retries <= MAX_RETRIES
Rails.logger.warn("Download timeout, attempt #{retries} of #{MAX_RETRIES}")
retry
else
Rails.logger.error("Download failed after #{max_retries} attempts")
Rails.logger.error("Download failed after #{MAX_RETRIES} attempts")
raise
end
rescue StandardError => e
@ -79,4 +52,51 @@ class GoogleMaps::RecordsStorageImporter
raise
end
end
# Validates the downloaded bytes against what ActiveStorage recorded for
# the blob: first by size, then by MD5 checksum. Raises a RuntimeError
# describing the discrepancy when either check fails; returns nil on success.
def verify_file_integrity(file)
  blob = import.file.blob

  # Size check first: it is cheap and catches truncated downloads early.
  unless file.size == blob.byte_size
    raise "Incomplete download: expected #{blob.byte_size} bytes, got #{file.size} bytes"
  end

  # ActiveStorage stores checksums as base64-encoded MD5 digests.
  computed = Base64.strict_encode64(Digest::MD5.digest(file))
  return if computed == blob.checksum

  raise "Checksum mismatch: expected #{blob.checksum}, got #{computed}"
end
# Decodes the raw JSON payload and extracts the "locations" array.
# Returns nil when the payload is not a JSON object or when the key is
# absent/falsy, so callers can bail out with a simple presence check.
def parse_file(file)
  parsed = Oj.load(file, mode: :compat)
  return nil unless parsed.is_a?(Hash)

  locations = parsed['locations']
  locations || nil
end
# Feeds +locations+ to the importer in fixed-size batches.
#
# Each slice of up to BATCH_SIZE records is handed to #process_batch
# together with the zero-based offset of its first record; the trailing
# partial slice is processed too, so every record is imported exactly
# once. Replaces the manual accumulate-and-flush loop with the
# equivalent (and less error-prone) Enumerable#each_slice idiom.
def process_locations_in_batches(locations)
  locations.each_slice(BATCH_SIZE).with_index do |batch, slice_index|
    process_batch(batch, slice_index * BATCH_SIZE)
  end
end
# Delegates one batch of location hashes to the records importer,
# tagging it with the running record offset for progress tracking.
def process_batch(batch, index)
  importer = GoogleMaps::RecordsImporter.new(import, index)
  importer.call(batch)
end
end

View file

@ -1,3 +1,5 @@
# frozen_string_literal: true
require 'rails_helper'
RSpec.describe GoogleMaps::RecordsStorageImporter do
@ -21,15 +23,18 @@ RSpec.describe GoogleMaps::RecordsStorageImporter do
describe '#call' do
context 'with valid file' do
it 'processes files correctly' do
# Add a test spy to verify behavior
records_importer = class_spy(GoogleMaps::RecordsImporter)
stub_const('GoogleMaps::RecordsImporter', records_importer)
# Setup mock
mock_importer = instance_double(GoogleMaps::RecordsImporter)
allow(GoogleMaps::RecordsImporter).to receive(:new).and_return(mock_importer)
allow(mock_importer).to receive(:call)
# Run the method
subject.call
# Small files won't process any batches (< BATCH_SIZE)
expect(records_importer).not_to have_received(:new)
# The test fixture file has a small number of locations,
# and since we now process all records, we should expect `new` to be called
expect(GoogleMaps::RecordsImporter).to have_received(:new)
expect(mock_importer).to have_received(:call).once
end
context 'when file has more locations than batch size' do
@ -55,7 +60,7 @@ RSpec.describe GoogleMaps::RecordsStorageImporter do
)
end
it 'processes in batches of 1000' do
it 'processes in batches of 1000 and handles remaining records' do
# Add a test spy to verify behavior
mock_importer = instance_double(GoogleMaps::RecordsImporter)
allow(GoogleMaps::RecordsImporter).to receive(:new).and_return(mock_importer)
@ -64,12 +69,76 @@ RSpec.describe GoogleMaps::RecordsStorageImporter do
# Run the method
subject.call
# Verify that the importer was called with the first 1000 locations
expect(GoogleMaps::RecordsImporter).to have_received(:new).with(import, 1000)
# Verify batches were processed correctly
expect(GoogleMaps::RecordsImporter).to have_received(:new).with(import, 0).ordered
expect(GoogleMaps::RecordsImporter).to have_received(:new).with(import, 1000).ordered
expect(mock_importer).to have_received(:call).exactly(2).times
# Based on the implementation, remaining 1 item is NOT processed
# Because there's no code after the loop to handle remaining items
expect(GoogleMaps::RecordsImporter).to have_received(:new).exactly(1).times
# Verify batch sizes
first_call_args = nil
second_call_args = nil
allow(mock_importer).to receive(:call) do |args|
if first_call_args.nil?
first_call_args = args
else
second_call_args = args
end
end
expect(first_call_args&.size).to eq(1000) if first_call_args
expect(second_call_args&.size).to eq(1) if second_call_args
end
end
context 'with multiple batches' do
let(:multi_batch_data) do
locations = []
2345.times do |i|
locations << {
latitudeE7: 533_690_550,
longitudeE7: 836_950_010,
accuracy: 150,
source: 'UNKNOWN',
timestamp: "2012-12-15T14:21:#{i}.460Z"
}
end
{ locations: locations }.to_json
end
before do
import.file.attach(
io: StringIO.new(multi_batch_data),
filename: 'records.json',
content_type: 'application/json'
)
end
it 'processes all records across multiple batches' do
# Set up to capture batch sizes
batch_sizes = []
# Create mock
mock_importer = instance_double(GoogleMaps::RecordsImporter)
# Set up the call tracking BEFORE allowing :new to return the mock
allow(mock_importer).to receive(:call) do |batch|
batch_sizes << batch.size
end
allow(GoogleMaps::RecordsImporter).to receive(:new).and_return(mock_importer)
# Run the method
subject.call
# Should have 3 batches: 1000 + 1000 + 345
expect(GoogleMaps::RecordsImporter).to have_received(:new).with(import, 0).ordered
expect(GoogleMaps::RecordsImporter).to have_received(:new).with(import, 1000).ordered
expect(GoogleMaps::RecordsImporter).to have_received(:new).with(import, 2000).ordered
expect(mock_importer).to have_received(:call).exactly(3).times
# Verify the batch sizes
expect(batch_sizes).to eq([1000, 1000, 345])
end
end
end
@ -77,7 +146,7 @@ RSpec.describe GoogleMaps::RecordsStorageImporter do
context 'with download issues' do
it 'retries on timeout' do
call_count = 0
allow(import.file.blob).to receive(:download) do
allow(import.file).to receive(:download) do
call_count += 1
call_count < 3 ? raise(Timeout::Error) : file_content
end
@ -88,7 +157,7 @@ RSpec.describe GoogleMaps::RecordsStorageImporter do
end
it 'fails after max retries' do
allow(import.file.blob).to receive(:download).and_raise(Timeout::Error)
allow(import.file).to receive(:download).and_raise(Timeout::Error)
expect(Rails.logger).to receive(:warn).exactly(3).times
expect(Rails.logger).to receive(:error).with('Download failed after 3 attempts')
@ -99,16 +168,15 @@ RSpec.describe GoogleMaps::RecordsStorageImporter do
context 'with file integrity issues' do
it 'raises error when file size mismatches' do
allow(import.file.blob).to receive(:byte_size).and_return(9999)
allow_any_instance_of(StringIO).to receive(:size).and_return(9999)
allow(import.file.blob).to receive(:byte_size).and_return(1234)
expect(Rails.logger).to receive(:error)
expect { subject.call }.to raise_error(/Incomplete download/)
end
it 'raises error when checksum mismatches' do
allow(import.file.blob).to receive(:checksum).and_return('invalid_checksum')
expect(Rails.logger).to receive(:error)
expect { subject.call }.to raise_error(/Checksum mismatch/)
end
end
@ -123,9 +191,8 @@ RSpec.describe GoogleMaps::RecordsStorageImporter do
end
it 'logs and raises parse error' do
# Directly mock the standard error handling since the error happens during parsing
expect(Rails.logger).to receive(:error).with(/Download error: Empty input/)
expect { subject.call }.to raise_error(StandardError)
# The actual error raised is an EncodingError, not Oj::ParseError
expect { subject.call }.to raise_error(EncodingError)
end
end
@ -139,8 +206,12 @@ RSpec.describe GoogleMaps::RecordsStorageImporter do
end
it 'returns early when locations key is missing' do
expect(GoogleMaps::RecordsImporter).not_to receive(:new)
mock_importer = instance_double(GoogleMaps::RecordsImporter)
allow(GoogleMaps::RecordsImporter).to receive(:new).and_return(mock_importer)
allow(mock_importer).to receive(:call)
subject.call
expect(GoogleMaps::RecordsImporter).not_to have_received(:new)
end
end
end