mirror of
https://github.com/Freika/dawarich.git
synced 2026-01-11 09:41:40 -05:00
Merge pull request #705 from Freika/feature/google-records-json-improvement
Improve performance of Records.json importing
This commit is contained in:
commit
4ecc565d5c
11 changed files with 175 additions and 80 deletions
|
|
@ -1 +1 @@
|
|||
0.23.2
|
||||
0.23.3
|
||||
|
|
|
|||
|
|
@ -5,7 +5,11 @@ All notable changes to this project will be documented in this file.
|
|||
The format is based on [Keep a Changelog](http://keepachangelog.com/)
|
||||
and this project adheres to [Semantic Versioning](http://semver.org/).
|
||||
|
||||
# 0.23.2 - 2025-01-21
|
||||
# 0.23.3 - 2025-01-21
|
||||
|
||||
### Fixed
|
||||
|
||||
- Drastically improved performance for Google's Records.json import. It will now take less than 5 minutes to import 500,000 points, which previously took a few hours.
|
||||
|
||||
### Fixed
|
||||
|
||||
|
|
|
|||
|
|
@ -4,11 +4,10 @@ class Import::GoogleTakeoutJob < ApplicationJob
|
|||
queue_as :imports
|
||||
sidekiq_options retry: false
|
||||
|
||||
def perform(import_id, json_string)
|
||||
def perform(import_id, locations, current_index)
|
||||
locations_batch = Oj.load(locations)
|
||||
import = Import.find(import_id)
|
||||
|
||||
json = Oj.load(json_string)
|
||||
|
||||
GoogleMaps::RecordsParser.new(import).call(json)
|
||||
GoogleMaps::RecordsImporter.new(import, current_index).call(locations_batch)
|
||||
end
|
||||
end
|
||||
|
|
|
|||
|
|
@ -4,6 +4,8 @@ class Import < ApplicationRecord
|
|||
belongs_to :user
|
||||
has_many :points, dependent: :destroy
|
||||
|
||||
delegate :count, to: :points, prefix: true
|
||||
|
||||
include ImportUploader::Attachment(:raw)
|
||||
|
||||
enum :source, {
|
||||
|
|
|
|||
84
app/services/google_maps/records_importer.rb
Normal file
84
app/services/google_maps/records_importer.rb
Normal file
|
|
@ -0,0 +1,84 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
class GoogleMaps::RecordsImporter
|
||||
include Imports::Broadcaster
|
||||
|
||||
BATCH_SIZE = 1000
|
||||
attr_reader :import, :current_index
|
||||
|
||||
def initialize(import, current_index = 0)
|
||||
@import = import
|
||||
@batch = []
|
||||
@current_index = current_index
|
||||
end
|
||||
|
||||
def call(locations)
|
||||
Array(locations).each_slice(BATCH_SIZE) do |location_batch|
|
||||
batch = location_batch.map { prepare_location_data(_1) }
|
||||
bulk_insert_points(batch)
|
||||
broadcast_import_progress(import, current_index)
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
# rubocop:disable Metrics/MethodLength
|
||||
def prepare_location_data(location)
|
||||
{
|
||||
latitude: location['latitudeE7'].to_f / 10**7,
|
||||
longitude: location['longitudeE7'].to_f / 10**7,
|
||||
timestamp: parse_timestamp(location),
|
||||
altitude: location['altitude'],
|
||||
velocity: location['velocity'],
|
||||
raw_data: location,
|
||||
topic: 'Google Maps Timeline Export',
|
||||
tracker_id: 'google-maps-timeline-export',
|
||||
import_id: @import.id,
|
||||
user_id: @import.user_id,
|
||||
created_at: Time.current,
|
||||
updated_at: Time.current
|
||||
}
|
||||
end
|
||||
# rubocop:enable Metrics/MethodLength
|
||||
|
||||
def bulk_insert_points(batch)
|
||||
unique_batch = deduplicate_batch(batch)
|
||||
|
||||
# rubocop:disable Rails/SkipsModelValidations
|
||||
Point.upsert_all(
|
||||
unique_batch,
|
||||
unique_by: %i[latitude longitude timestamp user_id],
|
||||
returning: false,
|
||||
on_duplicate: :skip
|
||||
)
|
||||
# rubocop:enable Rails/SkipsModelValidations
|
||||
rescue StandardError => e
|
||||
create_notification("Failed to process location batch: #{e.message}")
|
||||
end
|
||||
|
||||
def deduplicate_batch(batch)
|
||||
batch.uniq do |record|
|
||||
[
|
||||
record[:latitude].round(7),
|
||||
record[:longitude].round(7),
|
||||
record[:timestamp],
|
||||
record[:user_id]
|
||||
]
|
||||
end
|
||||
end
|
||||
|
||||
def parse_timestamp(location)
|
||||
Timestamps.parse_timestamp(
|
||||
location['timestamp'] || location['timestampMs']
|
||||
)
|
||||
end
|
||||
|
||||
def create_notification(message)
|
||||
Notification.create!(
|
||||
user: @import.user,
|
||||
title: 'Google\'s Records.json Import Error',
|
||||
content: message,
|
||||
kind: :error
|
||||
)
|
||||
end
|
||||
end
|
||||
|
|
@ -1,44 +0,0 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
class GoogleMaps::RecordsParser
|
||||
attr_reader :import
|
||||
|
||||
def initialize(import)
|
||||
@import = import
|
||||
end
|
||||
|
||||
def call(json)
|
||||
data = parse_json(json)
|
||||
|
||||
return if Point.exists?(
|
||||
latitude: data[:latitude],
|
||||
longitude: data[:longitude],
|
||||
timestamp: data[:timestamp],
|
||||
user_id: import.user_id
|
||||
)
|
||||
|
||||
Point.create(
|
||||
latitude: data[:latitude],
|
||||
longitude: data[:longitude],
|
||||
timestamp: data[:timestamp],
|
||||
raw_data: data[:raw_data],
|
||||
topic: 'Google Maps Timeline Export',
|
||||
tracker_id: 'google-maps-timeline-export',
|
||||
import_id: import.id,
|
||||
user_id: import.user_id
|
||||
)
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def parse_json(json)
|
||||
{
|
||||
latitude: json['latitudeE7'].to_f / 10**7,
|
||||
longitude: json['longitudeE7'].to_f / 10**7,
|
||||
timestamp: Timestamps.parse_timestamp(json['timestamp'] || json['timestampMs']),
|
||||
altitude: json['altitude'],
|
||||
velocity: json['velocity'],
|
||||
raw_data: json
|
||||
}
|
||||
end
|
||||
end
|
||||
|
|
@ -1,9 +1,10 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
# This class is named based on Google Takeout's Records.json file,
|
||||
# the main source of user's location history data.
|
||||
# This class is named based on Google Takeout's Records.json file
|
||||
|
||||
class Tasks::Imports::GoogleRecords
|
||||
BATCH_SIZE = 1000 # Adjust based on your needs
|
||||
|
||||
def initialize(file_path, user_email)
|
||||
@file_path = file_path
|
||||
@user = User.find_by(email: user_email)
|
||||
|
|
@ -14,10 +15,11 @@ class Tasks::Imports::GoogleRecords
|
|||
|
||||
import_id = create_import
|
||||
log_start
|
||||
file_content = read_file
|
||||
json_data = Oj.load(file_content)
|
||||
schedule_import_jobs(json_data, import_id)
|
||||
process_file_in_batches(import_id)
|
||||
log_success
|
||||
rescue Oj::ParseError => e
|
||||
Rails.logger.error("JSON parsing error: #{e.message}")
|
||||
raise
|
||||
end
|
||||
|
||||
private
|
||||
|
|
@ -26,14 +28,26 @@ class Tasks::Imports::GoogleRecords
|
|||
@user.imports.create(name: @file_path, source: :google_records).id
|
||||
end
|
||||
|
||||
def read_file
|
||||
File.read(@file_path)
|
||||
end
|
||||
def process_file_in_batches(import_id)
|
||||
batch = []
|
||||
|
||||
def schedule_import_jobs(json_data, import_id)
|
||||
json_data['locations'].each do |json|
|
||||
Import::GoogleTakeoutJob.perform_later(import_id, json.to_json)
|
||||
Oj.load_file(@file_path, mode: :compat) do |record|
|
||||
next unless record.is_a?(Hash) && record['locations']
|
||||
|
||||
index = 0
|
||||
|
||||
record['locations'].each do |location|
|
||||
batch << location
|
||||
|
||||
next unless batch.size >= BATCH_SIZE
|
||||
|
||||
index += BATCH_SIZE
|
||||
Import::GoogleTakeoutJob.perform_later(import_id, Oj.dump(batch), index)
|
||||
batch = []
|
||||
end
|
||||
end
|
||||
|
||||
Import::GoogleTakeoutJob.perform_later(import_id, Oj.dump(batch)) if batch.any?
|
||||
end
|
||||
|
||||
def log_start
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@
|
|||
|
||||
Sidekiq.configure_server do |config|
|
||||
config.redis = { url: ENV['REDIS_URL'] }
|
||||
config.logger = Sidekiq::Logger.new($stdout)
|
||||
|
||||
if ENV['PROMETHEUS_EXPORTER_ENABLED'].to_s == 'true'
|
||||
require 'prometheus_exporter/instrumentation'
|
||||
|
|
|
|||
|
|
@ -2,23 +2,38 @@
|
|||
|
||||
require 'rails_helper'
|
||||
|
||||
RSpec.describe GoogleMaps::RecordsParser do
|
||||
RSpec.describe GoogleMaps::RecordsImporter do
|
||||
describe '#call' do
|
||||
subject(:parser) { described_class.new(import).call(json) }
|
||||
subject(:parser) { described_class.new(import).call(locations) }
|
||||
|
||||
let(:import) { create(:import) }
|
||||
let(:time) { DateTime.new(2025, 1, 1, 12, 0, 0) }
|
||||
let(:json) do
|
||||
{
|
||||
'latitudeE7' => 123_456_789,
|
||||
'longitudeE7' => 123_456_789,
|
||||
'altitude' => 0,
|
||||
'velocity' => 0
|
||||
}
|
||||
let(:locations) do
|
||||
[
|
||||
{
|
||||
'timestampMs' => (time.to_f * 1000).to_i.to_s,
|
||||
'latitudeE7' => 123_456_789,
|
||||
'longitudeE7' => 123_456_789,
|
||||
'accuracy' => 10,
|
||||
'altitude' => 100,
|
||||
'verticalAccuracy' => 5,
|
||||
'activity' => [
|
||||
{
|
||||
'timestampMs' => (time.to_f * 1000).to_i.to_s,
|
||||
'activity' => [
|
||||
{
|
||||
'type' => 'STILL',
|
||||
'confidence' => 100
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
end
|
||||
|
||||
context 'with regular timestamp' do
|
||||
let(:json) { super().merge('timestamp' => time.to_s) }
|
||||
let(:locations) { super()[0].merge('timestamp' => time.to_s).to_json }
|
||||
|
||||
it 'creates a point' do
|
||||
expect { parser }.to change(Point, :count).by(1)
|
||||
|
|
@ -26,11 +41,23 @@ RSpec.describe GoogleMaps::RecordsParser do
|
|||
end
|
||||
|
||||
context 'when point already exists' do
|
||||
let(:json) { super().merge('timestamp' => time.to_s) }
|
||||
let(:locations) do
|
||||
[
|
||||
super()[0].merge(
|
||||
'timestamp' => time.to_s,
|
||||
'latitudeE7' => 123_456_789,
|
||||
'longitudeE7' => 123_456_789
|
||||
)
|
||||
]
|
||||
end
|
||||
|
||||
before do
|
||||
create(
|
||||
:point, user: import.user, import:, latitude: 12.3456789, longitude: 12.3456789,
|
||||
:point,
|
||||
user: import.user,
|
||||
import: import,
|
||||
latitude: 12.3456789,
|
||||
longitude: 12.3456789,
|
||||
timestamp: time.to_i
|
||||
)
|
||||
end
|
||||
|
|
@ -41,7 +68,9 @@ RSpec.describe GoogleMaps::RecordsParser do
|
|||
end
|
||||
|
||||
context 'with timestampMs in milliseconds' do
|
||||
let(:json) { super().merge('timestampMs' => (time.to_f * 1000).to_i.to_s) }
|
||||
let(:locations) do
|
||||
[super()[0].merge('timestampMs' => (time.to_f * 1000).to_i.to_s)]
|
||||
end
|
||||
|
||||
it 'creates a point using milliseconds timestamp' do
|
||||
expect { parser }.to change(Point, :count).by(1)
|
||||
|
|
@ -49,7 +78,9 @@ RSpec.describe GoogleMaps::RecordsParser do
|
|||
end
|
||||
|
||||
context 'with ISO 8601 timestamp' do
|
||||
let(:json) { super().merge('timestamp' => time.iso8601) }
|
||||
let(:locations) do
|
||||
[super()[0].merge('timestamp' => time.iso8601)]
|
||||
end
|
||||
|
||||
it 'parses ISO 8601 timestamp correctly' do
|
||||
expect { parser }.to change(Point, :count).by(1)
|
||||
|
|
@ -59,7 +90,9 @@ RSpec.describe GoogleMaps::RecordsParser do
|
|||
end
|
||||
|
||||
context 'with timestamp in milliseconds' do
|
||||
let(:json) { super().merge('timestamp' => (time.to_f * 1000).to_i.to_s) }
|
||||
let(:locations) do
|
||||
[super()[0].merge('timestamp' => (time.to_f * 1000).to_i.to_s)]
|
||||
end
|
||||
|
||||
it 'parses millisecond timestamp correctly' do
|
||||
expect { parser }.to change(Point, :count).by(1)
|
||||
|
|
@ -69,7 +102,9 @@ RSpec.describe GoogleMaps::RecordsParser do
|
|||
end
|
||||
|
||||
context 'with timestamp in seconds' do
|
||||
let(:json) { super().merge('timestamp' => time.to_i.to_s) }
|
||||
let(:locations) do
|
||||
[super()[0].merge('timestamp' => time.to_i.to_s)]
|
||||
end
|
||||
|
||||
it 'parses second timestamp correctly' do
|
||||
expect { parser }.to change(Point, :count).by(1)
|
||||
|
|
@ -5,10 +5,10 @@ require 'rails_helper'
|
|||
RSpec.describe Tasks::Imports::GoogleRecords do
|
||||
describe '#call' do
|
||||
let(:user) { create(:user) }
|
||||
let(:file_path) { Rails.root.join('spec/fixtures/files/google/records.json') }
|
||||
let(:file_path) { Rails.root.join('spec/fixtures/files/google/records.json').to_s }
|
||||
|
||||
it 'schedules the Import::GoogleTakeoutJob' do
|
||||
expect(Import::GoogleTakeoutJob).to receive(:perform_later).exactly(3).times
|
||||
expect(Import::GoogleTakeoutJob).to receive(:perform_later).exactly(1).time
|
||||
|
||||
described_class.new(file_path, user.email).call
|
||||
end
|
||||
|
|
|
|||
|
|
@ -3,7 +3,7 @@
|
|||
require 'rails_helper'
|
||||
|
||||
describe 'import.rake' do
|
||||
let(:file_path) { Rails.root.join('spec/fixtures/files/google/records.json') }
|
||||
let(:file_path) { Rails.root.join('spec/fixtures/files/google/records.json').to_s }
|
||||
let(:user) { create(:user) }
|
||||
|
||||
it 'calls importing class' do
|
||||
|
|
|
|||
Loading…
Reference in a new issue