Stream points during user data import

Eugene Burmakin 2025-10-05 20:59:03 +02:00
parent 6648d9e593
commit 6591a629ad
8 changed files with 679 additions and 467 deletions


@ -1,6 +1,7 @@
# frozen_string_literal: true
require 'zip'
require 'oj'
# Users::ImportData - Imports complete user data from exported archive
#
@ -22,6 +23,9 @@ require 'zip'
# Files are restored to their original locations and properly attached to records.
class Users::ImportData
STREAM_BATCH_SIZE = 1000
STREAMED_SECTIONS = %w[places visits points].freeze
def initialize(user, archive_path)
@user = user
@archive_path = archive_path
@ -46,10 +50,7 @@ class Users::ImportData
ActiveRecord::Base.transaction do
extract_archive
data = load_json_data
import_in_correct_order(data)
process_archive_data
create_success_notification
@import_stats
@ -73,14 +74,10 @@ class Users::ImportData
zip_file.each do |entry|
next if entry.directory?
# Sanitize entry name to prevent path traversal attacks
sanitized_name = sanitize_zip_entry_name(entry.name)
next if sanitized_name.nil?
# Compute absolute destination path
extraction_path = File.expand_path(File.join(@import_directory, sanitized_name))
# Verify the extraction path is within the import directory
safe_import_dir = File.expand_path(@import_directory) + File::SEPARATOR
unless extraction_path.start_with?(safe_import_dir) || extraction_path == File.expand_path(@import_directory)
Rails.logger.warn "Skipping potentially malicious ZIP entry: #{entry.name} (would extract to #{extraction_path})"
@ -90,24 +87,19 @@ class Users::ImportData
Rails.logger.debug "Extracting #{entry.name} to #{extraction_path}"
FileUtils.mkdir_p(File.dirname(extraction_path))
# Use destination_directory parameter for rubyzip 3.x compatibility
entry.extract(sanitized_name, destination_directory: @import_directory)
end
end
end
def sanitize_zip_entry_name(entry_name)
# Remove leading slashes, backslashes, and dots
sanitized = entry_name.gsub(%r{^[/\\]+}, '')
# Reject entries with path traversal attempts
if sanitized.include?('..') || sanitized.start_with?('/') || sanitized.start_with?('\\')
Rails.logger.warn "Rejecting potentially malicious ZIP entry name: #{entry_name}"
return nil
end
# Reject absolute paths
if Pathname.new(sanitized).absolute?
Rails.logger.warn "Rejecting absolute path in ZIP entry: #{entry_name}"
return nil
@ -116,52 +108,186 @@ class Users::ImportData
sanitized
end
def load_json_data
json_path = @import_directory.join('data.json')
unless File.exist?(json_path)
raise StandardError, "Data file not found in archive: data.json"
end
JSON.parse(File.read(json_path))
rescue JSON::ParserError => e
raise StandardError, "Invalid JSON format in data file: #{e.message}"
end
def import_in_correct_order(data)
def process_archive_data
Rails.logger.info "Starting data import for user: #{user.email}"
if data['counts']
Rails.logger.info "Expected entity counts from export: #{data['counts']}"
json_path = @import_directory.join('data.json')
unless File.exist?(json_path)
raise StandardError, 'Data file not found in archive: data.json'
end
Rails.logger.debug "Available data keys: #{data.keys.inspect}"
initialize_stream_state
handler = ::JsonStreamHandler.new(self)
parser = Oj::Parser.new(:saj, handler: handler)
import_settings(data['settings']) if data['settings']
import_areas(data['areas']) if data['areas']
# Import places first to ensure they're available for visits
places_imported = import_places(data['places']) if data['places']
Rails.logger.info "Places import phase completed: #{places_imported} places imported"
import_imports(data['imports']) if data['imports']
import_exports(data['exports']) if data['exports']
import_trips(data['trips']) if data['trips']
import_stats(data['stats']) if data['stats']
import_notifications(data['notifications']) if data['notifications']
# Import visits after places to ensure proper place resolution
visits_imported = import_visits(data['visits']) if data['visits']
Rails.logger.info "Visits import phase completed: #{visits_imported} visits imported"
import_points(data['points']) if data['points']
# Final validation check
if data['counts']
validate_import_completeness(data['counts'])
File.open(json_path, 'rb') do |io|
parser.load(io)
end
finalize_stream_processing
rescue Oj::ParseError => e
raise StandardError, "Invalid JSON format in data file: #{e.message}"
rescue IOError => e
raise StandardError, "Failed to read JSON data: #{e.message}"
end
def initialize_stream_state
@expected_counts = nil
@places_batch = []
@stream_writers = {}
@stream_temp_paths = {}
end
def finalize_stream_processing
flush_places_batch
close_stream_writer(:visits)
close_stream_writer(:points)
process_visits_stream
process_points_stream
Rails.logger.info "Data import completed. Stats: #{@import_stats}"
validate_import_completeness(@expected_counts) if @expected_counts.present?
end
def handle_section(key, value)
case key
when 'counts'
@expected_counts = value if value.is_a?(Hash)
Rails.logger.info "Expected entity counts from export: #{@expected_counts}" if @expected_counts
when 'settings'
import_settings(value) if value.present?
when 'areas'
import_areas(value)
when 'imports'
import_imports(value)
when 'exports'
import_exports(value)
when 'trips'
import_trips(value)
when 'stats'
import_stats(value)
when 'notifications'
import_notifications(value)
else
Rails.logger.debug "Unhandled non-stream section #{key}" unless STREAMED_SECTIONS.include?(key)
end
end
def handle_stream_value(section, value)
case section
when 'places'
queue_place_for_import(value)
when 'visits'
append_to_stream(:visits, value)
when 'points'
append_to_stream(:points, value)
else
Rails.logger.debug "Received stream value for unknown section #{section}"
end
end
def finish_stream(section)
case section
when 'places'
flush_places_batch
when 'visits'
close_stream_writer(:visits)
when 'points'
close_stream_writer(:points)
end
end
def queue_place_for_import(place_data)
return unless place_data.is_a?(Hash)
@places_batch << place_data
if @places_batch.size >= STREAM_BATCH_SIZE
import_places_batch(@places_batch)
@places_batch.clear
end
end
def flush_places_batch
return if @places_batch.blank?
import_places_batch(@places_batch)
@places_batch.clear
end
def import_places_batch(batch)
Rails.logger.debug "Importing places batch of size #{batch.size}"
places_created = Users::ImportData::Places.new(user, batch.dup).call.to_i
@import_stats[:places_created] += places_created
end
def append_to_stream(section, value)
return unless value
writer = stream_writer(section)
writer.puts(Oj.dump(value, mode: :compat))
end
def stream_writer(section)
@stream_writers[section] ||= begin
path = stream_temp_path(section)
Rails.logger.debug "Creating stream buffer for #{section} at #{path}"
File.open(path, 'w')
end
end
def stream_temp_path(section)
@stream_temp_paths[section] ||= @import_directory.join("stream_#{section}.ndjson")
end
def close_stream_writer(section)
@stream_writers[section]&.close
ensure
@stream_writers.delete(section)
end
def process_visits_stream
path = @stream_temp_paths[:visits]
return unless path&.exist?
Rails.logger.info 'Importing visits from streamed buffer'
batch = []
File.foreach(path) do |line|
line = line.strip
next if line.blank?
batch << Oj.load(line)
if batch.size >= STREAM_BATCH_SIZE
import_visits_batch(batch)
batch = []
end
end
import_visits_batch(batch) if batch.any?
end
def import_visits_batch(batch)
visits_created = Users::ImportData::Visits.new(user, batch).call.to_i
@import_stats[:visits_created] += visits_created
end
def process_points_stream
path = @stream_temp_paths[:points]
return unless path&.exist?
Rails.logger.info 'Importing points from streamed buffer'
importer = Users::ImportData::Points.new(user, nil, batch_size: STREAM_BATCH_SIZE)
File.foreach(path) do |line|
line = line.strip
next if line.blank?
importer.add(Oj.load(line))
end
@import_stats[:points_created] = importer.finalize.to_i
end
def import_settings(settings_data)
@ -172,67 +298,40 @@ class Users::ImportData
def import_areas(areas_data)
Rails.logger.debug "Importing #{areas_data&.size || 0} areas"
areas_created = Users::ImportData::Areas.new(user, areas_data).call
@import_stats[:areas_created] = areas_created
end
def import_places(places_data)
Rails.logger.debug "Importing #{places_data&.size || 0} places"
places_created = Users::ImportData::Places.new(user, places_data).call
@import_stats[:places_created] = places_created
places_created
areas_created = Users::ImportData::Areas.new(user, areas_data).call.to_i
@import_stats[:areas_created] += areas_created
end
def import_imports(imports_data)
Rails.logger.debug "Importing #{imports_data&.size || 0} imports"
imports_created, files_restored = Users::ImportData::Imports.new(user, imports_data, @import_directory.join('files')).call
@import_stats[:imports_created] = imports_created
@import_stats[:files_restored] += files_restored
@import_stats[:imports_created] += imports_created.to_i
@import_stats[:files_restored] += files_restored.to_i
end
def import_exports(exports_data)
Rails.logger.debug "Importing #{exports_data&.size || 0} exports"
exports_created, files_restored = Users::ImportData::Exports.new(user, exports_data, @import_directory.join('files')).call
@import_stats[:exports_created] = exports_created
@import_stats[:files_restored] += files_restored
@import_stats[:exports_created] += exports_created.to_i
@import_stats[:files_restored] += files_restored.to_i
end
def import_trips(trips_data)
Rails.logger.debug "Importing #{trips_data&.size || 0} trips"
trips_created = Users::ImportData::Trips.new(user, trips_data).call
@import_stats[:trips_created] = trips_created
trips_created = Users::ImportData::Trips.new(user, trips_data).call.to_i
@import_stats[:trips_created] += trips_created
end
def import_stats(stats_data)
Rails.logger.debug "Importing #{stats_data&.size || 0} stats"
stats_created = Users::ImportData::Stats.new(user, stats_data).call
@import_stats[:stats_created] = stats_created
stats_created = Users::ImportData::Stats.new(user, stats_data).call.to_i
@import_stats[:stats_created] += stats_created
end
def import_notifications(notifications_data)
Rails.logger.debug "Importing #{notifications_data&.size || 0} notifications"
notifications_created = Users::ImportData::Notifications.new(user, notifications_data).call
@import_stats[:notifications_created] = notifications_created
end
def import_visits(visits_data)
Rails.logger.debug "Importing #{visits_data&.size || 0} visits"
visits_created = Users::ImportData::Visits.new(user, visits_data).call
@import_stats[:visits_created] = visits_created
visits_created
end
def import_points(points_data)
Rails.logger.info "About to import #{points_data&.size || 0} points"
begin
points_created = Users::ImportData::Points.new(user, points_data).call
@import_stats[:points_created] = points_created
rescue StandardError => e
ExceptionReporter.call(e, 'Points import failed')
@import_stats[:points_created] = 0
end
notifications_created = Users::ImportData::Notifications.new(user, notifications_data).call.to_i
@import_stats[:notifications_created] += notifications_created
end
def cleanup_temporary_files(import_directory)
@ -246,15 +345,15 @@ class Users::ImportData
def create_success_notification
summary = "#{@import_stats[:points_created]} points, " \
"#{@import_stats[:visits_created]} visits, " \
"#{@import_stats[:places_created]} places, " \
"#{@import_stats[:trips_created]} trips, " \
"#{@import_stats[:areas_created]} areas, " \
"#{@import_stats[:imports_created]} imports, " \
"#{@import_stats[:exports_created]} exports, " \
"#{@import_stats[:stats_created]} stats, " \
"#{@import_stats[:files_restored]} files restored, " \
"#{@import_stats[:notifications_created]} notifications"
"#{@import_stats[:visits_created]} visits, " \
"#{@import_stats[:places_created]} places, " \
"#{@import_stats[:trips_created]} trips, " \
"#{@import_stats[:areas_created]} areas, " \
"#{@import_stats[:imports_created]} imports, " \
"#{@import_stats[:exports_created]} exports, " \
"#{@import_stats[:stats_created]} stats, " \
"#{@import_stats[:files_restored]} files restored, " \
"#{@import_stats[:notifications_created]} notifications"
::Notifications::Create.new(
user: user,
@ -274,7 +373,7 @@ class Users::ImportData
end
def validate_import_completeness(expected_counts)
Rails.logger.info "Validating import completeness..."
Rails.logger.info 'Validating import completeness...'
discrepancies = []
@ -291,7 +390,7 @@ class Users::ImportData
if discrepancies.any?
Rails.logger.warn "Import completed with discrepancies: #{discrepancies.join(', ')}"
else
Rails.logger.info "Import validation successful - all entities imported correctly"
Rails.logger.info 'Import validation successful - all entities imported correctly'
end
end
end


@ -1,32 +1,71 @@
# frozen_string_literal: true
class Users::ImportData::Places
def initialize(user, places_data)
BATCH_SIZE = 1000
def initialize(user, places_data = nil, batch_size: BATCH_SIZE, logger: Rails.logger)
@user = user
@places_data = places_data
@batch_size = batch_size
@logger = logger
@buffer = []
@created = 0
end
def call
return 0 unless places_data.is_a?(Array)
return 0 unless places_data.respond_to?(:each)
Rails.logger.info "Importing #{places_data.size} places for user: #{user.email}"
logger.info "Importing #{collection_description(places_data)} places for user: #{user.email}"
places_created = 0
places_data.each do |place_data|
next unless place_data.is_a?(Hash)
place = find_or_create_place_for_import(place_data)
places_created += 1 if place&.respond_to?(:previously_new_record?) && place.previously_new_record?
enumerate(places_data) do |place_data|
add(place_data)
end
Rails.logger.info "Places import completed. Created: #{places_created}"
places_created
finalize
end
def add(place_data)
return unless place_data.is_a?(Hash)
@buffer << place_data
flush_batch if @buffer.size >= batch_size
end
def finalize
flush_batch
logger.info "Places import completed. Created: #{@created}"
@created
end
private
attr_reader :user, :places_data
attr_reader :user, :places_data, :batch_size, :logger
def enumerate(collection, &block)
collection.each(&block)
end
def collection_description(collection)
return collection.size if collection.respond_to?(:size)
'streamed'
end
def flush_batch
return if @buffer.empty?
logger.debug "Processing places batch of #{@buffer.size}"
@buffer.each do |place_data|
place = find_or_create_place_for_import(place_data)
@created += 1 if place&.respond_to?(:previously_new_record?) && place.previously_new_record?
end
@buffer.clear
end
def find_or_create_place_for_import(place_data)
name = place_data['name']
@ -34,14 +73,12 @@ class Users::ImportData::Places
longitude = place_data['longitude']&.to_f
unless name.present? && latitude.present? && longitude.present?
Rails.logger.debug "Skipping place with missing required data: #{place_data.inspect}"
logger.debug "Skipping place with missing required data: #{place_data.inspect}"
return nil
end
Rails.logger.debug "Processing place for import: #{name} at (#{latitude}, #{longitude})"
logger.debug "Processing place for import: #{name} at (#{latitude}, #{longitude})"
# During import, we prioritize data integrity for the importing user
# First try exact match (name + coordinates)
existing_place = Place.where(
name: name,
latitude: latitude,
@ -49,31 +86,29 @@ class Users::ImportData::Places
).first
if existing_place
Rails.logger.debug "Found exact place match: #{name} at (#{latitude}, #{longitude}) -> existing place ID #{existing_place.id}"
logger.debug "Found exact place match: #{name} at (#{latitude}, #{longitude}) -> existing place ID #{existing_place.id}"
existing_place.define_singleton_method(:previously_new_record?) { false }
return existing_place
end
Rails.logger.debug "No exact match found for #{name} at (#{latitude}, #{longitude}). Creating new place."
logger.debug "No exact match found for #{name} at (#{latitude}, #{longitude}). Creating new place."
# If no exact match, create a new place to ensure data integrity
# This prevents data loss during import even if similar places exist
place_attributes = place_data.except('created_at', 'updated_at', 'latitude', 'longitude')
place_attributes['lonlat'] = "POINT(#{longitude} #{latitude})"
place_attributes['latitude'] = latitude
place_attributes['longitude'] = longitude
place_attributes.delete('user')
Rails.logger.debug "Creating place with attributes: #{place_attributes.inspect}"
logger.debug "Creating place with attributes: #{place_attributes.inspect}"
begin
place = Place.create!(place_attributes)
place.define_singleton_method(:previously_new_record?) { true }
Rails.logger.debug "Created place during import: #{place.name} (ID: #{place.id})"
logger.debug "Created place during import: #{place.name} (ID: #{place.id})"
place
rescue ActiveRecord::RecordInvalid => e
Rails.logger.error "Failed to create place: #{place_data.inspect}, error: #{e.message}"
logger.error "Failed to create place: #{place_data.inspect}, error: #{e.message}"
nil
end
end


@ -1,96 +1,167 @@
# frozen_string_literal: true
require 'time'
class Users::ImportData::Points
BATCH_SIZE = 1000
def initialize(user, points_data)
def initialize(user, points_data = nil, batch_size: BATCH_SIZE, logger: Rails.logger)
@user = user
@points_data = points_data
@batch_size = batch_size
@logger = logger
@buffer = []
@total_created = 0
@processed_count = 0
@skipped_count = 0
@preloaded = false
@imports_lookup = {}
@countries_lookup = {}
@visits_lookup = {}
end
def call
return 0 unless points_data.is_a?(Array)
return 0 unless points_data.respond_to?(:each)
Rails.logger.info "Importing #{points_data.size} points for user: #{user.email}"
Rails.logger.debug "First point sample: #{points_data.first.inspect}"
logger.info "Importing #{collection_description(points_data)} points for user: #{user.email}"
preload_reference_data
valid_points = filter_and_prepare_points
if valid_points.empty?
Rails.logger.warn "No valid points to import after filtering"
Rails.logger.debug "Original points_data size: #{points_data.size}"
return 0
enumerate(points_data) do |point_data|
add(point_data)
end
deduplicated_points = deduplicate_points(valid_points)
finalize
end
Rails.logger.info "Prepared #{deduplicated_points.size} unique valid points (#{points_data.size - deduplicated_points.size} duplicates/invalid skipped)"
# Allows streamed usage by pushing a single point at a time.
def add(point_data)
preload_reference_data unless @preloaded
total_created = bulk_import_points(deduplicated_points)
if valid_point_data?(point_data)
prepared_attributes = prepare_point_attributes(point_data)
Rails.logger.info "Points import completed. Created: #{total_created}"
total_created
if prepared_attributes
@buffer << prepared_attributes
@processed_count += 1
flush_batch if @buffer.size >= batch_size
else
@skipped_count += 1
end
else
@skipped_count += 1
logger.debug "Skipped point: invalid data - #{point_data.inspect}"
end
end
def finalize
preload_reference_data unless @preloaded
flush_batch
logger.info "Points import completed. Created: #{@total_created}. Processed #{@processed_count} valid points, skipped #{@skipped_count}."
@total_created
end
private
attr_reader :user, :points_data, :imports_lookup, :countries_lookup, :visits_lookup
attr_reader :user, :points_data, :batch_size, :logger, :imports_lookup, :countries_lookup, :visits_lookup
def enumerate(collection, &block)
collection.each(&block)
end
def collection_description(collection)
return collection.size if collection.respond_to?(:size)
'streamed'
end
def flush_batch
return if @buffer.empty?
logger.debug "Processing batch of #{@buffer.size} points"
logger.debug "First point in batch: #{@buffer.first.inspect}"
normalized_batch = normalize_point_keys(@buffer)
begin
result = Point.upsert_all(
normalized_batch,
unique_by: %i[lonlat timestamp user_id],
returning: %w[id],
on_duplicate: :skip
)
batch_created = result&.count.to_i
@total_created += batch_created
logger.debug "Processed batch of #{@buffer.size} points, created #{batch_created}, total created: #{@total_created}"
rescue StandardError => e
logger.error "Failed to process point batch: #{e.message}"
logger.error "Batch size: #{@buffer.size}"
logger.error "First point in failed batch: #{@buffer.first.inspect}"
logger.error "Backtrace: #{e.backtrace.first(5).join('\n')}"
ensure
@buffer.clear
end
end
def preload_reference_data
return if @preloaded
logger.debug 'Preloading reference data for points import'
@imports_lookup = {}
user.imports.each do |import|
user.imports.reload.each do |import|
string_key = [import.name, import.source, import.created_at.utc.iso8601]
integer_key = [import.name, Import.sources[import.source], import.created_at.utc.iso8601]
@imports_lookup[string_key] = import
@imports_lookup[integer_key] = import
end
Rails.logger.debug "Loaded #{user.imports.size} imports with #{@imports_lookup.size} lookup keys"
logger.debug "Loaded #{user.imports.size} imports with #{@imports_lookup.size} lookup keys"
@countries_lookup = {}
Country.all.each do |country|
@countries_lookup[[country.name, country.iso_a2, country.iso_a3]] = country
@countries_lookup[country.name] = country
end
Rails.logger.debug "Loaded #{Country.count} countries for lookup"
logger.debug "Loaded #{Country.count} countries for lookup"
@visits_lookup = user.visits.index_by { |visit|
@visits_lookup = user.visits.reload.index_by do |visit|
[visit.name, visit.started_at.utc.iso8601, visit.ended_at.utc.iso8601]
}
Rails.logger.debug "Loaded #{@visits_lookup.size} visits for lookup"
end
logger.debug "Loaded #{@visits_lookup.size} visits for lookup"
@preloaded = true
end
def filter_and_prepare_points
valid_points = []
skipped_count = 0
def normalize_point_keys(points)
all_keys = points.flat_map(&:keys).uniq
points_data.each_with_index do |point_data, index|
next unless point_data.is_a?(Hash)
unless valid_point_data?(point_data)
skipped_count += 1
Rails.logger.debug "Skipped point #{index}: invalid data - #{point_data.slice('timestamp', 'longitude', 'latitude', 'lonlat')}"
next
points.map do |point|
all_keys.each_with_object({}) do |key, normalized|
normalized[key] = point[key]
end
prepared_attributes = prepare_point_attributes(point_data)
unless prepared_attributes
skipped_count += 1
Rails.logger.debug "Skipped point #{index}: failed to prepare attributes"
next
end
valid_points << prepared_attributes
end
end
if skipped_count > 0
Rails.logger.warn "Skipped #{skipped_count} points with invalid or missing required data"
end
def valid_point_data?(point_data)
return false unless point_data.is_a?(Hash)
return false unless point_data['timestamp'].present?
Rails.logger.debug "Filtered #{valid_points.size} valid points from #{points_data.size} total"
valid_points
has_lonlat = point_data['lonlat'].present? && point_data['lonlat'].is_a?(String) && point_data['lonlat'].start_with?('POINT(')
has_coordinates = point_data['longitude'].present? && point_data['latitude'].present?
has_lonlat || has_coordinates
rescue StandardError => e
logger.debug "Point validation failed: #{e.message} for data: #{point_data.inspect}"
false
end
def prepare_point_attributes(point_data)
@ -118,15 +189,14 @@ class Users::ImportData::Points
result = attributes.symbolize_keys
Rails.logger.debug "Prepared point attributes: #{result.slice(:lonlat, :timestamp, :import_id, :country_id, :visit_id)}"
logger.debug "Prepared point attributes: #{result.slice(:lonlat, :timestamp, :import_id, :country_id, :visit_id)}"
result
rescue StandardError => e
ExceptionReporter.call(e, 'Failed to prepare point attributes')
nil
end
def resolve_import_reference(attributes, import_reference)
def resolve_import_reference(attributes, import_reference)
return unless import_reference.is_a?(Hash)
created_at = normalize_timestamp_for_lookup(import_reference['created_at'])
@ -140,10 +210,10 @@ class Users::ImportData::Points
import = imports_lookup[import_key]
if import
attributes['import_id'] = import.id
Rails.logger.debug "Resolved import reference: #{import_reference['name']} -> #{import.id}"
logger.debug "Resolved import reference: #{import_reference['name']} -> #{import.id}"
else
Rails.logger.debug "Import not found for reference: #{import_reference.inspect}"
Rails.logger.debug "Available imports: #{imports_lookup.keys.inspect}"
logger.debug "Import not found for reference: #{import_reference.inspect}"
logger.debug "Available imports: #{imports_lookup.keys.inspect}"
end
end
@ -159,14 +229,12 @@ class Users::ImportData::Points
if country
attributes['country_id'] = country.id
Rails.logger.debug "Resolved country reference: #{country_info['name']} -> #{country.id}"
logger.debug "Resolved country reference: #{country_info['name']} -> #{country.id}"
else
Rails.logger.debug "Country not found for: #{country_info.inspect}"
logger.debug "Country not found for: #{country_info.inspect}"
end
end
def resolve_visit_reference(attributes, visit_reference)
return unless visit_reference.is_a?(Hash)
@ -182,84 +250,19 @@ class Users::ImportData::Points
visit = visits_lookup[visit_key]
if visit
attributes['visit_id'] = visit.id
Rails.logger.debug "Resolved visit reference: #{visit_reference['name']} -> #{visit.id}"
logger.debug "Resolved visit reference: #{visit_reference['name']} -> #{visit.id}"
else
Rails.logger.debug "Visit not found for reference: #{visit_reference.inspect}"
Rails.logger.debug "Available visits: #{visits_lookup.keys.inspect}"
logger.debug "Visit not found for reference: #{visit_reference.inspect}"
logger.debug "Available visits: #{visits_lookup.keys.inspect}"
end
end
def deduplicate_points(points)
points.uniq { |point| [point[:lonlat], point[:timestamp], point[:user_id]] }
end
def normalize_point_keys(points)
all_keys = points.flat_map(&:keys).uniq
# Normalize each point to have all keys (with nil for missing ones)
points.map do |point|
normalized = {}
all_keys.each do |key|
normalized[key] = point[key]
end
normalized
end
end
def bulk_import_points(points)
total_created = 0
points.each_slice(BATCH_SIZE) do |batch|
begin
Rails.logger.debug "Processing batch of #{batch.size} points"
Rails.logger.debug "First point in batch: #{batch.first.inspect}"
normalized_batch = normalize_point_keys(batch)
result = Point.upsert_all(
normalized_batch,
unique_by: %i[lonlat timestamp user_id],
returning: %w[id],
on_duplicate: :skip
)
batch_created = result.count
total_created += batch_created
Rails.logger.debug "Processed batch of #{batch.size} points, created #{batch_created}, total created: #{total_created}"
rescue StandardError => e
Rails.logger.error "Failed to process point batch: #{e.message}"
Rails.logger.error "Batch size: #{batch.size}"
Rails.logger.error "First point in failed batch: #{batch.first.inspect}"
Rails.logger.error "Backtrace: #{e.backtrace.first(5).join('\n')}"
end
end
total_created
end
def valid_point_data?(point_data)
return false unless point_data.is_a?(Hash)
return false unless point_data['timestamp'].present?
has_lonlat = point_data['lonlat'].present? && point_data['lonlat'].is_a?(String) && point_data['lonlat'].start_with?('POINT(')
has_coordinates = point_data['longitude'].present? && point_data['latitude'].present?
return false unless has_lonlat || has_coordinates
true
rescue StandardError => e
Rails.logger.debug "Point validation failed: #{e.message} for data: #{point_data.inspect}"
false
end
def ensure_lonlat_field(attributes, point_data)
if attributes['lonlat'].blank? && point_data['longitude'].present? && point_data['latitude'].present?
longitude = point_data['longitude'].to_f
latitude = point_data['latitude'].to_f
attributes['lonlat'] = "POINT(#{longitude} #{latitude})"
Rails.logger.debug "Reconstructed lonlat: #{attributes['lonlat']}"
logger.debug "Reconstructed lonlat: #{attributes['lonlat']}"
end
end
@ -275,7 +278,7 @@ class Users::ImportData::Points
timestamp.to_s
end
rescue StandardError => e
Rails.logger.debug "Failed to normalize timestamp #{timestamp}: #{e.message}"
logger.debug "Failed to normalize timestamp #{timestamp}: #{e.message}"
timestamp.to_s
end
end


@ -0,0 +1,36 @@
# Import Optimisation Plan
## Goals
- Prevent large imports from exhausting memory or hitting IO limits while reading export archives.
- Maintain correctness and ordering guarantees for all imported entities.
- Preserve observability and operability (clear errors and actionable logs).
## Current Status
- ✅ Replaced `File.read + JSON.parse` with streaming via `Oj::Parser(:saj).load`, so `data.json` is consumed in 16KB chunks instead of loading the whole file.
- ✅ `Users::ImportData` now dispatches streamed payloads section-by-section, buffering `places` in in-memory batches and spilling `visits`/`points` to NDJSON for replay after dependencies are ready (see the sketch after this list).
- ✅ Points, places, and visits importers support incremental ingestion with a fixed batch size of 1,000 records and detailed progress logs.
- ✅ Added targeted specs for the SAJ handler and streaming flow; addressed IO retry messaging.
- ⚙️ Pending: archive-size guardrails, broader telemetry, and production rollout validation.
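
To make the streamed flow concrete, the spill-and-replay step for the buffered sections boils down to the pattern below. This is a minimal sketch under simplified assumptions; `spill`, `replay`, and the buffer file are illustrative names, not the service's actual API.

```ruby
# Illustrative sketch: append streamed records to an NDJSON buffer on disk,
# then replay them in fixed-size batches once their dependencies exist.
require 'oj'
require 'tempfile'

BATCH_SIZE = 1_000

# Write one record as a single NDJSON line.
def spill(record, io)
  io.puts(Oj.dump(record, mode: :compat))
end

# Re-read the buffer line by line, yielding parsed records in batches.
def replay(path, batch_size: BATCH_SIZE)
  batch = []
  File.foreach(path) do |line|
    line = line.strip
    next if line.empty?

    batch << Oj.load(line)
    next if batch.size < batch_size

    yield batch
    batch = []
  end
  yield batch if batch.any?
end

# Example usage with a throwaway buffer file.
buffer = Tempfile.new(['stream_points', '.ndjson'])
spill({ 'timestamp' => 1, 'lonlat' => 'POINT(2 1)' }, buffer)
spill({ 'timestamp' => 2, 'lonlat' => 'POINT(4 3)' }, buffer)
buffer.flush

replay(buffer.path, batch_size: 2) { |batch| puts "would import #{batch.size} points" }
buffer.close!
```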
## Remaining Pain Points
- No preflight check yet for extreme `data.json` sizes or malformed streams.
- Logging only (no metrics/dashboards) for monitoring batch throughput and failures.
## Next Steps
1. **Rollout & Hardening**
- Add size/structure validation before streaming (fail fast with an actionable error); a possible shape is sketched after this list.
- Extend log coverage (import durations, batch counts) and document operator playbook.
- Capture memory/runtime snapshots during large staged imports.
2. **Post-Rollout Validation**
- Re-run the previously failing Sidekiq job (import 105) under the new pipeline.
- Monitor Sidekiq memory and throughput; tune batch size if needed.
- Gather feedback and decide on export format split or further streaming tweaks.
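
A possible shape for the preflight guardrail from step 1 is sketched below; `MAX_DATA_JSON_BYTES` and the error wording are assumptions, not existing code, and the real threshold should come from rollout measurements.

```ruby
# Illustrative preflight check to run before streaming starts.
MAX_DATA_JSON_BYTES = 2 * 1024**3 # 2 GiB, placeholder threshold

def validate_data_file!(json_path)
  raise StandardError, 'Data file not found in archive: data.json' unless File.exist?(json_path)

  size = File.size(json_path)
  return if size <= MAX_DATA_JSON_BYTES

  raise StandardError,
        "data.json is #{size} bytes, above the #{MAX_DATA_JSON_BYTES} byte preflight limit; aborting before streaming"
end
```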
## Validation Strategy
- Automated: streaming parser specs, importer batch tests, service integration spec (already in place; expand as new safeguards land).
- Manual: stage large imports, inspect Sidekiq logs/metrics once added, confirm notifications, stats, and files restored.
## Open Questions
- What thresholds should trigger preflight failures or warnings (file size, record counts)?
- Do we need structured metrics beyond logs for long-running imports?
- Should we pursue export format splitting or incremental resume once streaming rollout is stable?


@ -0,0 +1,79 @@
# frozen_string_literal: true
# Streaming JSON handler relays sections and streamed values back to the importer instance.
class JsonStreamHandler < Oj::Saj
HashState = Struct.new(:hash, :root, :key)
ArrayState = Struct.new(:array, :key)
StreamState = Struct.new(:key)
def initialize(processor)
super()
@processor = processor
@stack = []
end
def hash_start(key = nil, *_)
state = HashState.new({}, @stack.empty?, normalize_key(key))
@stack << state
end
def hash_end(key = nil, *_)
state = @stack.pop
value = state.hash
parent = @stack.last
dispatch_to_parent(parent, value, normalize_key(key) || state.key)
end
def array_start(key = nil, *_)
normalized_key = normalize_key(key)
parent = @stack.last
if parent.is_a?(HashState) && parent.root && @stack.size == 1 && Users::ImportData::STREAMED_SECTIONS.include?(normalized_key)
@stack << StreamState.new(normalized_key)
else
@stack << ArrayState.new([], normalized_key)
end
end
def array_end(key = nil, *_)
state = @stack.pop
case state
when StreamState
@processor.send(:finish_stream, state.key)
when ArrayState
value = state.array
parent = @stack.last
dispatch_to_parent(parent, value, normalize_key(key) || state.key)
end
end
def add_value(value, key)
parent = @stack.last
dispatch_to_parent(parent, value, normalize_key(key))
end
private
def normalize_key(key)
key&.to_s
end
def dispatch_to_parent(parent, value, key)
return unless parent
case parent
when HashState
if parent.root && @stack.size == 1
@processor.send(:handle_section, key, value)
else
parent.hash[key] = value
end
when ArrayState
parent.array << value
when StreamState
@processor.send(:handle_stream_value, parent.key, value)
end
end
end


@ -0,0 +1,56 @@
# frozen_string_literal: true
require 'rails_helper'
require 'oj'
RSpec.describe JsonStreamHandler do
let(:processor) { double('StreamProcessor') }
let(:handler) { described_class.new(processor) }
before do
allow(processor).to receive(:handle_section)
allow(processor).to receive(:handle_stream_value)
allow(processor).to receive(:finish_stream)
end
it 'streams configured sections and delegates other values immediately' do
payload = {
'counts' => { 'places' => 2, 'visits' => 1, 'points' => 1 },
'settings' => { 'theme' => 'dark' },
'areas' => [{ 'name' => 'Home' }],
'places' => [
{ 'name' => 'Cafe', 'latitude' => 1.0, 'longitude' => 2.0 },
{ 'name' => 'Library', 'latitude' => 3.0, 'longitude' => 4.0 }
],
'visits' => [
{
'name' => 'Morning Coffee',
'started_at' => '2025-01-01T09:00:00Z',
'ended_at' => '2025-01-01T10:00:00Z'
}
],
'points' => [
{ 'timestamp' => 1, 'lonlat' => 'POINT(2 1)' }
]
}
Oj.saj_parse(handler, Oj.dump(payload, mode: :compat))
expect(processor).to have_received(:handle_section).with('counts', hash_including('places' => 2))
expect(processor).to have_received(:handle_section).with('settings', hash_including('theme' => 'dark'))
expect(processor).to have_received(:handle_section).with('areas', [hash_including('name' => 'Home')])
expect(processor).to have_received(:handle_stream_value).with('places', hash_including('name' => 'Cafe'))
expect(processor).to have_received(:handle_stream_value).with('places', hash_including('name' => 'Library'))
expect(processor).to have_received(:handle_stream_value).with('visits', hash_including('name' => 'Morning Coffee'))
expect(processor).to have_received(:handle_stream_value).with('points', hash_including('timestamp' => 1))
expect(processor).to have_received(:finish_stream).with('places')
expect(processor).to have_received(:finish_stream).with('visits')
expect(processor).to have_received(:finish_stream).with('points')
expect(processor).not_to have_received(:handle_section).with('places', anything)
expect(processor).not_to have_received(:handle_section).with('visits', anything)
expect(processor).not_to have_received(:handle_section).with('points', anything)
end
end


@ -1,6 +1,8 @@
# frozen_string_literal: true
require 'rails_helper'
require 'tmpdir'
require 'oj'
RSpec.describe Users::ImportData, type: :service do
let(:user) { create(:user) }
@ -12,122 +14,59 @@ RSpec.describe Users::ImportData, type: :service do
allow(Time).to receive(:current).and_return(Time.zone.at(1234567890))
allow(FileUtils).to receive(:mkdir_p)
allow(FileUtils).to receive(:rm_rf)
allow(File).to receive(:directory?).and_return(true)
allow_any_instance_of(Pathname).to receive(:exist?).and_return(true)
end
describe '#import' do
let(:sample_data) do
{
'counts' => {
'areas' => 2,
'places' => 3,
'imports' => 1,
'exports' => 1,
'trips' => 2,
'stats' => 1,
'notifications' => 2,
'visits' => 4,
'points' => 1000
},
'settings' => { 'theme' => 'dark' },
'areas' => [{ 'name' => 'Home', 'latitude' => '40.7128', 'longitude' => '-74.0060' }],
'places' => [{ 'name' => 'Office', 'latitude' => '40.7589', 'longitude' => '-73.9851' }],
'imports' => [{ 'name' => 'test.json', 'source' => 'owntracks' }],
'exports' => [{ 'name' => 'export.json', 'status' => 'completed' }],
'trips' => [{ 'name' => 'Trip to NYC', 'distance' => 100.5 }],
'stats' => [{ 'year' => 2024, 'month' => 1, 'distance' => 456.78 }],
'notifications' => [{ 'title' => 'Test', 'content' => 'Test notification' }],
'visits' => [{ 'name' => 'Work Visit', 'duration' => 3600 }],
'points' => [{ 'latitude' => 40.7128, 'longitude' => -74.0060, 'timestamp' => 1234567890 }]
}
end
let(:notification_double) { instance_double(::Notifications::Create, call: true) }
before do
# Mock ZIP file extraction
zipfile_mock = double('ZipFile')
allow(zipfile_mock).to receive(:each)
allow(Zip::File).to receive(:open).with(archive_path).and_yield(zipfile_mock)
# Mock JSON loading and File operations
allow(File).to receive(:exist?).and_return(false)
allow(File).to receive(:exist?).with(import_directory.join('data.json')).and_return(true)
allow(File).to receive(:read).with(import_directory.join('data.json')).and_return(sample_data.to_json)
# Mock all import services
allow(Users::ImportData::Settings).to receive(:new).and_return(double(call: true))
allow(Users::ImportData::Areas).to receive(:new).and_return(double(call: 2))
allow(Users::ImportData::Places).to receive(:new).and_return(double(call: 3))
allow(Users::ImportData::Imports).to receive(:new).and_return(double(call: [1, 5]))
allow(Users::ImportData::Exports).to receive(:new).and_return(double(call: [1, 2]))
allow(Users::ImportData::Trips).to receive(:new).and_return(double(call: 2))
allow(Users::ImportData::Stats).to receive(:new).and_return(double(call: 1))
allow(Users::ImportData::Notifications).to receive(:new).and_return(double(call: 2))
allow(Users::ImportData::Visits).to receive(:new).and_return(double(call: 4))
allow(Users::ImportData::Points).to receive(:new).and_return(double(call: 1000))
# Mock notifications
allow(::Notifications::Create).to receive(:new).and_return(double(call: true))
# Mock cleanup
allow(::Notifications::Create).to receive(:new).and_return(notification_double)
allow(service).to receive(:cleanup_temporary_files)
allow_any_instance_of(Pathname).to receive(:exist?).and_return(true)
end
context 'when import is successful' do
it 'creates import directory' do
context 'when import succeeds' do
before do
allow(service).to receive(:extract_archive)
allow(service).to receive(:process_archive_data) do
stats = service.instance_variable_get(:@import_stats)
stats[:settings_updated] = true
stats[:areas_created] = 2
stats[:places_created] = 3
stats[:imports_created] = 1
stats[:exports_created] = 1
stats[:trips_created] = 2
stats[:stats_created] = 1
stats[:notifications_created] = 2
stats[:visits_created] = 4
stats[:points_created] = 1000
stats[:files_restored] = 7
end
end
it 'creates the import directory' do
expect(FileUtils).to receive(:mkdir_p).with(import_directory)
service.import
end
it 'extracts the archive' do
expect(Zip::File).to receive(:open).with(archive_path)
it 'extracts the archive and processes data' do
expect(service).to receive(:extract_archive).ordered
expect(service).to receive(:process_archive_data).ordered
service.import
end
it 'loads JSON data from extracted files' do
expect(File).to receive(:exist?).with(import_directory.join('data.json'))
expect(File).to receive(:read).with(import_directory.join('data.json'))
service.import
end
it 'calls all import services in correct order' do
expect(Users::ImportData::Settings).to receive(:new).with(user, sample_data['settings']).ordered
expect(Users::ImportData::Areas).to receive(:new).with(user, sample_data['areas']).ordered
expect(Users::ImportData::Places).to receive(:new).with(user, sample_data['places']).ordered
expect(Users::ImportData::Imports).to receive(:new).with(user, sample_data['imports'], import_directory.join('files')).ordered
expect(Users::ImportData::Exports).to receive(:new).with(user, sample_data['exports'], import_directory.join('files')).ordered
expect(Users::ImportData::Trips).to receive(:new).with(user, sample_data['trips']).ordered
expect(Users::ImportData::Stats).to receive(:new).with(user, sample_data['stats']).ordered
expect(Users::ImportData::Notifications).to receive(:new).with(user, sample_data['notifications']).ordered
expect(Users::ImportData::Visits).to receive(:new).with(user, sample_data['visits']).ordered
expect(Users::ImportData::Points).to receive(:new).with(user, sample_data['points']).ordered
service.import
end
it 'creates success notification with import stats' do
it 'creates a success notification with summary' do
expect(::Notifications::Create).to receive(:new).with(
user: user,
title: 'Data import completed',
content: match(/1000 points.*4 visits.*3 places.*2 trips/),
content: include('1000 points, 4 visits, 3 places, 2 trips'),
kind: :info
)
service.import
end
it 'cleans up temporary files' do
expect(service).to receive(:cleanup_temporary_files).with(import_directory)
service.import
end
it 'returns import statistics' do
result = service.import
expect(result).to include(
settings_updated: true,
areas_created: 2,
@ -142,53 +81,18 @@ RSpec.describe Users::ImportData, type: :service do
files_restored: 7
)
end
it 'logs expected counts if available' do
allow(Rails.logger).to receive(:info) # Allow other log messages
expect(Rails.logger).to receive(:info).with(/Expected entity counts from export:/)
service.import
end
end
context 'when JSON file is missing' do
context 'when an error happens during processing' do
let(:error_message) { 'boom' }
before do
allow(File).to receive(:exist?).and_return(false)
allow(File).to receive(:exist?).with(import_directory.join('data.json')).and_return(false)
allow(service).to receive(:extract_archive)
allow(service).to receive(:process_archive_data).and_raise(StandardError, error_message)
allow(ExceptionReporter).to receive(:call)
end
it 'raises an error' do
expect { service.import }.to raise_error(StandardError, 'Data file not found in archive: data.json')
end
end
context 'when JSON is invalid' do
before do
allow(File).to receive(:exist?).and_return(false)
allow(File).to receive(:exist?).with(import_directory.join('data.json')).and_return(true)
allow(File).to receive(:read).with(import_directory.join('data.json')).and_return('invalid json')
allow(ExceptionReporter).to receive(:call)
end
it 'raises a JSON parse error' do
expect { service.import }.to raise_error(StandardError, /Invalid JSON format in data file/)
end
end
context 'when an error occurs during import' do
let(:error_message) { 'Something went wrong' }
before do
allow(File).to receive(:exist?).and_return(false)
allow(File).to receive(:exist?).with(import_directory.join('data.json')).and_return(true)
allow(File).to receive(:read).with(import_directory.join('data.json')).and_return(sample_data.to_json)
allow(Users::ImportData::Settings).to receive(:new).and_raise(StandardError, error_message)
allow(ExceptionReporter).to receive(:call)
allow(::Notifications::Create).to receive(:new).and_return(double(call: true))
end
it 'creates failure notification' do
it 'creates a failure notification and re-raises the error' do
expect(::Notifications::Create).to receive(:new).with(
user: user,
title: 'Data import failed',
@ -198,100 +102,99 @@ RSpec.describe Users::ImportData, type: :service do
expect { service.import }.to raise_error(StandardError, error_message)
end
it 'reports error via ExceptionReporter' do
expect(ExceptionReporter).to receive(:call).with(
an_instance_of(StandardError),
'Data import failed'
)
expect { service.import }.to raise_error(StandardError, error_message)
end
it 'still cleans up temporary files' do
expect(service).to receive(:cleanup_temporary_files)
expect { service.import }.to raise_error(StandardError, error_message)
end
it 're-raises the error' do
expect { service.import }.to raise_error(StandardError, error_message)
end
end
context 'when data sections are missing' do
let(:minimal_data) { { 'settings' => { 'theme' => 'dark' } } }
before do
# Reset JSON file mocking
allow(File).to receive(:exist?).and_return(false)
allow(File).to receive(:exist?).with(import_directory.join('data.json')).and_return(true)
allow(File).to receive(:read).with(import_directory.join('data.json')).and_return(minimal_data.to_json)
# Only expect Settings to be called
allow(Users::ImportData::Settings).to receive(:new).and_return(double(call: true))
allow(::Notifications::Create).to receive(:new).and_return(double(call: true))
end
it 'only imports available sections' do
expect(Users::ImportData::Settings).to receive(:new).with(user, minimal_data['settings'])
expect(Users::ImportData::Areas).not_to receive(:new)
expect(Users::ImportData::Places).not_to receive(:new)
service.import
end
end
end
describe 'private methods' do
describe '#cleanup_temporary_files' do
context 'when directory exists' do
before do
allow(File).to receive(:directory?).and_return(true)
allow(Rails.logger).to receive(:info)
end
describe '#process_archive_data' do
let(:tmp_dir) { Pathname.new(Dir.mktmpdir) }
let(:json_path) { tmp_dir.join('data.json') }
let(:places_calls) { [] }
let(:visits_batches) { [] }
let(:points_ingested) { [] }
let(:points_importer) do
instance_double(Users::ImportData::Points, add: nil, finalize: 2)
end
it 'removes the directory' do
expect(FileUtils).to receive(:rm_rf).with(import_directory)
before do
payload = {
'counts' => { 'places' => 2, 'visits' => 2, 'points' => 2 },
'settings' => { 'theme' => 'dark' },
'areas' => [],
'imports' => [],
'exports' => [],
'trips' => [],
'stats' => [],
'notifications' => [],
'places' => [
{ 'name' => 'Cafe', 'latitude' => 1.0, 'longitude' => 2.0 },
{ 'name' => 'Library', 'latitude' => 3.0, 'longitude' => 4.0 }
],
'visits' => [
{
'name' => 'Morning Coffee',
'started_at' => '2025-01-01T09:00:00Z',
'ended_at' => '2025-01-01T10:00:00Z'
},
{
'name' => 'Study Time',
'started_at' => '2025-01-01T12:00:00Z',
'ended_at' => '2025-01-01T14:00:00Z'
}
],
'points' => [
{ 'timestamp' => 1, 'lonlat' => 'POINT(2 1)' },
{ 'timestamp' => 2, 'lonlat' => 'POINT(4 3)' }
]
}
service.send(:cleanup_temporary_files, import_directory)
end
File.write(json_path, Oj.dump(payload, mode: :compat))
it 'logs the cleanup' do
expect(Rails.logger).to receive(:info).with("Cleaning up temporary import directory: #{import_directory}")
service.instance_variable_set(:@import_directory, tmp_dir)
service.send(:cleanup_temporary_files, import_directory)
end
allow(Users::ImportData::Settings).to receive(:new).and_return(double(call: true))
allow(Users::ImportData::Areas).to receive(:new).and_return(double(call: 0))
allow(Users::ImportData::Imports).to receive(:new).and_return(double(call: [0, 0]))
allow(Users::ImportData::Exports).to receive(:new).and_return(double(call: [0, 0]))
allow(Users::ImportData::Trips).to receive(:new).and_return(double(call: 0))
allow(Users::ImportData::Stats).to receive(:new).and_return(double(call: 0))
allow(Users::ImportData::Notifications).to receive(:new).and_return(double(call: 0))
allow(Users::ImportData::Places).to receive(:new) do |_, batch|
places_calls << batch
double(call: batch.size)
end
context 'when cleanup fails' do
before do
allow(File).to receive(:directory?).and_return(true)
allow(FileUtils).to receive(:rm_rf).and_raise(StandardError, 'Permission denied')
allow(ExceptionReporter).to receive(:call)
end
it 'reports error via ExceptionReporter but does not re-raise' do
expect(ExceptionReporter).to receive(:call).with(
an_instance_of(StandardError),
'Failed to cleanup temporary files'
)
expect { service.send(:cleanup_temporary_files, import_directory) }.not_to raise_error
end
allow(Users::ImportData::Visits).to receive(:new) do |_, batch|
visits_batches << batch
double(call: batch.size)
end
context 'when directory does not exist' do
before do
allow(File).to receive(:directory?).and_return(false)
end
it 'does not attempt cleanup' do
expect(FileUtils).not_to receive(:rm_rf)
service.send(:cleanup_temporary_files, import_directory)
end
allow(points_importer).to receive(:add) do |point|
points_ingested << point
end
allow(Users::ImportData::Points).to receive(:new) do |_, points_data, batch_size:|
expect(points_data).to be_nil
expect(batch_size).to eq(described_class::STREAM_BATCH_SIZE)
points_importer
end
end
after do
FileUtils.remove_entry(tmp_dir)
end
it 'streams sections and updates import stats' do
service.send(:process_archive_data)
expect(places_calls.flatten.map { |place| place['name'] }).to contain_exactly('Cafe', 'Library')
expect(visits_batches.flatten.map { |visit| visit['name'] }).to contain_exactly('Morning Coffee', 'Study Time')
expect(points_ingested.map { |point| point['timestamp'] }).to eq([1, 2])
stats = service.instance_variable_get(:@import_stats)
expect(stats[:places_created]).to eq(2)
expect(stats[:visits_created]).to eq(2)
expect(stats[:points_created]).to eq(2)
end
end
end

tmp.json Normal file

@ -0,0 +1 @@
{"a":1}