diff --git a/.app_version b/.app_version index fda96dcf..9e40e75c 100644 --- a/.app_version +++ b/.app_version @@ -1 +1 @@ -0.23.2 +0.23.3 diff --git a/CHANGELOG.md b/CHANGELOG.md index 04da2e91..f50e154a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,11 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/). -# 0.23.2 - 2025-01-21 +# 0.23.3 - 2025-01-21 + +### Fixed + +- Drastically improved performance for Google's Records.json import. It will now take less than 5 minutes to import 500,000 points, which previously took a few hours. ### Fixed diff --git a/app/jobs/import/google_takeout_job.rb b/app/jobs/import/google_takeout_job.rb index d962a304..02702cf7 100644 --- a/app/jobs/import/google_takeout_job.rb +++ b/app/jobs/import/google_takeout_job.rb @@ -4,11 +4,10 @@ class Import::GoogleTakeoutJob < ApplicationJob queue_as :imports sidekiq_options retry: false - def perform(import_id, json_string) + def perform(import_id, locations, current_index) + locations_batch = Oj.load(locations) import = Import.find(import_id) - json = Oj.load(json_string) - - GoogleMaps::RecordsParser.new(import).call(json) + GoogleMaps::RecordsImporter.new(import, current_index).call(locations_batch) end end diff --git a/app/models/import.rb b/app/models/import.rb index f396c555..045e8b5f 100644 --- a/app/models/import.rb +++ b/app/models/import.rb @@ -4,6 +4,8 @@ class Import < ApplicationRecord belongs_to :user has_many :points, dependent: :destroy + delegate :count, to: :points, prefix: true + include ImportUploader::Attachment(:raw) enum :source, { diff --git a/app/services/google_maps/records_importer.rb b/app/services/google_maps/records_importer.rb new file mode 100644 index 00000000..c7edfb1f --- /dev/null +++ b/app/services/google_maps/records_importer.rb @@ -0,0 +1,84 @@ +# frozen_string_literal: true + +class GoogleMaps::RecordsImporter + include Imports::Broadcaster + + BATCH_SIZE = 1000 + attr_reader :import, :current_index + + def initialize(import, current_index = 0) + @import = import + @batch = [] + @current_index = current_index + end + + def call(locations) + Array(locations).each_slice(BATCH_SIZE) do |location_batch| + batch = location_batch.map { prepare_location_data(_1) } + bulk_insert_points(batch) + broadcast_import_progress(import, current_index) + end + end + + private + + # rubocop:disable Metrics/MethodLength + def prepare_location_data(location) + { + latitude: location['latitudeE7'].to_f / 10**7, + longitude: location['longitudeE7'].to_f / 10**7, + timestamp: parse_timestamp(location), + altitude: location['altitude'], + velocity: location['velocity'], + raw_data: location, + topic: 'Google Maps Timeline Export', + tracker_id: 'google-maps-timeline-export', + import_id: @import.id, + user_id: @import.user_id, + created_at: Time.current, + updated_at: Time.current + } + end + # rubocop:enable Metrics/MethodLength + + def bulk_insert_points(batch) + unique_batch = deduplicate_batch(batch) + + # rubocop:disable Rails/SkipsModelValidations + Point.upsert_all( + unique_batch, + unique_by: %i[latitude longitude timestamp user_id], + returning: false, + on_duplicate: :skip + ) + # rubocop:enable Rails/SkipsModelValidations + rescue StandardError => e + create_notification("Failed to process location batch: #{e.message}") + end + + def deduplicate_batch(batch) + batch.uniq do |record| + [ + record[:latitude].round(7), + record[:longitude].round(7), + record[:timestamp], + record[:user_id] + ] + end + end + + def parse_timestamp(location) + Timestamps.parse_timestamp( + location['timestamp'] || location['timestampMs'] + ) + end + + def create_notification(message) + Notification.create!( + user: @import.user, + title: 'Google\'s Records.json Import Error', + content: message, + kind: :error + ) + end +end diff --git a/app/services/google_maps/records_parser.rb b/app/services/google_maps/records_parser.rb deleted file mode 100644 index 04ee4621..00000000 --- a/app/services/google_maps/records_parser.rb +++ /dev/null @@ -1,44 +0,0 @@ -# frozen_string_literal: true - -class GoogleMaps::RecordsParser - attr_reader :import - - def initialize(import) - @import = import - end - - def call(json) - data = parse_json(json) - - return if Point.exists?( - latitude: data[:latitude], - longitude: data[:longitude], - timestamp: data[:timestamp], - user_id: import.user_id - ) - - Point.create( - latitude: data[:latitude], - longitude: data[:longitude], - timestamp: data[:timestamp], - raw_data: data[:raw_data], - topic: 'Google Maps Timeline Export', - tracker_id: 'google-maps-timeline-export', - import_id: import.id, - user_id: import.user_id - ) - end - - private - - def parse_json(json) - { - latitude: json['latitudeE7'].to_f / 10**7, - longitude: json['longitudeE7'].to_f / 10**7, - timestamp: Timestamps.parse_timestamp(json['timestamp'] || json['timestampMs']), - altitude: json['altitude'], - velocity: json['velocity'], - raw_data: json - } - end -end diff --git a/app/services/tasks/imports/google_records.rb b/app/services/tasks/imports/google_records.rb index 8f8839e3..83174128 100644 --- a/app/services/tasks/imports/google_records.rb +++ b/app/services/tasks/imports/google_records.rb @@ -1,9 +1,10 @@ # frozen_string_literal: true -# This class is named based on Google Takeout's Records.json file, -# the main source of user's location history data. +# This class is named based on Google Takeout's Records.json file class Tasks::Imports::GoogleRecords + BATCH_SIZE = 1000 # Adjust based on your needs + def initialize(file_path, user_email) @file_path = file_path @user = User.find_by(email: user_email) @@ -14,10 +15,11 @@ class Tasks::Imports::GoogleRecords import_id = create_import log_start - file_content = read_file - json_data = Oj.load(file_content) - schedule_import_jobs(json_data, import_id) + process_file_in_batches(import_id) log_success + rescue Oj::ParseError => e + Rails.logger.error("JSON parsing error: #{e.message}") + raise end private @@ -26,14 +28,26 @@ class Tasks::Imports::GoogleRecords @user.imports.create(name: @file_path, source: :google_records).id end - def read_file - File.read(@file_path) - end + def process_file_in_batches(import_id) + batch = [] - def schedule_import_jobs(json_data, import_id) - json_data['locations'].each do |json| - Import::GoogleTakeoutJob.perform_later(import_id, json.to_json) + Oj.load_file(@file_path, mode: :compat) do |record| + next unless record.is_a?(Hash) && record['locations'] + + index = 0 + + record['locations'].each do |location| + batch << location + + next unless batch.size >= BATCH_SIZE + + index += BATCH_SIZE + Import::GoogleTakeoutJob.perform_later(import_id, Oj.dump(batch), index) + batch = [] + end end + + Import::GoogleTakeoutJob.perform_later(import_id, Oj.dump(batch)) if batch.any? end def log_start diff --git a/config/initializers/sidekiq.rb b/config/initializers/sidekiq.rb index d9dec786..ab3f00c5 100644 --- a/config/initializers/sidekiq.rb +++ b/config/initializers/sidekiq.rb @@ -2,6 +2,7 @@ Sidekiq.configure_server do |config| config.redis = { url: ENV['REDIS_URL'] } + config.logger = Sidekiq::Logger.new($stdout) if ENV['PROMETHEUS_EXPORTER_ENABLED'].to_s == 'true' require 'prometheus_exporter/instrumentation' diff --git a/spec/services/google_maps/records_parser_spec.rb b/spec/services/google_maps/records_importer_spec.rb similarity index 50% rename from spec/services/google_maps/records_parser_spec.rb rename to spec/services/google_maps/records_importer_spec.rb index 96495dad..8ce4d69d 100644 --- a/spec/services/google_maps/records_parser_spec.rb +++ b/spec/services/google_maps/records_importer_spec.rb @@ -2,23 +2,38 @@ require 'rails_helper' -RSpec.describe GoogleMaps::RecordsParser do +RSpec.describe GoogleMaps::RecordsImporter do describe '#call' do - subject(:parser) { described_class.new(import).call(json) } + subject(:parser) { described_class.new(import).call(locations) } let(:import) { create(:import) } let(:time) { DateTime.new(2025, 1, 1, 12, 0, 0) } - let(:json) do - { - 'latitudeE7' => 123_456_789, - 'longitudeE7' => 123_456_789, - 'altitude' => 0, - 'velocity' => 0 - } + let(:locations) do + [ + { + 'timestampMs' => (time.to_f * 1000).to_i.to_s, + 'latitudeE7' => 123_456_789, + 'longitudeE7' => 123_456_789, + 'accuracy' => 10, + 'altitude' => 100, + 'verticalAccuracy' => 5, + 'activity' => [ + { + 'timestampMs' => (time.to_f * 1000).to_i.to_s, + 'activity' => [ + { + 'type' => 'STILL', + 'confidence' => 100 + } + ] + } + ] + } + ] end context 'with regular timestamp' do - let(:json) { super().merge('timestamp' => time.to_s) } + let(:locations) { super()[0].merge('timestamp' => time.to_s).to_json } it 'creates a point' do expect { parser }.to change(Point, :count).by(1) @@ -26,11 +41,23 @@ RSpec.describe GoogleMaps::RecordsParser do end context 'when point already exists' do - let(:json) { super().merge('timestamp' => time.to_s) } + let(:locations) do + [ + super()[0].merge( + 'timestamp' => time.to_s, + 'latitudeE7' => 123_456_789, + 'longitudeE7' => 123_456_789 + ) + ] + end before do create( - :point, user: import.user, import:, latitude: 12.3456789, longitude: 12.3456789, + :point, + user: import.user, + import: import, + latitude: 12.3456789, + longitude: 12.3456789, timestamp: time.to_i ) end @@ -41,7 +68,9 @@ RSpec.describe GoogleMaps::RecordsParser do end context 'with timestampMs in milliseconds' do - let(:json) { super().merge('timestampMs' => (time.to_f * 1000).to_i.to_s) } + let(:locations) do + [super()[0].merge('timestampMs' => (time.to_f * 1000).to_i.to_s)] + end it 'creates a point using milliseconds timestamp' do expect { parser }.to change(Point, :count).by(1) @@ -49,7 +78,9 @@ RSpec.describe GoogleMaps::RecordsParser do end context 'with ISO 8601 timestamp' do - let(:json) { super().merge('timestamp' => time.iso8601) } + let(:locations) do + [super()[0].merge('timestamp' => time.iso8601)] + end it 'parses ISO 8601 timestamp correctly' do expect { parser }.to change(Point, :count).by(1) @@ -59,7 +90,9 @@ RSpec.describe GoogleMaps::RecordsParser do end context 'with timestamp in milliseconds' do - let(:json) { super().merge('timestamp' => (time.to_f * 1000).to_i.to_s) } + let(:locations) do + [super()[0].merge('timestamp' => (time.to_f * 1000).to_i.to_s)] + end it 'parses millisecond timestamp correctly' do expect { parser }.to change(Point, :count).by(1) @@ -69,7 +102,9 @@ RSpec.describe GoogleMaps::RecordsParser do end context 'with timestamp in seconds' do - let(:json) { super().merge('timestamp' => time.to_i.to_s) } + let(:locations) do + [super()[0].merge('timestamp' => time.to_i.to_s)] + end it 'parses second timestamp correctly' do expect { parser }.to change(Point, :count).by(1) diff --git a/spec/services/tasks/imports/google_records_spec.rb b/spec/services/tasks/imports/google_records_spec.rb index 0310dbd1..29fddfdf 100644 --- a/spec/services/tasks/imports/google_records_spec.rb +++ b/spec/services/tasks/imports/google_records_spec.rb @@ -5,10 +5,10 @@ require 'rails_helper' RSpec.describe Tasks::Imports::GoogleRecords do describe '#call' do let(:user) { create(:user) } - let(:file_path) { Rails.root.join('spec/fixtures/files/google/records.json') } + let(:file_path) { Rails.root.join('spec/fixtures/files/google/records.json').to_s } it 'schedules the Import::GoogleTakeoutJob' do - expect(Import::GoogleTakeoutJob).to receive(:perform_later).exactly(3).times + expect(Import::GoogleTakeoutJob).to receive(:perform_later).exactly(1).time described_class.new(file_path, user.email).call end diff --git a/spec/tasks/import_spec.rb b/spec/tasks/import_spec.rb index 4cd785db..0e718f76 100644 --- a/spec/tasks/import_spec.rb +++ b/spec/tasks/import_spec.rb @@ -3,7 +3,7 @@ require 'rails_helper' describe 'import.rake' do - let(:file_path) { Rails.root.join('spec/fixtures/files/google/records.json') } + let(:file_path) { Rails.root.join('spec/fixtures/files/google/records.json').to_s } let(:user) { create(:user) } it 'calls importing class' do