mirror of
https://github.com/Freika/dawarich.git
synced 2026-01-11 01:31:39 -05:00
Update batch size for streaming imports to 5000
This commit is contained in:
parent
fa995a7f84
commit
87c5c34fb4
4 changed files with 61 additions and 13 deletions
|
|
@ -23,7 +23,7 @@ require 'oj'
|
||||||
# Files are restored to their original locations and properly attached to records.
|
# Files are restored to their original locations and properly attached to records.
|
||||||
|
|
||||||
class Users::ImportData
|
class Users::ImportData
|
||||||
STREAM_BATCH_SIZE = 1000
|
STREAM_BATCH_SIZE = 5000
|
||||||
STREAMED_SECTIONS = %w[places visits points].freeze
|
STREAMED_SECTIONS = %w[places visits points].freeze
|
||||||
|
|
||||||
def initialize(user, archive_path)
|
def initialize(user, archive_path)
|
||||||
|
|
@ -117,6 +117,7 @@ class Users::ImportData
|
||||||
end
|
end
|
||||||
|
|
||||||
initialize_stream_state
|
initialize_stream_state
|
||||||
|
|
||||||
handler = ::JsonStreamHandler.new(self)
|
handler = ::JsonStreamHandler.new(self)
|
||||||
parser = Oj::Parser.new(:saj, handler: handler)
|
parser = Oj::Parser.new(:saj, handler: handler)
|
||||||
|
|
||||||
|
|
@ -203,6 +204,7 @@ class Users::ImportData
|
||||||
return unless place_data.is_a?(Hash)
|
return unless place_data.is_a?(Hash)
|
||||||
|
|
||||||
@places_batch << place_data
|
@places_batch << place_data
|
||||||
|
|
||||||
if @places_batch.size >= STREAM_BATCH_SIZE
|
if @places_batch.size >= STREAM_BATCH_SIZE
|
||||||
import_places_batch(@places_batch)
|
import_places_batch(@places_batch)
|
||||||
@places_batch.clear
|
@places_batch.clear
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,7 @@
|
||||||
# frozen_string_literal: true
|
# frozen_string_literal: true
|
||||||
|
|
||||||
class Users::ImportData::Places
|
class Users::ImportData::Places
|
||||||
BATCH_SIZE = 1000
|
BATCH_SIZE = 5000
|
||||||
|
|
||||||
def initialize(user, places_data = nil, batch_size: BATCH_SIZE, logger: Rails.logger)
|
def initialize(user, places_data = nil, batch_size: BATCH_SIZE, logger: Rails.logger)
|
||||||
@user = user
|
@user = user
|
||||||
|
|
@ -42,11 +42,7 @@ class Users::ImportData::Places
|
||||||
attr_reader :user, :places_data, :batch_size, :logger
|
attr_reader :user, :places_data, :batch_size, :logger
|
||||||
|
|
||||||
def enumerate(collection, &block)
|
def enumerate(collection, &block)
|
||||||
if collection.is_a?(Array)
|
|
||||||
collection.each(&block)
|
collection.each(&block)
|
||||||
else
|
|
||||||
collection.each(&block)
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def collection_description(collection)
|
def collection_description(collection)
|
||||||
|
|
|
||||||
|
|
@ -3,7 +3,7 @@
|
||||||
require 'time'
|
require 'time'
|
||||||
|
|
||||||
class Users::ImportData::Points
|
class Users::ImportData::Points
|
||||||
BATCH_SIZE = 1000
|
BATCH_SIZE = 5000
|
||||||
|
|
||||||
def initialize(user, points_data = nil, batch_size: BATCH_SIZE, logger: Rails.logger)
|
def initialize(user, points_data = nil, batch_size: BATCH_SIZE, logger: Rails.logger)
|
||||||
@user = user
|
@user = user
|
||||||
|
|
@ -68,11 +68,7 @@ class Users::ImportData::Points
|
||||||
attr_reader :user, :points_data, :batch_size, :logger, :imports_lookup, :countries_lookup, :visits_lookup
|
attr_reader :user, :points_data, :batch_size, :logger, :imports_lookup, :countries_lookup, :visits_lookup
|
||||||
|
|
||||||
def enumerate(collection, &block)
|
def enumerate(collection, &block)
|
||||||
if collection.is_a?(Array)
|
|
||||||
collection.each(&block)
|
collection.each(&block)
|
||||||
else
|
|
||||||
collection.each(&block)
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def collection_description(collection)
|
def collection_description(collection)
|
||||||
|
|
|
||||||
54
spec/services/users/import_data/places_streaming_spec.rb
Normal file
54
spec/services/users/import_data/places_streaming_spec.rb
Normal file
|
|
@ -0,0 +1,54 @@
|
||||||
|
# frozen_string_literal: true
|
||||||
|
|
||||||
|
require 'rails_helper'
|
||||||
|
|
||||||
|
RSpec.describe Users::ImportData::Places do
|
||||||
|
let(:user) { create(:user) }
|
||||||
|
let(:logger) { instance_double(Logger, info: nil, debug: nil, error: nil) }
|
||||||
|
let(:service) { described_class.new(user, nil, logger: logger) }
|
||||||
|
|
||||||
|
describe '#add / #finalize' do
|
||||||
|
it 'creates places in batches and tracks total created' do
|
||||||
|
2.times do |index|
|
||||||
|
service.add(
|
||||||
|
'name' => "Place #{index}",
|
||||||
|
'latitude' => 10.0 + index,
|
||||||
|
'longitude' => 20.0 + index
|
||||||
|
)
|
||||||
|
end
|
||||||
|
|
||||||
|
expect { service.finalize }.to change(Place, :count).by(2)
|
||||||
|
expect { expect(service.finalize).to eq(2) }.not_to change(Place, :count)
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'flushes automatically when the buffer reaches the batch size' do
|
||||||
|
stub_const('Users::ImportData::Places::BATCH_SIZE', 2)
|
||||||
|
|
||||||
|
logger_double = instance_double(Logger)
|
||||||
|
allow(logger_double).to receive(:info)
|
||||||
|
allow(logger_double).to receive(:debug)
|
||||||
|
allow(logger_double).to receive(:error)
|
||||||
|
|
||||||
|
buffered_service = described_class.new(user, nil, batch_size: 2, logger: logger_double)
|
||||||
|
|
||||||
|
buffered_service.add('name' => 'First', 'latitude' => 1, 'longitude' => 2)
|
||||||
|
expect(Place.count).to eq(0)
|
||||||
|
|
||||||
|
buffered_service.add('name' => 'Second', 'latitude' => 3, 'longitude' => 4)
|
||||||
|
expect(Place.count).to eq(2)
|
||||||
|
|
||||||
|
expect(buffered_service.finalize).to eq(2)
|
||||||
|
expect { buffered_service.finalize }.not_to change(Place, :count)
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'skips invalid records and logs debug messages' do
|
||||||
|
allow(logger).to receive(:debug)
|
||||||
|
|
||||||
|
service.add('name' => 'Valid', 'latitude' => 1, 'longitude' => 2)
|
||||||
|
service.add('name' => 'Missing coords')
|
||||||
|
|
||||||
|
expect(service.finalize).to eq(1)
|
||||||
|
expect(logger).to have_received(:debug).with(/Skipping place with missing required data/)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
Loading…
Reference in a new issue