Mirror of https://github.com/Freika/dawarich.git
Revert import changes

parent ea0d03f4b0
commit 1506633441

8 changed files with 80 additions and 193 deletions
@@ -41,164 +41,63 @@ class Users::ImportData
   end
 
   def import
-    data = stream_and_parse_archive
+    @import_directory = Rails.root.join('tmp', "import_#{user.email.gsub(/[^0-9A-Za-z._-]/, '_')}_#{Time.current.to_i}")
+    FileUtils.mkdir_p(@import_directory)
 
-    import_in_segments(data)
+    ActiveRecord::Base.transaction do
+      extract_archive
+      data = load_json_data
 
-    create_success_notification
+      import_in_correct_order(data)
 
-    @import_stats
+      create_success_notification
+
+      @import_stats
+    end
   rescue StandardError => e
     ExceptionReporter.call(e, 'Data import failed')
     create_failure_notification(e)
     raise e
   ensure
-    # Clean up any temporary files created during streaming
-    cleanup_temporary_files
+    cleanup_temporary_files(@import_directory) if @import_directory&.exist?
   end
 
   private
 
   attr_reader :user, :archive_path, :import_stats
 
-  def stream_and_parse_archive
-    Rails.logger.info "Streaming archive: #{archive_path}"
-
-    @temp_files = {}
-    @memory_tracker = Users::ImportData::MemoryTracker.new
-    data_json = nil
-
-    @memory_tracker.log('before_zip_processing')
+  def extract_archive
+    Rails.logger.info "Extracting archive: #{archive_path}"
 
     Zip::File.open(archive_path) do |zip_file|
       zip_file.each do |entry|
-        if entry.name == 'data.json'
-          Rails.logger.info "Processing data.json (#{entry.size} bytes)"
+        extraction_path = @import_directory.join(entry.name)
 
-          # Use streaming JSON parser for all files to reduce memory usage
-          streamer = Users::ImportData::JsonStreamer.new(entry)
-          data_json = streamer.stream_parse
+        FileUtils.mkdir_p(File.dirname(extraction_path))
 
-          @memory_tracker.log('after_json_parsing')
-        elsif entry.name.start_with?('files/')
-          # Only extract files that are needed for file attachments
-          temp_path = stream_file_to_temp(entry)
-          @temp_files[entry.name] = temp_path
-        end
-        # Skip extracting other files to save disk space
+        entry.extract(extraction_path)
       end
     end
 
-    raise StandardError, 'Data file not found in archive: data.json' unless data_json
-
-    @memory_tracker.log('archive_processing_completed')
-    data_json
   end
 
-  def stream_file_to_temp(zip_entry)
-    require 'tmpdir'
+  def load_json_data
+    json_path = @import_directory.join('data.json')
 
-    # Create a temporary file for this attachment
-    temp_file = Tempfile.new([File.basename(zip_entry.name, '.*'), File.extname(zip_entry.name)])
-    temp_file.binmode
-
-    zip_entry.get_input_stream do |input_stream|
-      IO.copy_stream(input_stream, temp_file)
+    unless File.exist?(json_path)
+      raise StandardError, "Data file not found in archive: data.json"
     end
 
-    temp_file.close
-    temp_file.path
-  end
-
-  def import_in_segments(data)
-    Rails.logger.info "Starting segmented data import for user: #{user.email}"
-
-    @memory_tracker&.log('before_core_segment')
-    # Segment 1: User settings and core data (small, fast transaction)
-    import_core_data_segment(data)
-
-    @memory_tracker&.log('before_independent_segment')
-    # Segment 2: Independent entities that can be imported in parallel
-    import_independent_entities_segment(data)
-
-    @memory_tracker&.log('before_dependent_segment')
-    # Segment 3: Dependent entities that require references
-    import_dependent_entities_segment(data)
-
-    # Final validation check
-    validate_import_completeness(data['counts']) if data['counts']
-
-    @memory_tracker&.log('import_completed')
-    Rails.logger.info "Segmented data import completed. Stats: #{@import_stats}"
-  end
-
-  def import_core_data_segment(data)
-    ActiveRecord::Base.transaction do
-      Rails.logger.info 'Importing core data segment'
-
-      import_settings(data['settings']) if data['settings']
-      import_areas(data['areas']) if data['areas']
-      import_places(data['places']) if data['places']
-
-      Rails.logger.info 'Core data segment completed'
-    end
-  end
-
-  def import_independent_entities_segment(data)
-    # These entities don't depend on each other and can be imported in parallel
-    entity_types = %w[imports exports trips stats notifications].select { |type| data[type] }
-
-    if entity_types.empty?
-      Rails.logger.info 'No independent entities to import'
-      return
-    end
-
-    Rails.logger.info "Processing #{entity_types.size} independent entity types in parallel"
-
-    # Use parallel processing for independent entities
-    Parallel.each(entity_types, in_threads: [entity_types.size, 3].min) do |entity_type|
-      ActiveRecord::Base.connection_pool.with_connection do
-        ActiveRecord::Base.transaction do
-          case entity_type
-          when 'imports'
-            import_imports(data['imports'])
-          when 'exports'
-            import_exports(data['exports'])
-          when 'trips'
-            import_trips(data['trips'])
-          when 'stats'
-            import_stats(data['stats'])
-          when 'notifications'
-            import_notifications(data['notifications'])
-          end
-
-          Rails.logger.info "#{entity_type.capitalize} segment completed in parallel"
-        end
-      end
-    end
-
-    Rails.logger.info 'All independent entities processing completed'
-  end
-
-  def import_dependent_entities_segment(data)
-    ActiveRecord::Base.transaction do
-      Rails.logger.info 'Importing dependent entities segment'
-
-      # Import visits after places to ensure proper place resolution
-      visits_imported = import_visits(data['visits']) if data['visits']
-      Rails.logger.info "Visits import phase completed: #{visits_imported} visits imported"
-
-      # Points are imported in their own optimized batching system
-      import_points(data['points']) if data['points']
-
-      Rails.logger.info 'Dependent entities segment completed'
-    end
+    JSON.parse(File.read(json_path))
+  rescue JSON::ParserError => e
+    raise StandardError, "Invalid JSON format in data file: #{e.message}"
   end
 
   def import_in_correct_order(data)
     Rails.logger.info "Starting data import for user: #{user.email}"
 
-    Rails.logger.info "Expected entity counts from export: #{data['counts']}" if data['counts']
+    if data['counts']
+      Rails.logger.info "Expected entity counts from export: #{data['counts']}"
+    end
 
     Rails.logger.debug "Available data keys: #{data.keys.inspect}"
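Note: taken on its own, the restored flow extracts the whole archive into a scratch directory and only then parses data.json. A minimal self-contained sketch of that shape (assuming the rubyzip gem; the method and directory names here are illustrative, not the service's actual API):

    require 'zip'
    require 'json'
    require 'fileutils'
    require 'tmpdir'

    def extract_and_load(archive_path)
      import_dir = Dir.mktmpdir('import_')

      # Unpack every entry into the scratch directory, creating subdirectories as needed.
      Zip::File.open(archive_path) do |zip_file|
        zip_file.each do |entry|
          extraction_path = File.join(import_dir, entry.name)
          FileUtils.mkdir_p(File.dirname(extraction_path))
          entry.extract(extraction_path)
        end
      end

      json_path = File.join(import_dir, 'data.json')
      raise 'Data file not found in archive: data.json' unless File.exist?(json_path)

      JSON.parse(File.read(json_path))
    ensure
      # Always remove the scratch directory, mirroring the ensure/cleanup in the service.
      FileUtils.rm_rf(import_dir) if import_dir
    end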
@@ -222,7 +121,9 @@ class Users::ImportData
     import_points(data['points']) if data['points']
 
     # Final validation check
-    validate_import_completeness(data['counts']) if data['counts']
+    if data['counts']
+      validate_import_completeness(data['counts'])
+    end
 
     Rails.logger.info "Data import completed. Stats: #{@import_stats}"
   end
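For contrast, the segmented path removed in the first hunk fanned the independent entity types out across threads, each with its own database connection and transaction. A rough, runnable sketch of that pattern (assuming the parallel gem; the payload and the puts stand in for the real import_* calls and their ActiveRecord wrapping):

    require 'parallel'

    data = { 'imports' => [1, 2], 'trips' => [3] } # stand-in payload
    entity_types = %w[imports exports trips stats notifications].select { |type| data[type] }

    # At most three worker threads; in the removed code each block also checked out
    # its own ActiveRecord connection and wrapped the work in a transaction.
    Parallel.each(entity_types, in_threads: [entity_types.size, 3].min) do |entity_type|
      puts "importing #{data[entity_type].size} #{entity_type}"
    end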
@@ -248,14 +149,14 @@ class Users::ImportData
 
   def import_imports(imports_data)
     Rails.logger.debug "Importing #{imports_data&.size || 0} imports"
-    imports_created, files_restored = Users::ImportData::Imports.new(user, imports_data, @temp_files).call
+    imports_created, files_restored = Users::ImportData::Imports.new(user, imports_data, @import_directory.join('files')).call
     @import_stats[:imports_created] = imports_created
     @import_stats[:files_restored] += files_restored
   end
 
   def import_exports(exports_data)
     Rails.logger.debug "Importing #{exports_data&.size || 0} exports"
-    exports_created, files_restored = Users::ImportData::Exports.new(user, exports_data, @temp_files).call
+    exports_created, files_restored = Users::ImportData::Exports.new(user, exports_data, @import_directory.join('files')).call
     @import_stats[:exports_created] = exports_created
     @import_stats[:files_restored] += files_restored
   end
@@ -298,18 +199,11 @@ class Users::ImportData
     end
   end
 
-  def cleanup_temporary_files
-    return unless @temp_files
+  def cleanup_temporary_files(import_directory)
+    return unless File.directory?(import_directory)
 
-    Rails.logger.info "Cleaning up #{@temp_files.size} temporary files"
-
-    @temp_files.each_value do |temp_path|
-      File.delete(temp_path) if File.exist?(temp_path)
-    rescue StandardError => e
-      Rails.logger.warn "Failed to delete temporary file #{temp_path}: #{e.message}"
-    end
-
-    @temp_files.clear
+    Rails.logger.info "Cleaning up temporary import directory: #{import_directory}"
+    FileUtils.rm_rf(import_directory)
   rescue StandardError => e
     ExceptionReporter.call(e, 'Failed to cleanup temporary files')
   end
@@ -344,24 +238,24 @@ class Users::ImportData
   end
 
   def validate_import_completeness(expected_counts)
-    Rails.logger.info 'Validating import completeness...'
+    Rails.logger.info "Validating import completeness..."
 
    discrepancies = []
 
    expected_counts.each do |entity, expected_count|
      actual_count = @import_stats[:"#{entity}_created"] || 0
 
-      next unless actual_count < expected_count
-
-      discrepancy = "#{entity}: expected #{expected_count}, got #{actual_count} (#{expected_count - actual_count} missing)"
-      discrepancies << discrepancy
-      Rails.logger.warn "Import discrepancy - #{discrepancy}"
+      if actual_count < expected_count
+        discrepancy = "#{entity}: expected #{expected_count}, got #{actual_count} (#{expected_count - actual_count} missing)"
+        discrepancies << discrepancy
+        Rails.logger.warn "Import discrepancy - #{discrepancy}"
+      end
    end
 
    if discrepancies.any?
      Rails.logger.warn "Import completed with discrepancies: #{discrepancies.join(', ')}"
    else
-      Rails.logger.info 'Import validation successful - all entities imported correctly'
+      Rails.logger.info "Import validation successful - all entities imported correctly"
    end
  end
end
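The discrepancy check above compares the per-entity counters accumulated in @import_stats against the counts recorded at export time. A tiny worked example of the same logic with made-up numbers:

    expected_counts = { 'areas' => 10, 'trips' => 4 }   # stands in for data['counts']
    import_stats    = { areas_created: 10, trips_created: 3 }

    discrepancies = expected_counts.filter_map do |entity, expected|
      actual = import_stats[:"#{entity}_created"] || 0
      "#{entity}: expected #{expected}, got #{actual} (#{expected - actual} missing)" if actual < expected
    end

    p discrepancies # => ["trips: expected 4, got 3 (1 missing)"]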
@@ -1,6 +1,7 @@
 # frozen_string_literal: true
 
 class Users::ImportData::Areas
+  BATCH_SIZE = 1000
 
   def initialize(user, areas_data)
     @user = user
@@ -35,10 +36,6 @@ class Users::ImportData::Areas
 
   attr_reader :user, :areas_data
 
-  def batch_size
-    @batch_size ||= DawarichSettings.import_batch_size
-  end
-
   def filter_and_prepare_areas
     valid_areas = []
     skipped_count = 0
@@ -103,7 +100,7 @@ class Users::ImportData::Areas
   def bulk_import_areas(areas)
     total_created = 0
 
-    areas.each_slice(batch_size) do |batch|
+    areas.each_slice(BATCH_SIZE) do |batch|
       begin
         result = Area.upsert_all(
           batch,
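Across Areas, Notifications, Points, Stats and Trips the revert replaces the configurable batch_size reader with a fixed BATCH_SIZE constant, but the batching shape stays the same: slice the prepared rows and upsert each slice. A rough sketch of that shape (runnable only inside the app since it needs the Area model; the upsert_all options are omitted because this hunk does not show them):

    BATCH_SIZE = 1000

    def bulk_import(rows)
      total_created = 0

      rows.each_slice(BATCH_SIZE) do |batch|
        # Each slice becomes a single multi-row INSERT ... ON CONFLICT statement.
        result = Area.upsert_all(batch)
        total_created += result.count
      end

      total_created
    end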
@@ -62,18 +62,16 @@ class Users::ImportData::Exports
   end
 
   def restore_export_file(export_record, export_data)
-    # files_directory is actually a hash mapping archive paths to temp file paths
-    archive_file_path = "files/#{export_data['file_name']}"
-    temp_file_path = files_directory[archive_file_path]
+    file_path = files_directory.join(export_data['file_name'])
 
-    unless temp_file_path && File.exist?(temp_file_path)
-      Rails.logger.warn "Export file not found: #{export_data['file_name']} (archive path: #{archive_file_path})"
+    unless File.exist?(file_path)
+      Rails.logger.warn "Export file not found: #{export_data['file_name']}"
       return false
     end
 
     begin
       export_record.file.attach(
-        io: File.open(temp_file_path),
+        io: File.open(file_path),
         filename: export_data['original_filename'] || export_data['file_name'],
         content_type: export_data['content_type'] || 'application/octet-stream'
       )
@@ -74,18 +74,16 @@ class Users::ImportData::Imports
   end
 
   def restore_import_file(import_record, import_data)
-    # files_directory is actually a hash mapping archive paths to temp file paths
-    archive_file_path = "files/#{import_data['file_name']}"
-    temp_file_path = files_directory[archive_file_path]
+    file_path = files_directory.join(import_data['file_name'])
 
-    unless temp_file_path && File.exist?(temp_file_path)
-      Rails.logger.warn "Import file not found: #{import_data['file_name']} (archive path: #{archive_file_path})"
+    unless File.exist?(file_path)
+      Rails.logger.warn "Import file not found: #{import_data['file_name']}"
       return false
     end
 
     begin
       import_record.file.attach(
-        io: File.open(temp_file_path),
+        io: File.open(file_path),
         filename: import_data['original_filename'] || import_data['file_name'],
         content_type: import_data['content_type'] || 'application/octet-stream'
       )
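In both Exports and Imports the revert swaps the hash of streamed temp files for the extracted files/ directory, so locating an attachment becomes a path join plus an existence check. A small illustration of that lookup (directory and file name are made up):

    require 'pathname'

    files_directory = Pathname.new('/tmp/import_example/files') # assumed path
    file_path = files_directory.join('2024-05-export.json')     # assumed file name

    if File.exist?(file_path)
      # In the service this is where record.file.attach(io: File.open(file_path), ...) runs.
      puts "would attach #{file_path}"
    else
      puts "Export file not found: #{file_path.basename}"
    end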
@@ -1,6 +1,7 @@
 # frozen_string_literal: true
 
 class Users::ImportData::Notifications
+  BATCH_SIZE = 1000
 
   def initialize(user, notifications_data)
     @user = user
@@ -35,10 +36,6 @@ class Users::ImportData::Notifications
 
   attr_reader :user, :notifications_data
 
-  def batch_size
-    @batch_size ||= DawarichSettings.import_batch_size
-  end
-
   def filter_and_prepare_notifications
     valid_notifications = []
     skipped_count = 0
@@ -126,7 +123,7 @@ class Users::ImportData::Notifications
   def bulk_import_notifications(notifications)
     total_created = 0
 
-    notifications.each_slice(batch_size) do |batch|
+    notifications.each_slice(BATCH_SIZE) do |batch|
       begin
         result = Notification.upsert_all(
           batch,
@@ -1,6 +1,7 @@
 # frozen_string_literal: true
 
 class Users::ImportData::Points
+  BATCH_SIZE = 1000
 
   def initialize(user, points_data)
     @user = user
@@ -37,10 +38,6 @@ class Users::ImportData::Points
 
   attr_reader :user, :points_data, :imports_lookup, :countries_lookup, :visits_lookup
 
-  def batch_size
-    @batch_size ||= DawarichSettings.import_batch_size
-  end
-
   def preload_reference_data
     @imports_lookup = {}
     user.imports.each do |import|
@@ -74,12 +71,14 @@ class Users::ImportData::Points
 
       unless valid_point_data?(point_data)
         skipped_count += 1
+        Rails.logger.debug "Skipped point #{index}: invalid data - #{point_data.slice('timestamp', 'longitude', 'latitude', 'lonlat')}"
         next
       end
 
       prepared_attributes = prepare_point_attributes(point_data)
       unless prepared_attributes
         skipped_count += 1
+        Rails.logger.debug "Skipped point #{index}: failed to prepare attributes"
         next
       end
 
@@ -117,7 +116,10 @@ class Users::ImportData::Points
     resolve_country_reference(attributes, point_data['country_info'])
     resolve_visit_reference(attributes, point_data['visit_reference'])
 
-    attributes.symbolize_keys
+    result = attributes.symbolize_keys
+
+    Rails.logger.debug "Prepared point attributes: #{result.slice(:lonlat, :timestamp, :import_id, :country_id, :visit_id)}"
+    result
   rescue StandardError => e
     ExceptionReporter.call(e, 'Failed to prepare point attributes')
@@ -192,20 +194,25 @@ class Users::ImportData::Points
   end
 
   def normalize_point_keys(points)
-    # Return points as-is since upsert_all can handle inconsistent keys
-    # This eliminates the expensive hash reconstruction overhead
-    points
+    all_keys = points.flat_map(&:keys).uniq
+
+    # Normalize each point to have all keys (with nil for missing ones)
+    points.map do |point|
+      normalized = {}
+      all_keys.each do |key|
+        normalized[key] = point[key]
+      end
+      normalized
+    end
   end
 
   def bulk_import_points(points)
     total_created = 0
 
-    points.each_slice(batch_size) do |batch|
+    points.each_slice(BATCH_SIZE) do |batch|
       begin
-        # Only log every 10th batch to reduce noise
-        if (total_created / batch_size) % 10 == 0
-          Rails.logger.info "Processed #{total_created} points so far, current batch: #{batch.size}"
-        end
+        Rails.logger.debug "Processing batch of #{batch.size} points"
+        Rails.logger.debug "First point in batch: #{batch.first.inspect}"
 
         normalized_batch = normalize_point_keys(batch)
 
@@ -219,6 +226,8 @@ class Users::ImportData::Points
         batch_created = result.count
         total_created += batch_created
 
+        Rails.logger.debug "Processed batch of #{batch.size} points, created #{batch_created}, total created: #{total_created}"
+
       rescue StandardError => e
         Rails.logger.error "Failed to process point batch: #{e.message}"
         Rails.logger.error "Batch size: #{batch.size}"
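The restored normalize_point_keys pads every row to a common key set before the bulk upsert; ActiveRecord's insert_all/upsert_all expect all rows to share the same keys, so sparse point hashes would otherwise not go through. A small runnable illustration of the padding:

    points = [
      { lonlat: 'POINT(13.4 52.5)', timestamp: 1_700_000_000, import_id: 1 },
      { lonlat: 'POINT(2.35 48.85)', timestamp: 1_700_000_100 } # no import_id key
    ]

    all_keys = points.flat_map(&:keys).uniq

    # Pad missing keys with nil so every row has an identical shape.
    normalized = points.map do |point|
      all_keys.each_with_object({}) { |key, row| row[key] = point[key] }
    end

    p normalized.last
    # => {:lonlat=>"POINT(2.35 48.85)", :timestamp=>1700000100, :import_id=>nil}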
@@ -1,6 +1,7 @@
 # frozen_string_literal: true
 
 class Users::ImportData::Stats
+  BATCH_SIZE = 1000
 
   def initialize(user, stats_data)
     @user = user
@@ -35,10 +36,6 @@ class Users::ImportData::Stats
 
   attr_reader :user, :stats_data
 
-  def batch_size
-    @batch_size ||= DawarichSettings.import_batch_size
-  end
-
   def filter_and_prepare_stats
     valid_stats = []
     skipped_count = 0
@@ -102,7 +99,7 @@ class Users::ImportData::Stats
   def bulk_import_stats(stats)
     total_created = 0
 
-    stats.each_slice(batch_size) do |batch|
+    stats.each_slice(BATCH_SIZE) do |batch|
       begin
         result = Stat.upsert_all(
           batch,
@@ -1,6 +1,7 @@
 # frozen_string_literal: true
 
 class Users::ImportData::Trips
+  BATCH_SIZE = 1000
 
   def initialize(user, trips_data)
     @user = user
@@ -35,10 +36,6 @@ class Users::ImportData::Trips
 
   attr_reader :user, :trips_data
 
-  def batch_size
-    @batch_size ||= DawarichSettings.import_batch_size
-  end
-
   def filter_and_prepare_trips
     valid_trips = []
     skipped_count = 0
@@ -114,7 +111,7 @@ class Users::ImportData::Trips
   def bulk_import_trips(trips)
     total_created = 0
 
-    trips.each_slice(batch_size) do |batch|
+    trips.each_slice(BATCH_SIZE) do |batch|
       begin
         result = Trip.upsert_all(
           batch,