Import Google Records JSON in batches

Eugene Burmakin 2025-01-21 19:14:36 +01:00
parent d3a84bf652
commit b43810b1fb
5 changed files with 77 additions and 57 deletions


@@ -4,10 +4,10 @@ class Import::GoogleTakeoutJob < ApplicationJob
   queue_as :imports
   sidekiq_options retry: false
 
-  def perform(import_id, json_array)
+  def perform(import_id, locations, current_index)
+    locations_batch = Oj.load(locations)
     import = Import.find(import_id)
-    records = Oj.load(json_array)
 
-    GoogleMaps::RecordsParser.new(import).call(records)
+    GoogleMaps::RecordsParser.new(import, current_index).call(locations_batch)
   end
 end
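Note: the job now takes a pre-serialized slice of the locations array plus a running index that the parser uses for progress broadcasts. The enqueuing side (Tasks::Imports::GoogleRecords, changed below) hands work to it roughly like this, sketched with hypothetical variable names:

    # `batch` is an array of raw location hashes read from Records.json;
    # `index` is the count of locations handed off so far (illustrative names only).
    Import::GoogleTakeoutJob.perform_later(import_id, Oj.dump(batch), index)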


@@ -4,6 +4,8 @@ class Import < ApplicationRecord
   belongs_to :user
   has_many :points, dependent: :destroy
 
+  delegate :count, to: :points, prefix: true
+
   include ImportUploader::Attachment(:raw)
 
   enum :source, {


@@ -1,51 +1,88 @@
 # frozen_string_literal: true
 
 class GoogleMaps::RecordsParser
-  BATCH_SIZE = 1000
+  include Imports::Broadcaster
 
-  attr_reader :import
+  BATCH_SIZE = 1000
 
-  def initialize(import)
+  attr_reader :import, :current_index
+
+  def initialize(import, current_index = 0)
     @import = import
     @batch = []
+    @current_index = current_index
   end
 
-  def call(records)
-    Array(records).each do |record|
-      @batch << parse_json(record)
+  def call(locations)
+    Array(locations).each do |location|
+      @batch << prepare_location_data(location)
 
-      if @batch.size >= BATCH_SIZE
-        bulk_insert_points
-        @batch = []
-      end
+      next unless @batch.size >= BATCH_SIZE
+
+      bulk_insert_points(@batch)
+      broadcast_import_progress(import, current_index)
+      @batch = []
     end
 
-    bulk_insert_points if @batch.any?
+    return unless @batch.any?
+
+    bulk_insert_points(@batch)
+    broadcast_import_progress(import, current_index)
   end
 
   private
 
-  def parse_json(json)
+  # rubocop:disable Metrics/MethodLength
+  def prepare_location_data(location)
     {
-      latitude: json['latitudeE7'].to_f / 10**7,
-      longitude: json['longitudeE7'].to_f / 10**7,
-      timestamp: Timestamps.parse_timestamp(json['timestamp'] || json['timestampMs']),
-      altitude: json['altitude'],
-      velocity: json['velocity'],
-      raw_data: json,
+      latitude: location['latitudeE7'].to_f / 10**7,
+      longitude: location['longitudeE7'].to_f / 10**7,
+      timestamp: Timestamps.parse_timestamp(location['timestamp'] || location['timestampMs']),
+      altitude: location['altitude'],
+      velocity: location['velocity'],
+      raw_data: location,
       topic: 'Google Maps Timeline Export',
       tracker_id: 'google-maps-timeline-export',
-      import_id: import.id,
-      user_id: import.user_id,
+      import_id: @import.id,
+      user_id: @import.user_id,
       created_at: Time.current,
       updated_at: Time.current
     }
   end
+  # rubocop:enable Metrics/MethodLength
 
-  def bulk_insert_points
+  def bulk_insert_points(batch)
+    # Deduplicate records within the batch before upserting
+    # Use all fields in the unique constraint for deduplication
+    unique_batch = deduplicate_batch(batch)
+
+    # Sort the batch to ensure consistent ordering and prevent deadlocks
+    # sorted_batch = sort_batch(unique_batch)
+
     Point.upsert_all(
-      @batch,
+      unique_batch,
       unique_by: %i[latitude longitude timestamp user_id],
-      returning: false
+      returning: false,
+      on_duplicate: :skip
     )
+  rescue StandardError => e
+    Rails.logger.error("Batch insert failed for import #{@import.id}: #{e.message}")
+
+    # Create notification for the user
+    Notification.create!(
+      user: @import.user,
+      title: 'Google Maps Import Error',
+      content: "Failed to process location batch: #{e.message}",
+      kind: :error
+    )
   end
 
+  def deduplicate_batch(batch)
+    batch.uniq do |record|
+      [
+        record[:latitude].round(7),
+        record[:longitude].round(7),
+        record[:timestamp],
+        record[:user_id]
+      ]
+    end
+  end
 end
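The batch is deduplicated on the same four columns as the upsert's unique_by key before hitting the database, since two identical rows inside a single upsert_all statement can otherwise make the ON CONFLICT clause touch the same row twice and fail. A minimal illustration with made-up coordinates:

    # Hypothetical duplicate pair; both hashes collapse to one row before upsert_all.
    batch = [
      { latitude: 52.5200066, longitude: 13.404954, timestamp: 1_700_000_000, user_id: 1 },
      { latitude: 52.5200066, longitude: 13.404954, timestamp: 1_700_000_000, user_id: 1 }
    ]
    batch.uniq { |r| [r[:latitude].round(7), r[:longitude].round(7), r[:timestamp], r[:user_id]] }.size
    # => 1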


@@ -1,7 +1,6 @@
 # frozen_string_literal: true
 
-# This class is named based on Google Takeout's Records.json file,
-# the main source of user's location history data.
+# This class is named based on Google Takeout's Records.json file
 class Tasks::Imports::GoogleRecords
   BATCH_SIZE = 1000 # Adjust based on your needs
 
@@ -35,39 +34,20 @@ class Tasks::Imports::GoogleRecords
     Oj.load_file(@file_path, mode: :compat) do |record|
       next unless record.is_a?(Hash) && record['locations']
 
-      record['locations'].each do |location|
-        batch << prepare_location_data(location, import_id)
+      index = 0
 
-        if batch.size >= BATCH_SIZE
-          bulk_insert_points(batch)
-          batch = []
-        end
+      record['locations'].each do |location|
+        batch << location
+
+        next unless batch.size >= BATCH_SIZE
+
+        index += BATCH_SIZE
+        Import::GoogleTakeoutJob.perform_later(import_id, Oj.dump(batch), index)
+        batch = []
       end
     end
 
-    # Process any remaining records
-    bulk_insert_points(batch) if batch.any?
+    Import::GoogleTakeoutJob.perform_later(import_id, Oj.dump(batch)) if batch.any?
   end
 
-  def prepare_location_data(location, import_id)
-    {
-      import_id: import_id,
-      latitude: location['latitudeE7']&.to_f&.div(1e7),
-      longitude: location['longitudeE7']&.to_f&.div(1e7),
-      timestamp: Time.zone.at(location['timestampMs'].to_i / 1000),
-      accuracy: location['accuracy'],
-      source_data: location.to_json,
-      created_at: Time.current,
-      updated_at: Time.current
-    }
-  end
-
-  def bulk_insert_points(batch)
-    Point.upsert_all(
-      batch,
-      unique_by: %i[import_id timestamp],
-      returning: false
-    )
-  end
-
   def log_start
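The task itself no longer parses or persists points; it only streams Records.json and slices the locations array into BATCH_SIZE chunks for the job. A hypothetical walkthrough of the handoff for a 2,500-location file:

    # Illustrative only; `locations` stands in for the streamed array.
    # batch 1 -> Import::GoogleTakeoutJob.perform_later(import_id, Oj.dump(locations[0...1000]), 1000)
    # batch 2 -> Import::GoogleTakeoutJob.perform_later(import_id, Oj.dump(locations[1000...2000]), 2000)
    # remaining 500 locations are enqueued after the loop by the final perform_later call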


@@ -2,6 +2,7 @@
 Sidekiq.configure_server do |config|
   config.redis = { url: ENV['REDIS_URL'] }
+  config.logger = Sidekiq::Logger.new($stdout)
 
   if ENV['PROMETHEUS_EXPORTER_ENABLED'].to_s == 'true'
     require 'prometheus_exporter/instrumentation'