diff --git a/app/jobs/import/google_takeout_job.rb b/app/jobs/import/google_takeout_job.rb
index d962a304..4cb74ae2 100644
--- a/app/jobs/import/google_takeout_job.rb
+++ b/app/jobs/import/google_takeout_job.rb
@@ -4,11 +4,10 @@ class Import::GoogleTakeoutJob < ApplicationJob
   queue_as :imports
   sidekiq_options retry: false
 
-  def perform(import_id, json_string)
+  def perform(import_id, json_array)
     import = Import.find(import_id)
+    records = Oj.load(json_array)
 
-    json = Oj.load(json_string)
-
-    GoogleMaps::RecordsParser.new(import).call(json)
+    GoogleMaps::RecordsParser.new(import).call(records)
   end
 end
diff --git a/app/services/google_maps/records_parser.rb b/app/services/google_maps/records_parser.rb
index 04ee4621..e2fe2e05 100644
--- a/app/services/google_maps/records_parser.rb
+++ b/app/services/google_maps/records_parser.rb
@@ -1,32 +1,25 @@
 # frozen_string_literal: true
 
 class GoogleMaps::RecordsParser
+  BATCH_SIZE = 1000
   attr_reader :import
 
   def initialize(import)
     @import = import
+    @batch = []
   end
 
-  def call(json)
-    data = parse_json(json)
+  def call(records)
+    Array(records).each do |record|
+      @batch << parse_json(record)
 
-    return if Point.exists?(
-      latitude: data[:latitude],
-      longitude: data[:longitude],
-      timestamp: data[:timestamp],
-      user_id: import.user_id
-    )
+      if @batch.size >= BATCH_SIZE
+        bulk_insert_points
+        @batch = []
+      end
+    end
 
-    Point.create(
-      latitude: data[:latitude],
-      longitude: data[:longitude],
-      timestamp: data[:timestamp],
-      raw_data: data[:raw_data],
-      topic: 'Google Maps Timeline Export',
-      tracker_id: 'google-maps-timeline-export',
-      import_id: import.id,
-      user_id: import.user_id
-    )
+    bulk_insert_points if @batch.any?
   end
 
   private
@@ -38,7 +31,21 @@ class GoogleMaps::RecordsParser
       timestamp: Timestamps.parse_timestamp(json['timestamp'] || json['timestampMs']),
       altitude: json['altitude'],
       velocity: json['velocity'],
-      raw_data: json
+      raw_data: json,
+      topic: 'Google Maps Timeline Export',
+      tracker_id: 'google-maps-timeline-export',
+      import_id: import.id,
+      user_id: import.user_id,
+      created_at: Time.current,
+      updated_at: Time.current
     }
   end
+
+  def bulk_insert_points
+    Point.upsert_all(
+      @batch,
+      unique_by: %i[latitude longitude timestamp user_id],
+      returning: false
+    )
+  end
 end
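Note on the records_parser.rb change above: `upsert_all` with `unique_by:`
resolves conflicts against an actual unique index, so `Point.upsert_all`
assumes one exists on `points (latitude, longitude, timestamp, user_id)`.
If the schema does not have it yet, a companion migration along these lines
would be needed (a sketch only; the migration class, index name, and Rails
version tag are placeholders, not part of this diff):

    # Hypothetical companion migration, not included in this diff.
    class AddUniqueIndexToPoints < ActiveRecord::Migration[7.1]
      disable_ddl_transaction! # required for algorithm: :concurrently

      def change
        add_index :points,
                  %i[latitude longitude timestamp user_id],
                  unique: true,
                  algorithm: :concurrently, # build without blocking writes (PostgreSQL)
                  name: 'idx_points_on_lat_lon_ts_user'
      end
    end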
diff --git a/app/services/tasks/imports/google_records.rb b/app/services/tasks/imports/google_records.rb
index 8f8839e3..9b91f76f 100644
--- a/app/services/tasks/imports/google_records.rb
+++ b/app/services/tasks/imports/google_records.rb
@@ -4,6 +4,8 @@
 # the main source of user's location history data.
 
 class Tasks::Imports::GoogleRecords
+  BATCH_SIZE = 1000 # Adjust based on your needs
+
   def initialize(file_path, user_email)
     @file_path = file_path
     @user = User.find_by(email: user_email)
@@ -14,10 +16,11 @@ class Tasks::Imports::GoogleRecords
     import_id = create_import
 
     log_start
-    file_content = read_file
-    json_data = Oj.load(file_content)
-    schedule_import_jobs(json_data, import_id)
+    process_file_in_batches(import_id)
     log_success
+  rescue Oj::ParseError => e
+    Rails.logger.error("JSON parsing error: #{e.message}")
+    raise
   end
 
   private
@@ -26,14 +29,45 @@ class Tasks::Imports::GoogleRecords
     @user.imports.create(name: @file_path, source: :google_records).id
   end
 
-  def read_file
-    File.read(@file_path)
+  def process_file_in_batches(import_id)
+    batch = []
+
+    Oj.load_file(@file_path, mode: :compat) do |record|
+      next unless record.is_a?(Hash) && record['locations']
+
+      record['locations'].each do |location|
+        batch << prepare_location_data(location, import_id)
+
+        if batch.size >= BATCH_SIZE
+          bulk_insert_locations(batch)
+          batch = []
+        end
+      end
+    end
+
+    # Process any remaining records
+    bulk_insert_locations(batch) if batch.any?
   end
 
-  def schedule_import_jobs(json_data, import_id)
-    json_data['locations'].each do |json|
-      Import::GoogleTakeoutJob.perform_later(import_id, json.to_json)
-    end
+  def prepare_location_data(location, import_id)
+    {
+      import_id: import_id,
+      latitude: location['latitudeE7']&.to_f&./(1e7),
+      longitude: location['longitudeE7']&.to_f&./(1e7),
+      timestamp: Time.at(location['timestampMs'].to_i / 1000),
+      accuracy: location['accuracy'],
+      source_data: location.to_json,
+      created_at: Time.current,
+      updated_at: Time.current
+    }
+  end
+
+  def bulk_insert_locations(batch)
+    Location.upsert_all(
+      batch,
+      unique_by: %i[import_id timestamp],
+      returning: false
+    )
   end
 
   def log_start
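For reference, a minimal way to exercise the batched import end to end
(a sketch, not part of the diff: it assumes the class's public entry point
is `call`, as the `log_start`/`log_success` flow suggests, and the rake
task name is made up):

    # Hypothetical rake task, not included in this diff.
    namespace :import do
      desc 'Import a Google Takeout Records.json for a user'
      task :google_records, %i[file_path user_email] => :environment do |_t, args|
        Tasks::Imports::GoogleRecords.new(args[:file_path], args[:user_email]).call
      end
    end

One caveat worth noting: the block form of `Oj.load_file` yields each
top-level JSON document in the file, so a conventional single-document
Records.json is still parsed into memory as one object; the batching here
bounds the size of each INSERT, not the parse itself. A SAJ-style streaming
parse would be the next step if the parse alone exhausts memory.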