diff --git a/app/services/users/import_data.rb b/app/services/users/import_data.rb index b6a01161..430f34e3 100644 --- a/app/services/users/import_data.rb +++ b/app/services/users/import_data.rb @@ -1,6 +1,7 @@ # frozen_string_literal: true require 'zip' +require 'oj' # Users::ImportData - Imports complete user data from exported archive # @@ -22,6 +23,9 @@ require 'zip' # Files are restored to their original locations and properly attached to records. class Users::ImportData + STREAM_BATCH_SIZE = 1000 + STREAMED_SECTIONS = %w[places visits points].freeze + def initialize(user, archive_path) @user = user @archive_path = archive_path @@ -46,10 +50,7 @@ class Users::ImportData ActiveRecord::Base.transaction do extract_archive - data = load_json_data - - import_in_correct_order(data) - + process_archive_data create_success_notification @import_stats @@ -73,14 +74,10 @@ class Users::ImportData zip_file.each do |entry| next if entry.directory? - # Sanitize entry name to prevent path traversal attacks sanitized_name = sanitize_zip_entry_name(entry.name) next if sanitized_name.nil? - # Compute absolute destination path extraction_path = File.expand_path(File.join(@import_directory, sanitized_name)) - - # Verify the extraction path is within the import directory safe_import_dir = File.expand_path(@import_directory) + File::SEPARATOR unless extraction_path.start_with?(safe_import_dir) || extraction_path == File.expand_path(@import_directory) Rails.logger.warn "Skipping potentially malicious ZIP entry: #{entry.name} (would extract to #{extraction_path})" @@ -90,24 +87,19 @@ class Users::ImportData Rails.logger.debug "Extracting #{entry.name} to #{extraction_path}" FileUtils.mkdir_p(File.dirname(extraction_path)) - - # Use destination_directory parameter for rubyzip 3.x compatibility entry.extract(sanitized_name, destination_directory: @import_directory) end end end def sanitize_zip_entry_name(entry_name) - # Remove leading slashes, backslashes, and dots sanitized = entry_name.gsub(%r{^[/\\]+}, '') - # Reject entries with path traversal attempts if sanitized.include?('..') || sanitized.start_with?('/') || sanitized.start_with?('\\') Rails.logger.warn "Rejecting potentially malicious ZIP entry name: #{entry_name}" return nil end - # Reject absolute paths if Pathname.new(sanitized).absolute? 
Rails.logger.warn "Rejecting absolute path in ZIP entry: #{entry_name}" return nil @@ -116,52 +108,186 @@ class Users::ImportData sanitized end - def load_json_data - json_path = @import_directory.join('data.json') - - unless File.exist?(json_path) - raise StandardError, "Data file not found in archive: data.json" - end - - JSON.parse(File.read(json_path)) - rescue JSON::ParserError => e - raise StandardError, "Invalid JSON format in data file: #{e.message}" - end - - def import_in_correct_order(data) + def process_archive_data Rails.logger.info "Starting data import for user: #{user.email}" - if data['counts'] - Rails.logger.info "Expected entity counts from export: #{data['counts']}" + json_path = @import_directory.join('data.json') + unless File.exist?(json_path) + raise StandardError, 'Data file not found in archive: data.json' end - Rails.logger.debug "Available data keys: #{data.keys.inspect}" + initialize_stream_state + handler = ::JsonStreamHandler.new(self) + parser = Oj::Parser.new(:saj, handler: handler) - import_settings(data['settings']) if data['settings'] - import_areas(data['areas']) if data['areas'] - - # Import places first to ensure they're available for visits - places_imported = import_places(data['places']) if data['places'] - Rails.logger.info "Places import phase completed: #{places_imported} places imported" - - import_imports(data['imports']) if data['imports'] - import_exports(data['exports']) if data['exports'] - import_trips(data['trips']) if data['trips'] - import_stats(data['stats']) if data['stats'] - import_notifications(data['notifications']) if data['notifications'] - - # Import visits after places to ensure proper place resolution - visits_imported = import_visits(data['visits']) if data['visits'] - Rails.logger.info "Visits import phase completed: #{visits_imported} visits imported" - - import_points(data['points']) if data['points'] - - # Final validation check - if data['counts'] - validate_import_completeness(data['counts']) + File.open(json_path, 'rb') do |io| + parser.load(io) end + finalize_stream_processing + rescue Oj::ParseError => e + raise StandardError, "Invalid JSON format in data file: #{e.message}" + rescue IOError => e + raise StandardError, "Failed to read JSON data: #{e.message}" + end + + def initialize_stream_state + @expected_counts = nil + @places_batch = [] + @stream_writers = {} + @stream_temp_paths = {} + end + + def finalize_stream_processing + flush_places_batch + close_stream_writer(:visits) + close_stream_writer(:points) + + process_visits_stream + process_points_stream + Rails.logger.info "Data import completed. Stats: #{@import_stats}" + + validate_import_completeness(@expected_counts) if @expected_counts.present? + end + + def handle_section(key, value) + case key + when 'counts' + @expected_counts = value if value.is_a?(Hash) + Rails.logger.info "Expected entity counts from export: #{@expected_counts}" if @expected_counts + when 'settings' + import_settings(value) if value.present? 
+ when 'areas' + import_areas(value) + when 'imports' + import_imports(value) + when 'exports' + import_exports(value) + when 'trips' + import_trips(value) + when 'stats' + import_stats(value) + when 'notifications' + import_notifications(value) + else + Rails.logger.debug "Unhandled non-stream section #{key}" unless STREAMED_SECTIONS.include?(key) + end + end + + def handle_stream_value(section, value) + case section + when 'places' + queue_place_for_import(value) + when 'visits' + append_to_stream(:visits, value) + when 'points' + append_to_stream(:points, value) + else + Rails.logger.debug "Received stream value for unknown section #{section}" + end + end + + def finish_stream(section) + case section + when 'places' + flush_places_batch + when 'visits' + close_stream_writer(:visits) + when 'points' + close_stream_writer(:points) + end + end + + def queue_place_for_import(place_data) + return unless place_data.is_a?(Hash) + + @places_batch << place_data + if @places_batch.size >= STREAM_BATCH_SIZE + import_places_batch(@places_batch) + @places_batch.clear + end + end + + def flush_places_batch + return if @places_batch.blank? + + import_places_batch(@places_batch) + @places_batch.clear + end + + def import_places_batch(batch) + Rails.logger.debug "Importing places batch of size #{batch.size}" + places_created = Users::ImportData::Places.new(user, batch.dup).call.to_i + @import_stats[:places_created] += places_created + end + + def append_to_stream(section, value) + return unless value + + writer = stream_writer(section) + writer.puts(Oj.dump(value, mode: :compat)) + end + + def stream_writer(section) + @stream_writers[section] ||= begin + path = stream_temp_path(section) + Rails.logger.debug "Creating stream buffer for #{section} at #{path}" + File.open(path, 'w') + end + end + + def stream_temp_path(section) + @stream_temp_paths[section] ||= @import_directory.join("stream_#{section}.ndjson") + end + + def close_stream_writer(section) + @stream_writers[section]&.close + ensure + @stream_writers.delete(section) + end + + def process_visits_stream + path = @stream_temp_paths[:visits] + return unless path&.exist? + + Rails.logger.info 'Importing visits from streamed buffer' + + batch = [] + File.foreach(path) do |line| + line = line.strip + next if line.blank? + + batch << Oj.load(line) + if batch.size >= STREAM_BATCH_SIZE + import_visits_batch(batch) + batch = [] + end + end + + import_visits_batch(batch) if batch.any? + end + + def import_visits_batch(batch) + visits_created = Users::ImportData::Visits.new(user, batch).call.to_i + @import_stats[:visits_created] += visits_created + end + + def process_points_stream + path = @stream_temp_paths[:points] + return unless path&.exist? + + Rails.logger.info 'Importing points from streamed buffer' + + importer = Users::ImportData::Points.new(user, nil, batch_size: STREAM_BATCH_SIZE) + File.foreach(path) do |line| + line = line.strip + next if line.blank? 
+ + importer.add(Oj.load(line)) + end + + @import_stats[:points_created] = importer.finalize.to_i end def import_settings(settings_data) @@ -172,67 +298,40 @@ class Users::ImportData def import_areas(areas_data) Rails.logger.debug "Importing #{areas_data&.size || 0} areas" - areas_created = Users::ImportData::Areas.new(user, areas_data).call - @import_stats[:areas_created] = areas_created - end - - def import_places(places_data) - Rails.logger.debug "Importing #{places_data&.size || 0} places" - places_created = Users::ImportData::Places.new(user, places_data).call - @import_stats[:places_created] = places_created - places_created + areas_created = Users::ImportData::Areas.new(user, areas_data).call.to_i + @import_stats[:areas_created] += areas_created end def import_imports(imports_data) Rails.logger.debug "Importing #{imports_data&.size || 0} imports" imports_created, files_restored = Users::ImportData::Imports.new(user, imports_data, @import_directory.join('files')).call - @import_stats[:imports_created] = imports_created - @import_stats[:files_restored] += files_restored + @import_stats[:imports_created] += imports_created.to_i + @import_stats[:files_restored] += files_restored.to_i end def import_exports(exports_data) Rails.logger.debug "Importing #{exports_data&.size || 0} exports" exports_created, files_restored = Users::ImportData::Exports.new(user, exports_data, @import_directory.join('files')).call - @import_stats[:exports_created] = exports_created - @import_stats[:files_restored] += files_restored + @import_stats[:exports_created] += exports_created.to_i + @import_stats[:files_restored] += files_restored.to_i end def import_trips(trips_data) Rails.logger.debug "Importing #{trips_data&.size || 0} trips" - trips_created = Users::ImportData::Trips.new(user, trips_data).call - @import_stats[:trips_created] = trips_created + trips_created = Users::ImportData::Trips.new(user, trips_data).call.to_i + @import_stats[:trips_created] += trips_created end def import_stats(stats_data) Rails.logger.debug "Importing #{stats_data&.size || 0} stats" - stats_created = Users::ImportData::Stats.new(user, stats_data).call - @import_stats[:stats_created] = stats_created + stats_created = Users::ImportData::Stats.new(user, stats_data).call.to_i + @import_stats[:stats_created] += stats_created end def import_notifications(notifications_data) Rails.logger.debug "Importing #{notifications_data&.size || 0} notifications" - notifications_created = Users::ImportData::Notifications.new(user, notifications_data).call - @import_stats[:notifications_created] = notifications_created - end - - def import_visits(visits_data) - Rails.logger.debug "Importing #{visits_data&.size || 0} visits" - visits_created = Users::ImportData::Visits.new(user, visits_data).call - @import_stats[:visits_created] = visits_created - visits_created - end - - def import_points(points_data) - Rails.logger.info "About to import #{points_data&.size || 0} points" - - begin - points_created = Users::ImportData::Points.new(user, points_data).call - - @import_stats[:points_created] = points_created - rescue StandardError => e - ExceptionReporter.call(e, 'Points import failed') - @import_stats[:points_created] = 0 - end + notifications_created = Users::ImportData::Notifications.new(user, notifications_data).call.to_i + @import_stats[:notifications_created] += notifications_created end def cleanup_temporary_files(import_directory) @@ -246,15 +345,15 @@ class Users::ImportData def create_success_notification summary = 
"#{@import_stats[:points_created]} points, " \ - "#{@import_stats[:visits_created]} visits, " \ - "#{@import_stats[:places_created]} places, " \ - "#{@import_stats[:trips_created]} trips, " \ - "#{@import_stats[:areas_created]} areas, " \ - "#{@import_stats[:imports_created]} imports, " \ - "#{@import_stats[:exports_created]} exports, " \ - "#{@import_stats[:stats_created]} stats, " \ - "#{@import_stats[:files_restored]} files restored, " \ - "#{@import_stats[:notifications_created]} notifications" + "#{@import_stats[:visits_created]} visits, " \ + "#{@import_stats[:places_created]} places, " \ + "#{@import_stats[:trips_created]} trips, " \ + "#{@import_stats[:areas_created]} areas, " \ + "#{@import_stats[:imports_created]} imports, " \ + "#{@import_stats[:exports_created]} exports, " \ + "#{@import_stats[:stats_created]} stats, " \ + "#{@import_stats[:files_restored]} files restored, " \ + "#{@import_stats[:notifications_created]} notifications" ::Notifications::Create.new( user: user, @@ -274,7 +373,7 @@ class Users::ImportData end def validate_import_completeness(expected_counts) - Rails.logger.info "Validating import completeness..." + Rails.logger.info 'Validating import completeness...' discrepancies = [] @@ -291,7 +390,7 @@ class Users::ImportData if discrepancies.any? Rails.logger.warn "Import completed with discrepancies: #{discrepancies.join(', ')}" else - Rails.logger.info "Import validation successful - all entities imported correctly" + Rails.logger.info 'Import validation successful - all entities imported correctly' end end end diff --git a/app/services/users/import_data/places.rb b/app/services/users/import_data/places.rb index 6d4ed023..bacfd335 100644 --- a/app/services/users/import_data/places.rb +++ b/app/services/users/import_data/places.rb @@ -1,32 +1,71 @@ # frozen_string_literal: true class Users::ImportData::Places - def initialize(user, places_data) + BATCH_SIZE = 1000 + + def initialize(user, places_data = nil, batch_size: BATCH_SIZE, logger: Rails.logger) @user = user @places_data = places_data + @batch_size = batch_size + @logger = logger + @buffer = [] + @created = 0 end def call - return 0 unless places_data.is_a?(Array) + return 0 unless places_data.respond_to?(:each) - Rails.logger.info "Importing #{places_data.size} places for user: #{user.email}" + logger.info "Importing #{collection_description(places_data)} places for user: #{user.email}" - places_created = 0 - - places_data.each do |place_data| - next unless place_data.is_a?(Hash) - - place = find_or_create_place_for_import(place_data) - places_created += 1 if place&.respond_to?(:previously_new_record?) && place.previously_new_record? + enumerate(places_data) do |place_data| + add(place_data) end - Rails.logger.info "Places import completed. Created: #{places_created}" - places_created + finalize + end + + def add(place_data) + return unless place_data.is_a?(Hash) + + @buffer << place_data + flush_batch if @buffer.size >= batch_size + end + + def finalize + flush_batch + logger.info "Places import completed. Created: #{@created}" + @created end private - attr_reader :user, :places_data + attr_reader :user, :places_data, :batch_size, :logger + + def enumerate(collection, &block) + if collection.is_a?(Array) + collection.each(&block) + else + collection.each(&block) + end + end + + def collection_description(collection) + return collection.size if collection.respond_to?(:size) + + 'streamed' + end + + def flush_batch + return if @buffer.empty? 
+ + logger.debug "Processing places batch of #{@buffer.size}" + @buffer.each do |place_data| + place = find_or_create_place_for_import(place_data) + @created += 1 if place&.respond_to?(:previously_new_record?) && place.previously_new_record? + end + + @buffer.clear + end def find_or_create_place_for_import(place_data) name = place_data['name'] @@ -34,14 +73,12 @@ class Users::ImportData::Places longitude = place_data['longitude']&.to_f unless name.present? && latitude.present? && longitude.present? - Rails.logger.debug "Skipping place with missing required data: #{place_data.inspect}" + logger.debug "Skipping place with missing required data: #{place_data.inspect}" return nil end - Rails.logger.debug "Processing place for import: #{name} at (#{latitude}, #{longitude})" + logger.debug "Processing place for import: #{name} at (#{latitude}, #{longitude})" - # During import, we prioritize data integrity for the importing user - # First try exact match (name + coordinates) existing_place = Place.where( name: name, latitude: latitude, @@ -49,31 +86,29 @@ class Users::ImportData::Places ).first if existing_place - Rails.logger.debug "Found exact place match: #{name} at (#{latitude}, #{longitude}) -> existing place ID #{existing_place.id}" + logger.debug "Found exact place match: #{name} at (#{latitude}, #{longitude}) -> existing place ID #{existing_place.id}" existing_place.define_singleton_method(:previously_new_record?) { false } return existing_place end - Rails.logger.debug "No exact match found for #{name} at (#{latitude}, #{longitude}). Creating new place." + logger.debug "No exact match found for #{name} at (#{latitude}, #{longitude}). Creating new place." - # If no exact match, create a new place to ensure data integrity - # This prevents data loss during import even if similar places exist place_attributes = place_data.except('created_at', 'updated_at', 'latitude', 'longitude') place_attributes['lonlat'] = "POINT(#{longitude} #{latitude})" place_attributes['latitude'] = latitude place_attributes['longitude'] = longitude place_attributes.delete('user') - Rails.logger.debug "Creating place with attributes: #{place_attributes.inspect}" + logger.debug "Creating place with attributes: #{place_attributes.inspect}" begin place = Place.create!(place_attributes) place.define_singleton_method(:previously_new_record?) 
{ true } - Rails.logger.debug "Created place during import: #{place.name} (ID: #{place.id})" + logger.debug "Created place during import: #{place.name} (ID: #{place.id})" place rescue ActiveRecord::RecordInvalid => e - Rails.logger.error "Failed to create place: #{place_data.inspect}, error: #{e.message}" + logger.error "Failed to create place: #{place_data.inspect}, error: #{e.message}" nil end end end diff --git a/app/services/users/import_data/points.rb b/app/services/users/import_data/points.rb index c0c6139d..c54ab1e1 100644 --- a/app/services/users/import_data/points.rb +++ b/app/services/users/import_data/points.rb @@ -1,96 +1,163 @@ # frozen_string_literal: true +require 'time' + class Users::ImportData::Points BATCH_SIZE = 1000 - def initialize(user, points_data) + def initialize(user, points_data = nil, batch_size: BATCH_SIZE, logger: Rails.logger) @user = user @points_data = points_data + @batch_size = batch_size + @logger = logger + + @buffer = [] + @total_created = 0 + @processed_count = 0 + @skipped_count = 0 + @preloaded = false + + @imports_lookup = {} + @countries_lookup = {} + @visits_lookup = {} end def call - return 0 unless points_data.is_a?(Array) + return 0 unless points_data.respond_to?(:each) - Rails.logger.info "Importing #{points_data.size} points for user: #{user.email}" - Rails.logger.debug "First point sample: #{points_data.first.inspect}" + logger.info "Importing #{collection_description(points_data)} points for user: #{user.email}" - preload_reference_data - - valid_points = filter_and_prepare_points - - if valid_points.empty? - Rails.logger.warn "No valid points to import after filtering" - Rails.logger.debug "Original points_data size: #{points_data.size}" - return 0 + enumerate(points_data) do |point_data| + add(point_data) end - deduplicated_points = deduplicate_points(valid_points) + finalize + end - Rails.logger.info "Prepared #{deduplicated_points.size} unique valid points (#{points_data.size - deduplicated_points.size} duplicates/invalid skipped)" + # Allows streamed usage by pushing a single point at a time. + def add(point_data) + preload_reference_data unless @preloaded - total_created = bulk_import_points(deduplicated_points) + if valid_point_data?(point_data) + prepared_attributes = prepare_point_attributes(point_data) - Rails.logger.info "Points import completed. Created: #{total_created}" - total_created + if prepared_attributes + @buffer << prepared_attributes + @processed_count += 1 + + flush_batch if @buffer.size >= batch_size + else + @skipped_count += 1 + end + else + @skipped_count += 1 + logger.debug "Skipped point: invalid data - #{point_data.inspect}" + end + end + + def finalize + preload_reference_data unless @preloaded + flush_batch + + logger.info "Points import completed. Created: #{@total_created}. Processed #{@processed_count} valid points, skipped #{@skipped_count}." + @total_created + end + + private - attr_reader :user, :points_data, :imports_lookup, :countries_lookup, :visits_lookup + attr_reader :user, :points_data, :batch_size, :logger, :imports_lookup, :countries_lookup, :visits_lookup + + def enumerate(collection, &block) + collection.each(&block) + end + + def collection_description(collection) + return collection.size if collection.respond_to?(:size) + + 'streamed' + end + + def flush_batch + return if @buffer.empty?
+ + logger.debug "Processing batch of #{@buffer.size} points" + logger.debug "First point in batch: #{@buffer.first.inspect}" + + normalized_batch = normalize_point_keys(@buffer) + + begin + result = Point.upsert_all( + normalized_batch, + unique_by: %i[lonlat timestamp user_id], + returning: %w[id], + on_duplicate: :skip + ) + + batch_created = result&.count.to_i + @total_created += batch_created + + logger.debug "Processed batch of #{@buffer.size} points, created #{batch_created}, total created: #{@total_created}" + rescue StandardError => e + logger.error "Failed to process point batch: #{e.message}" + logger.error "Batch size: #{@buffer.size}" + logger.error "First point in failed batch: #{@buffer.first.inspect}" + logger.error "Backtrace: #{e.backtrace.first(5).join('\n')}" + ensure + @buffer.clear + end + end def preload_reference_data + return if @preloaded + + logger.debug 'Preloading reference data for points import' + @imports_lookup = {} - user.imports.each do |import| + user.imports.reload.each do |import| string_key = [import.name, import.source, import.created_at.utc.iso8601] integer_key = [import.name, Import.sources[import.source], import.created_at.utc.iso8601] @imports_lookup[string_key] = import @imports_lookup[integer_key] = import end - Rails.logger.debug "Loaded #{user.imports.size} imports with #{@imports_lookup.size} lookup keys" + logger.debug "Loaded #{user.imports.size} imports with #{@imports_lookup.size} lookup keys" @countries_lookup = {} Country.all.each do |country| @countries_lookup[[country.name, country.iso_a2, country.iso_a3]] = country @countries_lookup[country.name] = country end - Rails.logger.debug "Loaded #{Country.count} countries for lookup" + logger.debug "Loaded #{Country.count} countries for lookup" - @visits_lookup = user.visits.index_by { |visit| + @visits_lookup = user.visits.reload.index_by do |visit| [visit.name, visit.started_at.utc.iso8601, visit.ended_at.utc.iso8601] - } - Rails.logger.debug "Loaded #{@visits_lookup.size} visits for lookup" + end + logger.debug "Loaded #{@visits_lookup.size} visits for lookup" + + @preloaded = true end - def filter_and_prepare_points - valid_points = [] - skipped_count = 0 + def normalize_point_keys(points) + all_keys = points.flat_map(&:keys).uniq - points_data.each_with_index do |point_data, index| - next unless point_data.is_a?(Hash) - - unless valid_point_data?(point_data) - skipped_count += 1 - Rails.logger.debug "Skipped point #{index}: invalid data - #{point_data.slice('timestamp', 'longitude', 'latitude', 'lonlat')}" - next + points.map do |point| + all_keys.each_with_object({}) do |key, normalized| + normalized[key] = point[key] end - - prepared_attributes = prepare_point_attributes(point_data) - unless prepared_attributes - skipped_count += 1 - Rails.logger.debug "Skipped point #{index}: failed to prepare attributes" - next - end - - valid_points << prepared_attributes end + end - if skipped_count > 0 - Rails.logger.warn "Skipped #{skipped_count} points with invalid or missing required data" - end + def valid_point_data?(point_data) + return false unless point_data.is_a?(Hash) + return false unless point_data['timestamp'].present? - Rails.logger.debug "Filtered #{valid_points.size} valid points from #{points_data.size} total" - valid_points + has_lonlat = point_data['lonlat'].present? && point_data['lonlat'].is_a?(String) && point_data['lonlat'].start_with?('POINT(') + has_coordinates = point_data['longitude'].present? && point_data['latitude'].present? 
+ + has_lonlat || has_coordinates + rescue StandardError => e + logger.debug "Point validation failed: #{e.message} for data: #{point_data.inspect}" + false end def prepare_point_attributes(point_data) @@ -118,15 +189,14 @@ class Users::ImportData::Points result = attributes.symbolize_keys - Rails.logger.debug "Prepared point attributes: #{result.slice(:lonlat, :timestamp, :import_id, :country_id, :visit_id)}" + logger.debug "Prepared point attributes: #{result.slice(:lonlat, :timestamp, :import_id, :country_id, :visit_id)}" result rescue StandardError => e ExceptionReporter.call(e, 'Failed to prepare point attributes') - nil end - def resolve_import_reference(attributes, import_reference) + def resolve_import_reference(attributes, import_reference) return unless import_reference.is_a?(Hash) created_at = normalize_timestamp_for_lookup(import_reference['created_at']) @@ -140,10 +210,10 @@ class Users::ImportData::Points import = imports_lookup[import_key] if import attributes['import_id'] = import.id - Rails.logger.debug "Resolved import reference: #{import_reference['name']} -> #{import.id}" + logger.debug "Resolved import reference: #{import_reference['name']} -> #{import.id}" else - Rails.logger.debug "Import not found for reference: #{import_reference.inspect}" - Rails.logger.debug "Available imports: #{imports_lookup.keys.inspect}" + logger.debug "Import not found for reference: #{import_reference.inspect}" + logger.debug "Available imports: #{imports_lookup.keys.inspect}" end end @@ -159,14 +229,12 @@ class Users::ImportData::Points if country attributes['country_id'] = country.id - Rails.logger.debug "Resolved country reference: #{country_info['name']} -> #{country.id}" + logger.debug "Resolved country reference: #{country_info['name']} -> #{country.id}" else - Rails.logger.debug "Country not found for: #{country_info.inspect}" + logger.debug "Country not found for: #{country_info.inspect}" end end - - def resolve_visit_reference(attributes, visit_reference) return unless visit_reference.is_a?(Hash) @@ -182,84 +250,19 @@ class Users::ImportData::Points visit = visits_lookup[visit_key] if visit attributes['visit_id'] = visit.id - Rails.logger.debug "Resolved visit reference: #{visit_reference['name']} -> #{visit.id}" + logger.debug "Resolved visit reference: #{visit_reference['name']} -> #{visit.id}" else - Rails.logger.debug "Visit not found for reference: #{visit_reference.inspect}" - Rails.logger.debug "Available visits: #{visits_lookup.keys.inspect}" + logger.debug "Visit not found for reference: #{visit_reference.inspect}" + logger.debug "Available visits: #{visits_lookup.keys.inspect}" end end - def deduplicate_points(points) - points.uniq { |point| [point[:lonlat], point[:timestamp], point[:user_id]] } - end - - def normalize_point_keys(points) - all_keys = points.flat_map(&:keys).uniq - - # Normalize each point to have all keys (with nil for missing ones) - points.map do |point| - normalized = {} - all_keys.each do |key| - normalized[key] = point[key] - end - normalized - end - end - - def bulk_import_points(points) - total_created = 0 - - points.each_slice(BATCH_SIZE) do |batch| - begin - Rails.logger.debug "Processing batch of #{batch.size} points" - Rails.logger.debug "First point in batch: #{batch.first.inspect}" - - normalized_batch = normalize_point_keys(batch) - - result = Point.upsert_all( - normalized_batch, - unique_by: %i[lonlat timestamp user_id], - returning: %w[id], - on_duplicate: :skip - ) - - batch_created = result.count - total_created += batch_created - - 
Rails.logger.debug "Processed batch of #{batch.size} points, created #{batch_created}, total created: #{total_created}" - - rescue StandardError => e - Rails.logger.error "Failed to process point batch: #{e.message}" - Rails.logger.error "Batch size: #{batch.size}" - Rails.logger.error "First point in failed batch: #{batch.first.inspect}" - Rails.logger.error "Backtrace: #{e.backtrace.first(5).join('\n')}" - end - end - - total_created - end - - def valid_point_data?(point_data) - return false unless point_data.is_a?(Hash) - return false unless point_data['timestamp'].present? - - has_lonlat = point_data['lonlat'].present? && point_data['lonlat'].is_a?(String) && point_data['lonlat'].start_with?('POINT(') - has_coordinates = point_data['longitude'].present? && point_data['latitude'].present? - - return false unless has_lonlat || has_coordinates - - true - rescue StandardError => e - Rails.logger.debug "Point validation failed: #{e.message} for data: #{point_data.inspect}" - false - end - def ensure_lonlat_field(attributes, point_data) if attributes['lonlat'].blank? && point_data['longitude'].present? && point_data['latitude'].present? longitude = point_data['longitude'].to_f latitude = point_data['latitude'].to_f attributes['lonlat'] = "POINT(#{longitude} #{latitude})" - Rails.logger.debug "Reconstructed lonlat: #{attributes['lonlat']}" + logger.debug "Reconstructed lonlat: #{attributes['lonlat']}" end end @@ -275,7 +278,7 @@ class Users::ImportData::Points timestamp.to_s end rescue StandardError => e - Rails.logger.debug "Failed to normalize timestamp #{timestamp}: #{e.message}" + logger.debug "Failed to normalize timestamp #{timestamp}: #{e.message}" timestamp.to_s end end diff --git a/docs/import_optimization_plan.md b/docs/import_optimization_plan.md new file mode 100644 index 00000000..1cc1a7c5 --- /dev/null +++ b/docs/import_optimization_plan.md @@ -0,0 +1,36 @@ +# Import Optimisation Plan + +## Goals +- Prevent large imports from exhausting memory or hitting IO limits while reading export archives. +- Maintain correctness and ordering guarantees for all imported entities. +- Preserve observability and operability (clear errors and actionable logs). + +## Current Status +- ✅ Replaced `File.read + JSON.parse` with streaming via `Oj::Parser(:saj).load`, so `data.json` is consumed in 16KB chunks instead of loading the whole file. +- ✅ `Users::ImportData` now dispatches streamed payloads section-by-section, buffering `places` in-memory batches and spilling `visits`/`points` to NDJSON for replay after dependencies are ready. +- ✅ Points, places, and visits importers support incremental ingestion with a fixed batch size of 1,000 records and detailed progress logs. +- ✅ Added targeted specs for the SAJ handler and streaming flow; addressed IO retry messaging. +- ⚙️ Pending: archive-size guardrails, broader telemetry, and production rollout validation. + +## Remaining Pain Points +- No preflight check yet for extreme `data.json` sizes or malformed streams. +- Logging only (no metrics/dashboards) for monitoring batch throughput and failures. + +## Next Steps +1. **Rollout & Hardening** + - Add size/structure validation before streaming (fail fast with actionable error). + - Extend log coverage (import durations, batch counts) and document operator playbook. + - Capture memory/runtime snapshots during large staged imports. +2. **Post-Rollout Validation** + - Re-run the previously failing Sidekiq job (import 105) under the new pipeline. 
+ - Monitor Sidekiq memory and throughput; tune batch size if needed. + - Gather feedback and decide on export format split or further streaming tweaks. + +## Validation Strategy +- Automated: streaming parser specs, importer batch tests, service integration spec (already in place; expand as new safeguards land). +- Manual: stage large imports, inspect Sidekiq logs/metrics once added, confirm notifications, stats, and files restored. + +## Open Questions +- What thresholds should trigger preflight failures or warnings (file size, record counts)? +- Do we need structured metrics beyond logs for long-running imports? +- Should we pursue export format splitting or incremental resume once streaming rollout is stable? diff --git a/lib/json_stream_handler.rb b/lib/json_stream_handler.rb new file mode 100644 index 00000000..f9e6b372 --- /dev/null +++ b/lib/json_stream_handler.rb @@ -0,0 +1,79 @@ +# frozen_string_literal: true + +# Streaming JSON handler relays sections and streamed values back to the importer instance. + +class JsonStreamHandler < Oj::Saj + HashState = Struct.new(:hash, :root, :key) + ArrayState = Struct.new(:array, :key) + StreamState = Struct.new(:key) + + def initialize(processor) + super() + @processor = processor + @stack = [] + end + + def hash_start(key = nil, *_) + state = HashState.new({}, @stack.empty?, normalize_key(key)) + @stack << state + end + + def hash_end(key = nil, *_) + state = @stack.pop + value = state.hash + parent = @stack.last + + dispatch_to_parent(parent, value, normalize_key(key) || state.key) + end + + def array_start(key = nil, *_) + normalized_key = normalize_key(key) + parent = @stack.last + + if parent.is_a?(HashState) && parent.root && @stack.size == 1 && Users::ImportData::STREAMED_SECTIONS.include?(normalized_key) + @stack << StreamState.new(normalized_key) + else + @stack << ArrayState.new([], normalized_key) + end + end + + def array_end(key = nil, *_) + state = @stack.pop + case state + when StreamState + @processor.send(:finish_stream, state.key) + when ArrayState + value = state.array + parent = @stack.last + dispatch_to_parent(parent, value, normalize_key(key) || state.key) + end + end + + def add_value(value, key) + parent = @stack.last + dispatch_to_parent(parent, value, normalize_key(key)) + end + + private + + def normalize_key(key) + key&.to_s + end + + def dispatch_to_parent(parent, value, key) + return unless parent + + case parent + when HashState + if parent.root && @stack.size == 1 + @processor.send(:handle_section, key, value) + else + parent.hash[key] = value + end + when ArrayState + parent.array << value + when StreamState + @processor.send(:handle_stream_value, parent.key, value) + end + end +end diff --git a/spec/services/users/import_data/json_stream_handler_spec.rb b/spec/services/users/import_data/json_stream_handler_spec.rb new file mode 100644 index 00000000..3354d8a5 --- /dev/null +++ b/spec/services/users/import_data/json_stream_handler_spec.rb @@ -0,0 +1,56 @@ +# frozen_string_literal: true + +require 'rails_helper' +require 'oj' + +RSpec.describe JsonStreamHandler do + let(:processor) { double('StreamProcessor') } + let(:handler) { described_class.new(processor) } + + before do + allow(processor).to receive(:handle_section) + allow(processor).to receive(:handle_stream_value) + allow(processor).to receive(:finish_stream) + end + + it 'streams configured sections and delegates other values immediately' do + payload = { + 'counts' => { 'places' => 2, 'visits' => 1, 'points' => 1 }, + 'settings' => { 'theme' => 'dark' 
}, + 'areas' => [{ 'name' => 'Home' }], + 'places' => [ + { 'name' => 'Cafe', 'latitude' => 1.0, 'longitude' => 2.0 }, + { 'name' => 'Library', 'latitude' => 3.0, 'longitude' => 4.0 } + ], + 'visits' => [ + { + 'name' => 'Morning Coffee', + 'started_at' => '2025-01-01T09:00:00Z', + 'ended_at' => '2025-01-01T10:00:00Z' + } + ], + 'points' => [ + { 'timestamp' => 1, 'lonlat' => 'POINT(2 1)' } + ] + } + + Oj.saj_parse(handler, Oj.dump(payload, mode: :compat)) + + expect(processor).to have_received(:handle_section).with('counts', hash_including('places' => 2)) + expect(processor).to have_received(:handle_section).with('settings', hash_including('theme' => 'dark')) + expect(processor).to have_received(:handle_section).with('areas', [hash_including('name' => 'Home')]) + + expect(processor).to have_received(:handle_stream_value).with('places', hash_including('name' => 'Cafe')) + expect(processor).to have_received(:handle_stream_value).with('places', hash_including('name' => 'Library')) + expect(processor).to have_received(:handle_stream_value).with('visits', hash_including('name' => 'Morning Coffee')) + expect(processor).to have_received(:handle_stream_value).with('points', hash_including('timestamp' => 1)) + + expect(processor).to have_received(:finish_stream).with('places') + expect(processor).to have_received(:finish_stream).with('visits') + expect(processor).to have_received(:finish_stream).with('points') + + expect(processor).not_to have_received(:handle_section).with('places', anything) + expect(processor).not_to have_received(:handle_section).with('visits', anything) + expect(processor).not_to have_received(:handle_section).with('points', anything) + end +end diff --git a/spec/services/users/import_data_spec.rb b/spec/services/users/import_data_spec.rb index 1fcf9cfd..2a660724 100644 --- a/spec/services/users/import_data_spec.rb +++ b/spec/services/users/import_data_spec.rb @@ -1,6 +1,8 @@ # frozen_string_literal: true require 'rails_helper' +require 'tmpdir' +require 'oj' RSpec.describe Users::ImportData, type: :service do let(:user) { create(:user) } @@ -12,122 +14,59 @@ RSpec.describe Users::ImportData, type: :service do allow(Time).to receive(:current).and_return(Time.zone.at(1234567890)) allow(FileUtils).to receive(:mkdir_p) allow(FileUtils).to receive(:rm_rf) - allow(File).to receive(:directory?).and_return(true) + allow_any_instance_of(Pathname).to receive(:exist?).and_return(true) end describe '#import' do - let(:sample_data) do - { - 'counts' => { - 'areas' => 2, - 'places' => 3, - 'imports' => 1, - 'exports' => 1, - 'trips' => 2, - 'stats' => 1, - 'notifications' => 2, - 'visits' => 4, - 'points' => 1000 - }, - 'settings' => { 'theme' => 'dark' }, - 'areas' => [{ 'name' => 'Home', 'latitude' => '40.7128', 'longitude' => '-74.0060' }], - 'places' => [{ 'name' => 'Office', 'latitude' => '40.7589', 'longitude' => '-73.9851' }], - 'imports' => [{ 'name' => 'test.json', 'source' => 'owntracks' }], - 'exports' => [{ 'name' => 'export.json', 'status' => 'completed' }], - 'trips' => [{ 'name' => 'Trip to NYC', 'distance' => 100.5 }], - 'stats' => [{ 'year' => 2024, 'month' => 1, 'distance' => 456.78 }], - 'notifications' => [{ 'title' => 'Test', 'content' => 'Test notification' }], - 'visits' => [{ 'name' => 'Work Visit', 'duration' => 3600 }], - 'points' => [{ 'latitude' => 40.7128, 'longitude' => -74.0060, 'timestamp' => 1234567890 }] - } - end + let(:notification_double) { instance_double(::Notifications::Create, call: true) } before do - # Mock ZIP file extraction - zipfile_mock = 
double('ZipFile') - allow(zipfile_mock).to receive(:each) - allow(Zip::File).to receive(:open).with(archive_path).and_yield(zipfile_mock) - - # Mock JSON loading and File operations - allow(File).to receive(:exist?).and_return(false) - allow(File).to receive(:exist?).with(import_directory.join('data.json')).and_return(true) - allow(File).to receive(:read).with(import_directory.join('data.json')).and_return(sample_data.to_json) - - # Mock all import services - allow(Users::ImportData::Settings).to receive(:new).and_return(double(call: true)) - allow(Users::ImportData::Areas).to receive(:new).and_return(double(call: 2)) - allow(Users::ImportData::Places).to receive(:new).and_return(double(call: 3)) - allow(Users::ImportData::Imports).to receive(:new).and_return(double(call: [1, 5])) - allow(Users::ImportData::Exports).to receive(:new).and_return(double(call: [1, 2])) - allow(Users::ImportData::Trips).to receive(:new).and_return(double(call: 2)) - allow(Users::ImportData::Stats).to receive(:new).and_return(double(call: 1)) - allow(Users::ImportData::Notifications).to receive(:new).and_return(double(call: 2)) - allow(Users::ImportData::Visits).to receive(:new).and_return(double(call: 4)) - allow(Users::ImportData::Points).to receive(:new).and_return(double(call: 1000)) - - # Mock notifications - allow(::Notifications::Create).to receive(:new).and_return(double(call: true)) - - # Mock cleanup + allow(::Notifications::Create).to receive(:new).and_return(notification_double) allow(service).to receive(:cleanup_temporary_files) - allow_any_instance_of(Pathname).to receive(:exist?).and_return(true) end - context 'when import is successful' do - it 'creates import directory' do + context 'when import succeeds' do + before do + allow(service).to receive(:extract_archive) + allow(service).to receive(:process_archive_data) do + stats = service.instance_variable_get(:@import_stats) + stats[:settings_updated] = true + stats[:areas_created] = 2 + stats[:places_created] = 3 + stats[:imports_created] = 1 + stats[:exports_created] = 1 + stats[:trips_created] = 2 + stats[:stats_created] = 1 + stats[:notifications_created] = 2 + stats[:visits_created] = 4 + stats[:points_created] = 1000 + stats[:files_restored] = 7 + end + end + + it 'creates the import directory' do expect(FileUtils).to receive(:mkdir_p).with(import_directory) - service.import end - it 'extracts the archive' do - expect(Zip::File).to receive(:open).with(archive_path) - + it 'extracts the archive and processes data' do + expect(service).to receive(:extract_archive).ordered + expect(service).to receive(:process_archive_data).ordered service.import end - it 'loads JSON data from extracted files' do - expect(File).to receive(:exist?).with(import_directory.join('data.json')) - expect(File).to receive(:read).with(import_directory.join('data.json')) - - service.import - end - - it 'calls all import services in correct order' do - expect(Users::ImportData::Settings).to receive(:new).with(user, sample_data['settings']).ordered - expect(Users::ImportData::Areas).to receive(:new).with(user, sample_data['areas']).ordered - expect(Users::ImportData::Places).to receive(:new).with(user, sample_data['places']).ordered - expect(Users::ImportData::Imports).to receive(:new).with(user, sample_data['imports'], import_directory.join('files')).ordered - expect(Users::ImportData::Exports).to receive(:new).with(user, sample_data['exports'], import_directory.join('files')).ordered - expect(Users::ImportData::Trips).to receive(:new).with(user, 
sample_data['trips']).ordered - expect(Users::ImportData::Stats).to receive(:new).with(user, sample_data['stats']).ordered - expect(Users::ImportData::Notifications).to receive(:new).with(user, sample_data['notifications']).ordered - expect(Users::ImportData::Visits).to receive(:new).with(user, sample_data['visits']).ordered - expect(Users::ImportData::Points).to receive(:new).with(user, sample_data['points']).ordered - - service.import - end - - it 'creates success notification with import stats' do + it 'creates a success notification with summary' do expect(::Notifications::Create).to receive(:new).with( user: user, title: 'Data import completed', - content: match(/1000 points.*4 visits.*3 places.*2 trips/), + content: include('1000 points, 4 visits, 3 places, 2 trips'), kind: :info ) - - service.import - end - - it 'cleans up temporary files' do - expect(service).to receive(:cleanup_temporary_files).with(import_directory) - service.import end it 'returns import statistics' do result = service.import - expect(result).to include( settings_updated: true, areas_created: 2, @@ -142,53 +81,18 @@ RSpec.describe Users::ImportData, type: :service do files_restored: 7 ) end - - it 'logs expected counts if available' do - allow(Rails.logger).to receive(:info) # Allow other log messages - expect(Rails.logger).to receive(:info).with(/Expected entity counts from export:/) - - service.import - end end - context 'when JSON file is missing' do + context 'when an error happens during processing' do + let(:error_message) { 'boom' } + before do - allow(File).to receive(:exist?).and_return(false) - allow(File).to receive(:exist?).with(import_directory.join('data.json')).and_return(false) + allow(service).to receive(:extract_archive) + allow(service).to receive(:process_archive_data).and_raise(StandardError, error_message) allow(ExceptionReporter).to receive(:call) end - it 'raises an error' do - expect { service.import }.to raise_error(StandardError, 'Data file not found in archive: data.json') - end - end - - context 'when JSON is invalid' do - before do - allow(File).to receive(:exist?).and_return(false) - allow(File).to receive(:exist?).with(import_directory.join('data.json')).and_return(true) - allow(File).to receive(:read).with(import_directory.join('data.json')).and_return('invalid json') - allow(ExceptionReporter).to receive(:call) - end - - it 'raises a JSON parse error' do - expect { service.import }.to raise_error(StandardError, /Invalid JSON format in data file/) - end - end - - context 'when an error occurs during import' do - let(:error_message) { 'Something went wrong' } - - before do - allow(File).to receive(:exist?).and_return(false) - allow(File).to receive(:exist?).with(import_directory.join('data.json')).and_return(true) - allow(File).to receive(:read).with(import_directory.join('data.json')).and_return(sample_data.to_json) - allow(Users::ImportData::Settings).to receive(:new).and_raise(StandardError, error_message) - allow(ExceptionReporter).to receive(:call) - allow(::Notifications::Create).to receive(:new).and_return(double(call: true)) - end - - it 'creates failure notification' do + it 'creates a failure notification and re-raises the error' do expect(::Notifications::Create).to receive(:new).with( user: user, title: 'Data import failed', @@ -198,100 +102,99 @@ RSpec.describe Users::ImportData, type: :service do expect { service.import }.to raise_error(StandardError, error_message) end - - it 'reports error via ExceptionReporter' do - expect(ExceptionReporter).to 
receive(:call).with( - an_instance_of(StandardError), - 'Data import failed' - ) - - expect { service.import }.to raise_error(StandardError, error_message) - end - - it 'still cleans up temporary files' do - expect(service).to receive(:cleanup_temporary_files) - - expect { service.import }.to raise_error(StandardError, error_message) - end - - it 're-raises the error' do - expect { service.import }.to raise_error(StandardError, error_message) - end - end - - context 'when data sections are missing' do - let(:minimal_data) { { 'settings' => { 'theme' => 'dark' } } } - - before do - # Reset JSON file mocking - allow(File).to receive(:exist?).and_return(false) - allow(File).to receive(:exist?).with(import_directory.join('data.json')).and_return(true) - allow(File).to receive(:read).with(import_directory.join('data.json')).and_return(minimal_data.to_json) - - # Only expect Settings to be called - allow(Users::ImportData::Settings).to receive(:new).and_return(double(call: true)) - allow(::Notifications::Create).to receive(:new).and_return(double(call: true)) - end - - it 'only imports available sections' do - expect(Users::ImportData::Settings).to receive(:new).with(user, minimal_data['settings']) - expect(Users::ImportData::Areas).not_to receive(:new) - expect(Users::ImportData::Places).not_to receive(:new) - - service.import - end end end - describe 'private methods' do - describe '#cleanup_temporary_files' do - context 'when directory exists' do - before do - allow(File).to receive(:directory?).and_return(true) - allow(Rails.logger).to receive(:info) - end + describe '#process_archive_data' do + let(:tmp_dir) { Pathname.new(Dir.mktmpdir) } + let(:json_path) { tmp_dir.join('data.json') } + let(:places_calls) { [] } + let(:visits_batches) { [] } + let(:points_ingested) { [] } + let(:points_importer) do + instance_double(Users::ImportData::Points, add: nil, finalize: 2) + end - it 'removes the directory' do - expect(FileUtils).to receive(:rm_rf).with(import_directory) + before do + payload = { + 'counts' => { 'places' => 2, 'visits' => 2, 'points' => 2 }, + 'settings' => { 'theme' => 'dark' }, + 'areas' => [], + 'imports' => [], + 'exports' => [], + 'trips' => [], + 'stats' => [], + 'notifications' => [], + 'places' => [ + { 'name' => 'Cafe', 'latitude' => 1.0, 'longitude' => 2.0 }, + { 'name' => 'Library', 'latitude' => 3.0, 'longitude' => 4.0 } + ], + 'visits' => [ + { + 'name' => 'Morning Coffee', + 'started_at' => '2025-01-01T09:00:00Z', + 'ended_at' => '2025-01-01T10:00:00Z' + }, + { + 'name' => 'Study Time', + 'started_at' => '2025-01-01T12:00:00Z', + 'ended_at' => '2025-01-01T14:00:00Z' + } + ], + 'points' => [ + { 'timestamp' => 1, 'lonlat' => 'POINT(2 1)' }, + { 'timestamp' => 2, 'lonlat' => 'POINT(4 3)' } + ] + } - service.send(:cleanup_temporary_files, import_directory) - end + File.write(json_path, Oj.dump(payload, mode: :compat)) - it 'logs the cleanup' do - expect(Rails.logger).to receive(:info).with("Cleaning up temporary import directory: #{import_directory}") + service.instance_variable_set(:@import_directory, tmp_dir) - service.send(:cleanup_temporary_files, import_directory) - end + allow(Users::ImportData::Settings).to receive(:new).and_return(double(call: true)) + allow(Users::ImportData::Areas).to receive(:new).and_return(double(call: 0)) + allow(Users::ImportData::Imports).to receive(:new).and_return(double(call: [0, 0])) + allow(Users::ImportData::Exports).to receive(:new).and_return(double(call: [0, 0])) + allow(Users::ImportData::Trips).to 
receive(:new).and_return(double(call: 0)) + allow(Users::ImportData::Stats).to receive(:new).and_return(double(call: 0)) + allow(Users::ImportData::Notifications).to receive(:new).and_return(double(call: 0)) + + allow(Users::ImportData::Places).to receive(:new) do |_, batch| + places_calls << batch + double(call: batch.size) end - context 'when cleanup fails' do - before do - allow(File).to receive(:directory?).and_return(true) - allow(FileUtils).to receive(:rm_rf).and_raise(StandardError, 'Permission denied') - allow(ExceptionReporter).to receive(:call) - end - - it 'reports error via ExceptionReporter but does not re-raise' do - expect(ExceptionReporter).to receive(:call).with( - an_instance_of(StandardError), - 'Failed to cleanup temporary files' - ) - - expect { service.send(:cleanup_temporary_files, import_directory) }.not_to raise_error - end + allow(Users::ImportData::Visits).to receive(:new) do |_, batch| + visits_batches << batch + double(call: batch.size) end - context 'when directory does not exist' do - before do - allow(File).to receive(:directory?).and_return(false) - end - - it 'does not attempt cleanup' do - expect(FileUtils).not_to receive(:rm_rf) - - service.send(:cleanup_temporary_files, import_directory) - end + allow(points_importer).to receive(:add) do |point| + points_ingested << point end + + allow(Users::ImportData::Points).to receive(:new) do |_, points_data, batch_size:| + expect(points_data).to be_nil + expect(batch_size).to eq(described_class::STREAM_BATCH_SIZE) + points_importer + end + end + + after do + FileUtils.remove_entry(tmp_dir) + end + + it 'streams sections and updates import stats' do + service.send(:process_archive_data) + + expect(places_calls.flatten.map { |place| place['name'] }).to contain_exactly('Cafe', 'Library') + expect(visits_batches.flatten.map { |visit| visit['name'] }).to contain_exactly('Morning Coffee', 'Study Time') + expect(points_ingested.map { |point| point['timestamp'] }).to eq([1, 2]) + + stats = service.instance_variable_get(:@import_stats) + expect(stats[:places_created]).to eq(2) + expect(stats[:visits_created]).to eq(2) + expect(stats[:points_created]).to eq(2) end end end
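The optimisation plan above leaves the archive-size preflight check as a pending next step. A minimal sketch of what that guard might look like, run after extraction and before streaming starts; the `MAX_DATA_JSON_BYTES` limit, the `validate_archive_preflight!` name, and the error wording are illustrative assumptions, not part of this changeset:

```ruby
# Hypothetical preflight guard for Users::ImportData; names and threshold are assumptions.
MAX_DATA_JSON_BYTES = 2 * 1024**3 # 2 GB upper bound, tune per deployment

def validate_archive_preflight!
  json_path = @import_directory.join('data.json')
  # Reuse the existing missing-file error so callers see the same failure mode.
  raise StandardError, 'Data file not found in archive: data.json' unless File.exist?(json_path)

  size = File.size(json_path)
  return Rails.logger.info("Preflight OK: data.json is #{size} bytes") if size <= MAX_DATA_JSON_BYTES

  raise StandardError,
        "data.json is #{size} bytes, above the #{MAX_DATA_JSON_BYTES} byte limit; split the export before importing"
end
```

Calling such a check at the top of `process_archive_data` would keep the fail-fast behaviour aligned with the existing missing-file error while giving operators an actionable message.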