Stream google records import

Eugene Burmakin 2025-01-21 10:07:54 +01:00
parent 1d820462f6
commit 049812823f
3 changed files with 72 additions and 32 deletions

@@ -4,11 +4,10 @@ class Import::GoogleTakeoutJob < ApplicationJob
   queue_as :imports
   sidekiq_options retry: false
 
-  def perform(import_id, json_string)
+  def perform(import_id, json_array)
     import = Import.find(import_id)
 
-    json = Oj.load(json_string)
-
-    GoogleMaps::RecordsParser.new(import).call(json)
+    records = Oj.load(json_array)
+    GoogleMaps::RecordsParser.new(import).call(records)
   end
 end
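The job's contract changes from one record per job to a serialized batch: perform now receives a JSON array string, decodes it with Oj, and passes the whole collection to the parser. The enqueue side is not part of this hunk; a minimal sketch of what a caller could look like, with purely illustrative names, is:

# Sketch only, not part of this commit: `locations` is assumed to be an
# array of record hashes taken from a Google Takeout Records.json file,
# and `import` an existing Import record.
locations.each_slice(1000) do |chunk|
  Import::GoogleTakeoutJob.perform_later(import.id, Oj.dump(chunk, mode: :compat))
end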

@@ -1,32 +1,25 @@
 # frozen_string_literal: true
 
 class GoogleMaps::RecordsParser
+  BATCH_SIZE = 1000
+
   attr_reader :import
 
   def initialize(import)
     @import = import
+    @batch = []
   end
 
-  def call(json)
-    data = parse_json(json)
-
-    return if Point.exists?(
-      latitude: data[:latitude],
-      longitude: data[:longitude],
-      timestamp: data[:timestamp],
-      user_id: import.user_id
-    )
-
-    Point.create(
-      latitude: data[:latitude],
-      longitude: data[:longitude],
-      timestamp: data[:timestamp],
-      raw_data: data[:raw_data],
-      topic: 'Google Maps Timeline Export',
-      tracker_id: 'google-maps-timeline-export',
-      import_id: import.id,
-      user_id: import.user_id
-    )
+  def call(records)
+    Array(records).each do |record|
+      @batch << parse_json(record)
+      if @batch.size >= BATCH_SIZE
+        bulk_insert_points
+        @batch = []
+      end
+    end
+
+    bulk_insert_points if @batch.any?
   end
 
   private
@@ -38,7 +31,21 @@ class GoogleMaps::RecordsParser
       timestamp: Timestamps.parse_timestamp(json['timestamp'] || json['timestampMs']),
       altitude: json['altitude'],
       velocity: json['velocity'],
-      raw_data: json
+      raw_data: json,
+      topic: 'Google Maps Timeline Export',
+      tracker_id: 'google-maps-timeline-export',
+      import_id: import.id,
+      user_id: import.user_id,
+      created_at: Time.current,
+      updated_at: Time.current
     }
   end
+
+  def bulk_insert_points
+    Point.upsert_all(
+      @batch,
+      unique_by: %i[latitude longitude timestamp user_id],
+      returning: false
+    )
+  end
 end
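Note that Point.upsert_all with unique_by relies on a unique index covering those columns at the database level; this commit includes no migration, so such an index is assumed to exist already. A rough sketch of what it could look like, with an assumed table name and Rails migration version:

# Sketch only, assuming a `points` table and Rails 7.x migrations;
# adjust the version and index name to the actual schema.
class AddUniqueIndexToPoints < ActiveRecord::Migration[7.1]
  def change
    add_index :points, %i[latitude longitude timestamp user_id], unique: true
  end
end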

@@ -4,6 +4,8 @@
 # the main source of user's location history data.
 class Tasks::Imports::GoogleRecords
+  BATCH_SIZE = 1000 # Adjust based on your needs
+
   def initialize(file_path, user_email)
     @file_path = file_path
     @user = User.find_by(email: user_email)
@@ -14,10 +16,11 @@ class Tasks::Imports::GoogleRecords
     import_id = create_import
     log_start
-    file_content = read_file
-    json_data = Oj.load(file_content)
-    schedule_import_jobs(json_data, import_id)
+    process_file_in_batches(import_id)
     log_success
+  rescue Oj::ParseError => e
+    Rails.logger.error("JSON parsing error: #{e.message}")
+    raise
   end
 
   private
@@ -26,14 +29,45 @@ class Tasks::Imports::GoogleRecords
     @user.imports.create(name: @file_path, source: :google_records).id
   end
 
-  def read_file
-    File.read(@file_path)
-  end
+  def process_file_in_batches(import_id)
+    batch = []
+
+    Oj.load_file(@file_path, mode: :compat) do |record|
+      next unless record.is_a?(Hash) && record['locations']
+
+      record['locations'].each do |location|
+        batch << prepare_location_data(location, import_id)
+
+        if batch.size >= BATCH_SIZE
+          bulk_insert_locations(batch)
+          batch = []
+        end
+      end
+    end
 
-  def schedule_import_jobs(json_data, import_id)
-    json_data['locations'].each do |json|
-      Import::GoogleTakeoutJob.perform_later(import_id, json.to_json)
-    end
-  end
+    # Process any remaining records
+    bulk_insert_locations(batch) if batch.any?
+  end
+
+  def prepare_location_data(location, import_id)
+    {
+      import_id: import_id,
+      latitude: location['latitudeE7']&.to_f&./(1e7),
+      longitude: location['longitudeE7']&.to_f&./(1e7),
+      timestamp: Time.at(location['timestampMs'].to_i / 1000),
+      accuracy: location['accuracy'],
+      source_data: location.to_json,
+      created_at: Time.current,
+      updated_at: Time.current
+    }
+  end
+
+  def bulk_insert_locations(batch)
+    Location.upsert_all(
+      batch,
+      unique_by: %i[import_id timestamp],
+      returning: false
+    )
+  end
 
   def log_start
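For reference, a typical invocation of this importer constructs the task with a file path and a user email and runs it. The public entry point is not visible in this diff, so the `call` below is an assumption, as is the file path:

# Sketch only: the entry-point name (`call`) and the path are illustrative.
Tasks::Imports::GoogleRecords.new(
  '/path/to/Records.json',
  'user@example.com'
).call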