diff --git a/app/jobs/import/google_takeout_job.rb b/app/jobs/import/google_takeout_job.rb index 4cb74ae2..3bfe965c 100644 --- a/app/jobs/import/google_takeout_job.rb +++ b/app/jobs/import/google_takeout_job.rb @@ -4,10 +4,10 @@ class Import::GoogleTakeoutJob < ApplicationJob queue_as :imports sidekiq_options retry: false - def perform(import_id, json_array) + def perform(import_id, locations, current_index = 0) + locations_batch = Oj.load(locations) import = Import.find(import_id) - records = Oj.load(json_array) - GoogleMaps::RecordsParser.new(import).call(records) + GoogleMaps::RecordsParser.new(import, current_index).call(locations_batch) end end diff --git a/app/models/import.rb b/app/models/import.rb index f396c555..045e8b5f 100644 --- a/app/models/import.rb +++ b/app/models/import.rb @@ -4,6 +4,8 @@ class Import < ApplicationRecord belongs_to :user has_many :points, dependent: :destroy + delegate :count, to: :points, prefix: true + include ImportUploader::Attachment(:raw) enum :source, { diff --git a/app/services/google_maps/records_parser.rb b/app/services/google_maps/records_parser.rb index e2fe2e05..0762e0d6 100644 --- a/app/services/google_maps/records_parser.rb +++ b/app/services/google_maps/records_parser.rb @@ -1,51 +1,88 @@ # frozen_string_literal: true class GoogleMaps::RecordsParser - BATCH_SIZE = 1000 - attr_reader :import + include Imports::Broadcaster - def initialize(import) + BATCH_SIZE = 1000 + attr_reader :import, :current_index + + def initialize(import, current_index = 0) @import = import @batch = [] + @current_index = current_index end - def call(records) - Array(records).each do |record| - @batch << parse_json(record) + def call(locations) + Array(locations).each do |location| + @batch << prepare_location_data(location) + next unless @batch.size >= BATCH_SIZE - if @batch.size >= BATCH_SIZE - bulk_insert_points - @batch = [] - end + bulk_insert_points(@batch) + broadcast_import_progress(import, current_index) + @batch = [] end - 
bulk_insert_points if @batch.any? + return unless @batch.any? + + bulk_insert_points(@batch) + broadcast_import_progress(import, current_index) end private - def parse_json(json) + # rubocop:disable Metrics/MethodLength + def prepare_location_data(location) { - latitude: json['latitudeE7'].to_f / 10**7, - longitude: json['longitudeE7'].to_f / 10**7, - timestamp: Timestamps.parse_timestamp(json['timestamp'] || json['timestampMs']), - altitude: json['altitude'], - velocity: json['velocity'], - raw_data: json, + latitude: location['latitudeE7'].to_f / 10**7, + longitude: location['longitudeE7'].to_f / 10**7, + timestamp: Timestamps.parse_timestamp(location['timestamp'] || location['timestampMs']), + altitude: location['altitude'], + velocity: location['velocity'], + raw_data: location, topic: 'Google Maps Timeline Export', tracker_id: 'google-maps-timeline-export', - import_id: import.id, - user_id: import.user_id, + import_id: @import.id, + user_id: @import.user_id, created_at: Time.current, updated_at: Time.current } end - def bulk_insert_points + # rubocop:enable Metrics/MethodLength + def bulk_insert_points(batch) + # Deduplicate records within the batch before upserting + # Use all fields in the unique constraint for deduplication + unique_batch = deduplicate_batch(batch) + + # Sort the batch to ensure consistent ordering and prevent deadlocks + # sorted_batch = sort_batch(unique_batch) + Point.upsert_all( - @batch, + unique_batch, unique_by: %i[latitude longitude timestamp user_id], - returning: false + returning: false, + on_duplicate: :skip + ) + rescue StandardError => e + Rails.logger.error("Batch insert failed for import #{@import.id}: #{e.message}") + + # Create notification for the user + Notification.create!( + user: @import.user, + title: 'Google Maps Import Error', + content: "Failed to process location batch: #{e.message}", + kind: :error ) end + + def deduplicate_batch(batch) + batch.uniq do |record| + [ + record[:latitude].round(7), + 
record[:longitude].round(7), + record[:timestamp], + record[:user_id] + ] + end + end end diff --git a/app/services/tasks/imports/google_records.rb b/app/services/tasks/imports/google_records.rb index fc74a823..83174128 100644 --- a/app/services/tasks/imports/google_records.rb +++ b/app/services/tasks/imports/google_records.rb @@ -1,7 +1,6 @@ # frozen_string_literal: true -# This class is named based on Google Takeout's Records.json file, -# the main source of user's location history data. +# This class is named based on Google Takeout's Records.json file class Tasks::Imports::GoogleRecords BATCH_SIZE = 1000 # Adjust based on your needs @@ -35,39 +34,20 @@ class Tasks::Imports::GoogleRecords Oj.load_file(@file_path, mode: :compat) do |record| next unless record.is_a?(Hash) && record['locations'] - record['locations'].each do |location| - batch << prepare_location_data(location, import_id) + index = 0 - if batch.size >= BATCH_SIZE - bulk_insert_points(batch) - batch = [] - end + record['locations'].each do |location| + batch << location + + next unless batch.size >= BATCH_SIZE + + index += BATCH_SIZE + Import::GoogleTakeoutJob.perform_later(import_id, Oj.dump(batch), index) + batch = [] end end - # Process any remaining records - bulk_insert_points(batch) if batch.any? - end - - def prepare_location_data(location, import_id) - { - import_id: import_id, - latitude: location['latitudeE7']&.to_f&.div(1e7), - longitude: location['longitudeE7']&.to_f&.div(1e7), - timestamp: Time.zone.at(location['timestampMs'].to_i / 1000), - accuracy: location['accuracy'], - source_data: location.to_json, - created_at: Time.current, - updated_at: Time.current - } - end - - def bulk_insert_points(batch) - Point.upsert_all( - batch, - unique_by: %i[import_id timestamp], - returning: false - ) + Import::GoogleTakeoutJob.perform_later(import_id, Oj.dump(batch)) if batch.any? 
end def log_start diff --git a/config/initializers/sidekiq.rb b/config/initializers/sidekiq.rb index d9dec786..ab3f00c5 100644 --- a/config/initializers/sidekiq.rb +++ b/config/initializers/sidekiq.rb @@ -2,6 +2,7 @@ Sidekiq.configure_server do |config| config.redis = { url: ENV['REDIS_URL'] } + config.logger = Sidekiq::Logger.new($stdout) if ENV['PROMETHEUS_EXPORTER_ENABLED'].to_s == 'true' require 'prometheus_exporter/instrumentation'