From 15066334419142161f99fb82a7d827614d49edfc Mon Sep 17 00:00:00 2001
From: Eugene Burmakin
Date: Fri, 26 Sep 2025 19:45:18 +0200
Subject: [PATCH] Revert import changes

---
 app/services/users/import_data.rb             | 190 ++++--------------
 app/services/users/import_data/areas.rb       |   7 +-
 app/services/users/import_data/exports.rb     |  10 +-
 app/services/users/import_data/imports.rb     |  10 +-
 .../users/import_data/notifications.rb        |   7 +-
 app/services/users/import_data/points.rb      |  35 ++--
 app/services/users/import_data/stats.rb       |   7 +-
 app/services/users/import_data/trips.rb       |   7 +-
 8 files changed, 80 insertions(+), 193 deletions(-)

diff --git a/app/services/users/import_data.rb b/app/services/users/import_data.rb
index 95dd555d..664c27cc 100644
--- a/app/services/users/import_data.rb
+++ b/app/services/users/import_data.rb
@@ -41,164 +41,63 @@ class Users::ImportData
   end

   def import
-    data = stream_and_parse_archive
+    @import_directory = Rails.root.join('tmp', "import_#{user.email.gsub(/[^0-9A-Za-z._-]/, '_')}_#{Time.current.to_i}")
+    FileUtils.mkdir_p(@import_directory)

-    import_in_segments(data)
+    ActiveRecord::Base.transaction do
+      extract_archive
+      data = load_json_data

-    create_success_notification
+      import_in_correct_order(data)

-    @import_stats
+      create_success_notification
+
+      @import_stats
+    end
   rescue StandardError => e
     ExceptionReporter.call(e, 'Data import failed')

     create_failure_notification(e)
     raise e
   ensure
-    # Clean up any temporary files created during streaming
-    cleanup_temporary_files
+    cleanup_temporary_files(@import_directory) if @import_directory&.exist?
   end

   private

   attr_reader :user, :archive_path, :import_stats

-  def stream_and_parse_archive
-    Rails.logger.info "Streaming archive: #{archive_path}"
-
-    @temp_files = {}
-    @memory_tracker = Users::ImportData::MemoryTracker.new
-    data_json = nil
-
-    @memory_tracker.log('before_zip_processing')
+  def extract_archive
+    Rails.logger.info "Extracting archive: #{archive_path}"

     Zip::File.open(archive_path) do |zip_file|
       zip_file.each do |entry|
-        if entry.name == 'data.json'
-          Rails.logger.info "Processing data.json (#{entry.size} bytes)"
+        extraction_path = @import_directory.join(entry.name)

-          # Use streaming JSON parser for all files to reduce memory usage
-          streamer = Users::ImportData::JsonStreamer.new(entry)
-          data_json = streamer.stream_parse
+        FileUtils.mkdir_p(File.dirname(extraction_path))

-          @memory_tracker.log('after_json_parsing')
-        elsif entry.name.start_with?('files/')
-          # Only extract files that are needed for file attachments
-          temp_path = stream_file_to_temp(entry)
-          @temp_files[entry.name] = temp_path
-        end
-        # Skip extracting other files to save disk space
+        entry.extract(extraction_path)
       end
     end
-
-    raise StandardError, 'Data file not found in archive: data.json' unless data_json
-
-    @memory_tracker.log('archive_processing_completed')
-    data_json
   end

-  def stream_file_to_temp(zip_entry)
-    require 'tmpdir'
+  def load_json_data
+    json_path = @import_directory.join('data.json')

-    # Create a temporary file for this attachment
-    temp_file = Tempfile.new([File.basename(zip_entry.name, '.*'), File.extname(zip_entry.name)])
-    temp_file.binmode
-
-    zip_entry.get_input_stream do |input_stream|
-      IO.copy_stream(input_stream, temp_file)
+    unless File.exist?(json_path)
+      raise StandardError, "Data file not found in archive: data.json"
     end

-    temp_file.close
-    temp_file.path
-  end
-
-  def import_in_segments(data)
-    Rails.logger.info "Starting segmented data import for user: #{user.email}"
-
-    @memory_tracker&.log('before_core_segment')
-    # Segment 1: User settings and core data (small, fast transaction)
-    import_core_data_segment(data)
-
-    @memory_tracker&.log('before_independent_segment')
-    # Segment 2: Independent entities that can be imported in parallel
-    import_independent_entities_segment(data)
-
-    @memory_tracker&.log('before_dependent_segment')
-    # Segment 3: Dependent entities that require references
-    import_dependent_entities_segment(data)
-
-    # Final validation check
-    validate_import_completeness(data['counts']) if data['counts']
-
-    @memory_tracker&.log('import_completed')
-    Rails.logger.info "Segmented data import completed. Stats: #{@import_stats}"
-  end
-
-  def import_core_data_segment(data)
-    ActiveRecord::Base.transaction do
-      Rails.logger.info 'Importing core data segment'
-
-      import_settings(data['settings']) if data['settings']
-      import_areas(data['areas']) if data['areas']
-      import_places(data['places']) if data['places']
-
-      Rails.logger.info 'Core data segment completed'
-    end
-  end
-
-  def import_independent_entities_segment(data)
-    # These entities don't depend on each other and can be imported in parallel
-    entity_types = %w[imports exports trips stats notifications].select { |type| data[type] }
-
-    if entity_types.empty?
-      Rails.logger.info 'No independent entities to import'
-      return
-    end
-
-    Rails.logger.info "Processing #{entity_types.size} independent entity types in parallel"
-
-    # Use parallel processing for independent entities
-    Parallel.each(entity_types, in_threads: [entity_types.size, 3].min) do |entity_type|
-      ActiveRecord::Base.connection_pool.with_connection do
-        ActiveRecord::Base.transaction do
-          case entity_type
-          when 'imports'
-            import_imports(data['imports'])
-          when 'exports'
-            import_exports(data['exports'])
-          when 'trips'
-            import_trips(data['trips'])
-          when 'stats'
-            import_stats(data['stats'])
-          when 'notifications'
-            import_notifications(data['notifications'])
-          end
-
-          Rails.logger.info "#{entity_type.capitalize} segment completed in parallel"
-        end
-      end
-    end
-
-    Rails.logger.info 'All independent entities processing completed'
-  end
-
-  def import_dependent_entities_segment(data)
-    ActiveRecord::Base.transaction do
-      Rails.logger.info 'Importing dependent entities segment'
-
-      # Import visits after places to ensure proper place resolution
-      visits_imported = import_visits(data['visits']) if data['visits']
-      Rails.logger.info "Visits import phase completed: #{visits_imported} visits imported"
-
-      # Points are imported in their own optimized batching system
-      import_points(data['points']) if data['points']
-
-      Rails.logger.info 'Dependent entities segment completed'
-    end
+    JSON.parse(File.read(json_path))
+  rescue JSON::ParserError => e
+    raise StandardError, "Invalid JSON format in data file: #{e.message}"
   end

   def import_in_correct_order(data)
     Rails.logger.info "Starting data import for user: #{user.email}"
-    Rails.logger.info "Expected entity counts from export: #{data['counts']}" if data['counts']
+    if data['counts']
+      Rails.logger.info "Expected entity counts from export: #{data['counts']}"
+    end

     Rails.logger.debug "Available data keys: #{data.keys.inspect}"

@@ -222,7 +121,9 @@ class Users::ImportData
     import_points(data['points']) if data['points']

     # Final validation check
-    validate_import_completeness(data['counts']) if data['counts']
+    if data['counts']
+      validate_import_completeness(data['counts'])
+    end

     Rails.logger.info "Data import completed. Stats: #{@import_stats}"
   end
@@ -248,14 +149,14 @@ class Users::ImportData

   def import_imports(imports_data)
     Rails.logger.debug "Importing #{imports_data&.size || 0} imports"
-    imports_created, files_restored = Users::ImportData::Imports.new(user, imports_data, @temp_files).call
+    imports_created, files_restored = Users::ImportData::Imports.new(user, imports_data, @import_directory.join('files')).call
     @import_stats[:imports_created] = imports_created
     @import_stats[:files_restored] += files_restored
   end

   def import_exports(exports_data)
     Rails.logger.debug "Importing #{exports_data&.size || 0} exports"
-    exports_created, files_restored = Users::ImportData::Exports.new(user, exports_data, @temp_files).call
+    exports_created, files_restored = Users::ImportData::Exports.new(user, exports_data, @import_directory.join('files')).call
     @import_stats[:exports_created] = exports_created
     @import_stats[:files_restored] += files_restored
   end
@@ -298,18 +199,11 @@ class Users::ImportData
     end
   end

-  def cleanup_temporary_files
-    return unless @temp_files
+  def cleanup_temporary_files(import_directory)
+    return unless File.directory?(import_directory)

-    Rails.logger.info "Cleaning up #{@temp_files.size} temporary files"
-
-    @temp_files.each_value do |temp_path|
-      File.delete(temp_path) if File.exist?(temp_path)
-    rescue StandardError => e
-      Rails.logger.warn "Failed to delete temporary file #{temp_path}: #{e.message}"
-    end
-
-    @temp_files.clear
+    Rails.logger.info "Cleaning up temporary import directory: #{import_directory}"
+    FileUtils.rm_rf(import_directory)
   rescue StandardError => e
     ExceptionReporter.call(e, 'Failed to cleanup temporary files')
   end
@@ -344,24 +238,24 @@ class Users::ImportData
   end

   def validate_import_completeness(expected_counts)
-    Rails.logger.info 'Validating import completeness...'
+    Rails.logger.info "Validating import completeness..."

     discrepancies = []

     expected_counts.each do |entity, expected_count|
       actual_count = @import_stats[:"#{entity}_created"] || 0

-      next unless actual_count < expected_count
-
-      discrepancy = "#{entity}: expected #{expected_count}, got #{actual_count} (#{expected_count - actual_count} missing)"
-      discrepancies << discrepancy
-      Rails.logger.warn "Import discrepancy - #{discrepancy}"
+      if actual_count < expected_count
+        discrepancy = "#{entity}: expected #{expected_count}, got #{actual_count} (#{expected_count - actual_count} missing)"
+        discrepancies << discrepancy
+        Rails.logger.warn "Import discrepancy - #{discrepancy}"
+      end
     end

     if discrepancies.any?
       Rails.logger.warn "Import completed with discrepancies: #{discrepancies.join(', ')}"
     else
-      Rails.logger.info 'Import validation successful - all entities imported correctly'
+      Rails.logger.info "Import validation successful - all entities imported correctly"
     end
   end
 end
diff --git a/app/services/users/import_data/areas.rb b/app/services/users/import_data/areas.rb
index 197d3d05..d14fda64 100644
--- a/app/services/users/import_data/areas.rb
+++ b/app/services/users/import_data/areas.rb
@@ -1,6 +1,7 @@
 # frozen_string_literal: true

 class Users::ImportData::Areas
+  BATCH_SIZE = 1000

   def initialize(user, areas_data)
     @user = user
@@ -35,10 +36,6 @@ class Users::ImportData::Areas

   attr_reader :user, :areas_data

-  def batch_size
-    @batch_size ||= DawarichSettings.import_batch_size
-  end
-
   def filter_and_prepare_areas
     valid_areas = []
     skipped_count = 0
@@ -103,7 +100,7 @@ class Users::ImportData::Areas
   def bulk_import_areas(areas)
     total_created = 0

-    areas.each_slice(batch_size) do |batch|
+    areas.each_slice(BATCH_SIZE) do |batch|
       begin
         result = Area.upsert_all(
           batch,
diff --git a/app/services/users/import_data/exports.rb b/app/services/users/import_data/exports.rb
index 13fd81d8..8f8077ff 100644
--- a/app/services/users/import_data/exports.rb
+++ b/app/services/users/import_data/exports.rb
@@ -62,18 +62,16 @@ class Users::ImportData::Exports
   end

   def restore_export_file(export_record, export_data)
-    # files_directory is actually a hash mapping archive paths to temp file paths
-    archive_file_path = "files/#{export_data['file_name']}"
-    temp_file_path = files_directory[archive_file_path]
+    file_path = files_directory.join(export_data['file_name'])

-    unless temp_file_path && File.exist?(temp_file_path)
-      Rails.logger.warn "Export file not found: #{export_data['file_name']} (archive path: #{archive_file_path})"
+    unless File.exist?(file_path)
+      Rails.logger.warn "Export file not found: #{export_data['file_name']}"
       return false
     end

     begin
       export_record.file.attach(
-        io: File.open(temp_file_path),
+        io: File.open(file_path),
         filename: export_data['original_filename'] || export_data['file_name'],
         content_type: export_data['content_type'] || 'application/octet-stream'
       )
diff --git a/app/services/users/import_data/imports.rb b/app/services/users/import_data/imports.rb
index 8bbb48a3..c84f7853 100644
--- a/app/services/users/import_data/imports.rb
+++ b/app/services/users/import_data/imports.rb
@@ -74,18 +74,16 @@ class Users::ImportData::Imports
   end

   def restore_import_file(import_record, import_data)
-    # files_directory is actually a hash mapping archive paths to temp file paths
-    archive_file_path = "files/#{import_data['file_name']}"
-    temp_file_path = files_directory[archive_file_path]
+    file_path = files_directory.join(import_data['file_name'])

-    unless temp_file_path && File.exist?(temp_file_path)
-      Rails.logger.warn "Import file not found: #{import_data['file_name']} (archive path: #{archive_file_path})"
+    unless File.exist?(file_path)
+      Rails.logger.warn "Import file not found: #{import_data['file_name']}"
       return false
     end

     begin
       import_record.file.attach(
-        io: File.open(temp_file_path),
+        io: File.open(file_path),
         filename: import_data['original_filename'] || import_data['file_name'],
         content_type: import_data['content_type'] || 'application/octet-stream'
       )
diff --git a/app/services/users/import_data/notifications.rb b/app/services/users/import_data/notifications.rb
index ec877ec7..e485d0aa 100644
--- a/app/services/users/import_data/notifications.rb
+++ b/app/services/users/import_data/notifications.rb
@@ -1,6 +1,7 @@
 # frozen_string_literal: true

 class Users::ImportData::Notifications
+  BATCH_SIZE = 1000

   def initialize(user, notifications_data)
     @user = user
@@ -35,10 +36,6 @@ class Users::ImportData::Notifications

   attr_reader :user, :notifications_data

-  def batch_size
-    @batch_size ||= DawarichSettings.import_batch_size
-  end
-
   def filter_and_prepare_notifications
     valid_notifications = []
     skipped_count = 0
@@ -126,7 +123,7 @@ class Users::ImportData::Notifications
   def bulk_import_notifications(notifications)
     total_created = 0

-    notifications.each_slice(batch_size) do |batch|
+    notifications.each_slice(BATCH_SIZE) do |batch|
       begin
         result = Notification.upsert_all(
           batch,
diff --git a/app/services/users/import_data/points.rb b/app/services/users/import_data/points.rb
index 5334fafa..c0c6139d 100644
--- a/app/services/users/import_data/points.rb
+++ b/app/services/users/import_data/points.rb
@@ -1,6 +1,7 @@
 # frozen_string_literal: true

 class Users::ImportData::Points
+  BATCH_SIZE = 1000

   def initialize(user, points_data)
     @user = user
@@ -37,10 +38,6 @@ class Users::ImportData::Points

   attr_reader :user, :points_data, :imports_lookup, :countries_lookup, :visits_lookup

-  def batch_size
-    @batch_size ||= DawarichSettings.import_batch_size
-  end
-
   def preload_reference_data
     @imports_lookup = {}
     user.imports.each do |import|
@@ -74,12 +71,14 @@ class Users::ImportData::Points

       unless valid_point_data?(point_data)
         skipped_count += 1
+        Rails.logger.debug "Skipped point #{index}: invalid data - #{point_data.slice('timestamp', 'longitude', 'latitude', 'lonlat')}"
         next
       end

       prepared_attributes = prepare_point_attributes(point_data)
       unless prepared_attributes
         skipped_count += 1
+        Rails.logger.debug "Skipped point #{index}: failed to prepare attributes"
         next
       end

@@ -117,7 +116,10 @@ class Users::ImportData::Points
     resolve_country_reference(attributes, point_data['country_info'])
     resolve_visit_reference(attributes, point_data['visit_reference'])

-    attributes.symbolize_keys
+    result = attributes.symbolize_keys
+
+    Rails.logger.debug "Prepared point attributes: #{result.slice(:lonlat, :timestamp, :import_id, :country_id, :visit_id)}"
+    result
   rescue StandardError => e
     ExceptionReporter.call(e, 'Failed to prepare point attributes')

@@ -192,20 +194,25 @@ class Users::ImportData::Points
   end

   def normalize_point_keys(points)
-    # Return points as-is since upsert_all can handle inconsistent keys
-    # This eliminates the expensive hash reconstruction overhead
-    points
+    all_keys = points.flat_map(&:keys).uniq
+
+    # Normalize each point to have all keys (with nil for missing ones)
+    points.map do |point|
+      normalized = {}
+      all_keys.each do |key|
+        normalized[key] = point[key]
+      end
+      normalized
+    end
   end

   def bulk_import_points(points)
     total_created = 0

-    points.each_slice(batch_size) do |batch|
+    points.each_slice(BATCH_SIZE) do |batch|
       begin
-        # Only log every 10th batch to reduce noise
-        if (total_created / batch_size) % 10 == 0
-          Rails.logger.info "Processed #{total_created} points so far, current batch: #{batch.size}"
-        end
+        Rails.logger.debug "Processing batch of #{batch.size} points"
+        Rails.logger.debug "First point in batch: #{batch.first.inspect}"

         normalized_batch = normalize_point_keys(batch)

@@ -219,6 +226,8 @@ class Users::ImportData::Points
         batch_created = result.count
         total_created += batch_created

+        Rails.logger.debug "Processed batch of #{batch.size} points, created #{batch_created}, total created: #{total_created}"
+
       rescue StandardError => e
         Rails.logger.error "Failed to process point batch: #{e.message}"
         Rails.logger.error "Batch size: #{batch.size}"
diff --git a/app/services/users/import_data/stats.rb b/app/services/users/import_data/stats.rb
index e8df2250..c11ead0a 100644
--- a/app/services/users/import_data/stats.rb
+++ b/app/services/users/import_data/stats.rb
@@ -1,6 +1,7 @@
 # frozen_string_literal: true

 class Users::ImportData::Stats
+  BATCH_SIZE = 1000

   def initialize(user, stats_data)
     @user = user
@@ -35,10 +36,6 @@ class Users::ImportData::Stats

   attr_reader :user, :stats_data

-  def batch_size
-    @batch_size ||= DawarichSettings.import_batch_size
-  end
-
   def filter_and_prepare_stats
     valid_stats = []
     skipped_count = 0
@@ -102,7 +99,7 @@ class Users::ImportData::Stats
   def bulk_import_stats(stats)
     total_created = 0

-    stats.each_slice(batch_size) do |batch|
+    stats.each_slice(BATCH_SIZE) do |batch|
       begin
         result = Stat.upsert_all(
           batch,
diff --git a/app/services/users/import_data/trips.rb b/app/services/users/import_data/trips.rb
index 5fa313df..72b6a5c4 100644
--- a/app/services/users/import_data/trips.rb
+++ b/app/services/users/import_data/trips.rb
@@ -1,6 +1,7 @@
 # frozen_string_literal: true

 class Users::ImportData::Trips
+  BATCH_SIZE = 1000

   def initialize(user, trips_data)
     @user = user
@@ -35,10 +36,6 @@ class Users::ImportData::Trips

   attr_reader :user, :trips_data

-  def batch_size
-    @batch_size ||= DawarichSettings.import_batch_size
-  end
-
   def filter_and_prepare_trips
     valid_trips = []
     skipped_count = 0
@@ -114,7 +111,7 @@ class Users::ImportData::Trips
   def bulk_import_trips(trips)
     total_created = 0

-    trips.each_slice(batch_size) do |batch|
+    trips.each_slice(BATCH_SIZE) do |batch|
       begin
         result = Trip.upsert_all(
           batch,
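Note (not part of the patch): the revert restores a plain extract-then-parse flow — unzip the archive into a scratch directory, read data.json in one pass, then hand each entity list to its importer in fixed-size slices. Below is a minimal standalone sketch of that flow, assuming the rubyzip gem and an archive with data.json at its root; the method and variable names (extract_and_load, work_dir) are illustrative, not part of the service, and the 1000-row slice mirrors the BATCH_SIZE used in the patch.

require 'zip'
require 'json'
require 'fileutils'

# Extract an export archive into work_dir and return the parsed data.json hash.
def extract_and_load(archive_path, work_dir)
  FileUtils.mkdir_p(work_dir)

  # Unpack every entry, recreating nested paths (e.g. files/...) under work_dir.
  Zip::File.open(archive_path) do |zip_file|
    zip_file.each do |entry|
      target = File.join(work_dir, entry.name)
      FileUtils.mkdir_p(File.dirname(target))
      entry.extract(target)
    end
  end

  # The export is one JSON document, so a single parse is enough here.
  JSON.parse(File.read(File.join(work_dir, 'data.json')))
ensure
  # Mirror the service's ensure block: always remove the scratch directory.
  FileUtils.rm_rf(work_dir)
end

data = extract_and_load('export.zip', 'tmp/import_demo')
(data['areas'] || []).each_slice(1000) do |batch|
  puts "would upsert #{batch.size} areas"
end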