diff --git a/app/services/users/import_data.rb b/app/services/users/import_data.rb index 430f34e3..33613edb 100644 --- a/app/services/users/import_data.rb +++ b/app/services/users/import_data.rb @@ -23,7 +23,7 @@ require 'oj' # Files are restored to their original locations and properly attached to records. class Users::ImportData - STREAM_BATCH_SIZE = 1000 + STREAM_BATCH_SIZE = 5000 STREAMED_SECTIONS = %w[places visits points].freeze def initialize(user, archive_path) @@ -117,6 +117,7 @@ class Users::ImportData end initialize_stream_state + handler = ::JsonStreamHandler.new(self) parser = Oj::Parser.new(:saj, handler: handler) @@ -203,6 +204,7 @@ class Users::ImportData return unless place_data.is_a?(Hash) @places_batch << place_data + if @places_batch.size >= STREAM_BATCH_SIZE import_places_batch(@places_batch) @places_batch.clear diff --git a/app/services/users/import_data/places.rb b/app/services/users/import_data/places.rb index bacfd335..04f9179f 100644 --- a/app/services/users/import_data/places.rb +++ b/app/services/users/import_data/places.rb @@ -1,7 +1,7 @@ # frozen_string_literal: true class Users::ImportData::Places - BATCH_SIZE = 1000 + BATCH_SIZE = 5000 def initialize(user, places_data = nil, batch_size: BATCH_SIZE, logger: Rails.logger) @user = user @@ -42,11 +42,7 @@ class Users::ImportData::Places attr_reader :user, :places_data, :batch_size, :logger def enumerate(collection, &block) - if collection.is_a?(Array) - collection.each(&block) - else - collection.each(&block) - end + collection.each(&block) end def collection_description(collection) diff --git a/app/services/users/import_data/points.rb b/app/services/users/import_data/points.rb index c54ab1e1..2d27b8ee 100644 --- a/app/services/users/import_data/points.rb +++ b/app/services/users/import_data/points.rb @@ -3,7 +3,7 @@ require 'time' class Users::ImportData::Points - BATCH_SIZE = 1000 + BATCH_SIZE = 5000 def initialize(user, points_data = nil, batch_size: BATCH_SIZE, logger: Rails.logger) @user = user @@ -68,11 +68,7 @@ class Users::ImportData::Points attr_reader :user, :points_data, :batch_size, :logger, :imports_lookup, :countries_lookup, :visits_lookup def enumerate(collection, &block) - if collection.is_a?(Array) - collection.each(&block) - else - collection.each(&block) - end + collection.each(&block) end def collection_description(collection) diff --git a/spec/services/users/import_data/places_streaming_spec.rb b/spec/services/users/import_data/places_streaming_spec.rb new file mode 100644 index 00000000..e476d443 --- /dev/null +++ b/spec/services/users/import_data/places_streaming_spec.rb @@ -0,0 +1,54 @@ +# frozen_string_literal: true + +require 'rails_helper' + +RSpec.describe Users::ImportData::Places do + let(:user) { create(:user) } + let(:logger) { instance_double(Logger, info: nil, debug: nil, error: nil) } + let(:service) { described_class.new(user, nil, logger: logger) } + + describe '#add / #finalize' do + it 'creates places in batches and tracks total created' do + 2.times do |index| + service.add( + 'name' => "Place #{index}", + 'latitude' => 10.0 + index, + 'longitude' => 20.0 + index + ) + end + + expect { service.finalize }.to change(Place, :count).by(2) + expect { expect(service.finalize).to eq(2) }.not_to change(Place, :count) + end + + it 'flushes automatically when the buffer reaches the batch size' do + stub_const('Users::ImportData::Places::BATCH_SIZE', 2) + + logger_double = instance_double(Logger) + allow(logger_double).to receive(:info) + allow(logger_double).to receive(:debug) + allow(logger_double).to receive(:error) + + buffered_service = described_class.new(user, nil, batch_size: 2, logger: logger_double) + + buffered_service.add('name' => 'First', 'latitude' => 1, 'longitude' => 2) + expect(Place.count).to eq(0) + + buffered_service.add('name' => 'Second', 'latitude' => 3, 'longitude' => 4) + expect(Place.count).to eq(2) + + expect(buffered_service.finalize).to eq(2) + expect { buffered_service.finalize }.not_to change(Place, :count) + end + + it 'skips invalid records and logs debug messages' do + allow(logger).to receive(:debug) + + service.add('name' => 'Valid', 'latitude' => 1, 'longitude' => 2) + service.add('name' => 'Missing coords') + + expect(service.finalize).to eq(1) + expect(logger).to have_received(:debug).with(/Skipping place with missing required data/) + end + end +end