Speed up some importing processes

This commit is contained in:
Eugene Burmakin 2025-02-22 23:14:23 +01:00
parent 7c766a4d92
commit 383b88ab04
31 changed files with 254 additions and 165 deletions

View file

@ -8,7 +8,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
## TODO:
- Data migration to convert `latitude` and `longitude` to `lonlat` column.
- Realtime broadcast for importing progress
- Frontend update to use `lonlat` column.
## Fixed
@ -24,6 +24,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- Restrict access to users management in non self-hosted mode.
- Points are now using `lonlat` column for storing longitude and latitude.
- Semantic history points are now being imported much faster.
- GPX files are now being imported much faster.
# 0.24.1 - 2025-02-13

View file

@ -9,7 +9,7 @@ gem 'bootsnap', require: false
gem 'chartkick'
gem 'data_migrate'
gem 'devise'
gem 'geocoder', path: '../../geocoder'
gem 'geocoder'
gem 'gpx'
gem 'groupdate'
gem 'httparty'

View file

@ -1,10 +1,3 @@
PATH
remote: ../../geocoder
specs:
geocoder (1.8.5)
base64 (>= 0.1.0)
csv (>= 3.0.0)
GEM
remote: https://rubygems.org/
specs:
@ -145,6 +138,9 @@ GEM
fugit (1.11.1)
et-orbi (~> 1, >= 1.2.11)
raabro (~> 1.4)
geocoder (1.8.5)
base64 (>= 0.1.0)
csv (>= 3.0.0)
globalid (1.2.1)
activesupport (>= 6.1)
gpx (1.2.0)
@ -465,7 +461,7 @@ DEPENDENCIES
fakeredis
ffaker
foreman
geocoder!
geocoder
gpx
groupdate
httparty

View file

@ -0,0 +1,13 @@
# frozen_string_literal: true
class DataMigrations::MigratePointsLatlonJob < ApplicationJob
queue_as :default
def perform(user_id)
user = User.find(user_id)
# rubocop:disable Rails/SkipsModelValidations
user.tracked_points.update_all('lonlat = ST_SetSRID(ST_MakePoint(longitude, latitude), 4326)')
# rubocop:enable Rails/SkipsModelValidations
end
end

View file

@ -6,23 +6,21 @@ class Overland::BatchCreatingJob < ApplicationJob
def perform(params, user_id)
data = Overland::Params.new(params).call
records = data.map do |location|
{
lonlat: location[:lonlat],
timestamp: location[:timestamp],
user_id: user_id,
created_at: Time.current,
updated_at: Time.current
}
end
data.each do |location|
next if point_exists?(location, user_id)
# rubocop:disable Rails/SkipsModelValidations
Point.upsert_all(
records,
unique_by: %i[lonlat timestamp user_id],
returning: false,
on_duplicate: :skip
Point.create!(location.merge(user_id:))
end
end
private
def point_exists?(params, user_id)
Point.exists?(
latitude: params[:latitude],
longitude: params[:longitude],
timestamp: params[:timestamp],
user_id:
)
# rubocop:enable Rails/SkipsModelValidations
end
end

View file

@ -50,21 +50,22 @@ class Point < ApplicationRecord
end
def lon
lonlat.x.to_s
lonlat.x
end
def lat
lonlat.y.to_s
lonlat.y
end
private
# rubocop:disable Metrics/MethodLength Metrics/AbcSize
def broadcast_coordinates
PointsChannel.broadcast_to(
user,
[
latitude.to_f,
longitude.to_f,
lat,
lon,
battery.to_s,
altitude.to_s,
timestamp.to_s,
@ -74,4 +75,5 @@ class Point < ApplicationRecord
]
)
end
# rubocop:enable Metrics/MethodLength
end

View file

@ -22,8 +22,8 @@ class ExportSerializer
def export_point(point)
{
lat: point.lat,
lon: point.lon,
lat: point.lat.to_s,
lon: point.lon.to_s,
bs: battery_status(point),
batt: point.battery,
p: point.ping,

View file

@ -17,8 +17,8 @@ class Points::GpxSerializer
points.each do |point|
track_segment.points << GPX::TrackPoint.new(
lat: point.lat.to_f,
lon: point.lon.to_f,
lat: point.lat,
lon: point.lon,
elevation: point.altitude.to_f,
time: point.recorded_at
)

View file

@ -0,0 +1,82 @@
# frozen_string_literal: true
class Gpx::TrackImporter
include Imports::Broadcaster
attr_reader :import, :json, :user_id
def initialize(import, user_id)
@import = import
@json = import.raw_data
@user_id = user_id
end
def call
tracks = json['gpx']['trk']
tracks_arr = tracks.is_a?(Array) ? tracks : [tracks]
points = tracks_arr.map { parse_track(_1) }.flatten.compact
points_data = points.map.with_index(1) { |point, index| prepare_point(point, index) }.compact
bulk_insert_points(points_data)
end
private
def parse_track(track)
return if track['trkseg'].blank?
segments = track['trkseg']
segments_array = segments.is_a?(Array) ? segments : [segments]
segments_array.compact.map { |segment| segment['trkpt'] }
end
def prepare_point(point, index)
return if point['lat'].blank? || point['lon'].blank? || point['time'].blank?
{
lonlat: "POINT(#{point['lon'].to_d} #{point['lat'].to_d})",
altitude: point['ele'].to_i,
timestamp: Time.parse(point['time']).to_i,
import_id: import.id,
velocity: speed(point),
raw_data: point,
user_id: user_id,
created_at: Time.current,
updated_at: Time.current
}
end
def bulk_insert_points(batch)
unique_batch = batch.uniq { |record| [record[:lonlat], record[:timestamp], record[:user_id]] }
# rubocop:disable Rails/SkipsModelValidations
Point.upsert_all(
unique_batch,
unique_by: %i[lonlat timestamp user_id],
returning: false,
on_duplicate: :skip
)
# rubocop:enable Rails/SkipsModelValidations
rescue StandardError => e
create_notification("Failed to process GPX track: #{e.message}")
end
def create_notification(message)
Notification.create!(
user_id: user_id,
title: 'GPX Import Error',
content: message,
kind: :error
)
end
def speed(point)
return if point['extensions'].blank?
(
point.dig('extensions', 'speed') || point.dig('extensions', 'TrackPointExtension', 'speed')
).to_f.round(1)
end
end

View file

@ -1,66 +0,0 @@
# frozen_string_literal: true
class Gpx::TrackParser
include Imports::Broadcaster
attr_reader :import, :json, :user_id
def initialize(import, user_id)
@import = import
@json = import.raw_data
@user_id = user_id
end
def call
tracks = json['gpx']['trk']
tracks_arr = tracks.is_a?(Array) ? tracks : [tracks]
tracks_arr.map { parse_track(_1) }.flatten.compact.each.with_index(1) do |point, index|
create_point(point, index)
end
end
private
def parse_track(track)
return if track['trkseg'].blank?
segments = track['trkseg']
segments_array = segments.is_a?(Array) ? segments : [segments]
segments_array.compact.map { |segment| segment['trkpt'] }
end
def create_point(point, index)
return if point['lat'].blank? || point['lon'].blank? || point['time'].blank?
return if point_exists?(point)
Point.create(
lonlat: "POINT(#{point['lon'].to_d} #{point['lat'].to_d})",
altitude: point['ele'].to_i,
timestamp: Time.parse(point['time']).to_i,
import_id: import.id,
velocity: speed(point),
raw_data: point,
user_id:
)
broadcast_import_progress(import, index)
end
def point_exists?(point)
Point.exists?(
lonlat: "POINT(#{point['lon'].to_d} #{point['lat'].to_d})",
timestamp: Time.parse(point['time']).to_i,
user_id:
)
end
def speed(point)
return if point['extensions'].blank?
(
point.dig('extensions', 'speed') || point.dig('extensions', 'TrackPointExtension', 'speed')
).to_f.round(1)
end
end

View file

@ -26,8 +26,8 @@ class Imports::Create
case source
when 'google_semantic_history' then GoogleMaps::SemanticHistoryParser
when 'google_phone_takeout' then GoogleMaps::PhoneTakeoutParser
when 'owntracks' then OwnTracks::ExportParser
when 'gpx' then Gpx::TrackParser
when 'owntracks' then OwnTracks::Importer
when 'gpx' then Gpx::TrackImporter
when 'geojson' then Geojson::ImportParser
when 'immich_api', 'photoprism_api' then Photos::ImportParser
end

View file

@ -1,34 +0,0 @@
# frozen_string_literal: true
class OwnTracks::ExportParser
include Imports::Broadcaster
attr_reader :import, :data, :user_id
def initialize(import, user_id)
@import = import
@data = import.raw_data
@user_id = user_id
end
def call
points_data = data.map { |point| OwnTracks::Params.new(point).call }
points_data.each.with_index(1) do |point_data, index|
next if Point.exists?(
lonlat: point_data[:lonlat],
timestamp: point_data[:timestamp],
user_id:
)
point = Point.new(point_data).tap do |p|
p.user_id = user_id
p.import_id = import.id
end
point.save
broadcast_import_progress(import, index)
end
end
end

View file

@ -0,0 +1,52 @@
# frozen_string_literal: true
class OwnTracks::Importer
include Imports::Broadcaster
attr_reader :import, :data, :user_id
def initialize(import, user_id)
@import = import
@data = import.raw_data
@user_id = user_id
end
def call
points_data = data.map.with_index(1) do |point, index|
OwnTracks::Params.new(point).call.merge(
import_id: import.id,
user_id: user_id,
created_at: Time.current,
updated_at: Time.current
)
end
bulk_insert_points(points_data)
end
private
def bulk_insert_points(batch)
unique_batch = batch.uniq { |record| [record[:lonlat], record[:timestamp], record[:user_id]] }
# rubocop:disable Rails/SkipsModelValidations
Point.upsert_all(
unique_batch,
unique_by: %i[lonlat timestamp user_id],
returning: false,
on_duplicate: :skip
)
# rubocop:enable Rails/SkipsModelValidations
rescue StandardError => e
create_notification("Failed to process OwnTracks data: #{e.message}")
end
def create_notification(message)
Notification.create!(
user_id: user_id,
title: 'OwnTracks Import Error',
content: message,
kind: :error
)
end
end

View file

@ -0,0 +1,13 @@
# frozen_string_literal: true
class MigratePointsLatlon < ActiveRecord::Migration[8.0]
def up
User.find_each do |user|
DataMigrations::MigratePointsLatlonJob.perform_later(user.id)
end
end
def down
raise ActiveRecord::IrreversibleMigration
end
end

View file

@ -0,0 +1,16 @@
# frozen_string_literal: true
require 'rails_helper'
RSpec.describe DataMigrations::MigratePointsLatlonJob, type: :job do
describe '#perform' do
it 'updates the lonlat column for all tracked points' do
user = create(:user)
point = create(:point, latitude: 2.0, longitude: 1.0, user: user)
expect { subject.perform(user.id) }.to change {
point.reload.lonlat
}.to(RGeo::Geographic.spherical_factory.point(1.0, 2.0))
end
end
end

View file

@ -26,7 +26,7 @@ RSpec.describe ImportJob, type: :job do
context 'when there is an error' do
before do
allow_any_instance_of(OwnTracks::ExportParser).to receive(:call).and_raise(StandardError)
allow_any_instance_of(OwnTracks::Importer).to receive(:call).and_raise(StandardError)
end
it 'does not create points' do

View file

@ -62,5 +62,21 @@ RSpec.describe Point, type: :model do
end
end
end
describe '#lon' do
let(:point) { create(:point, lonlat: 'POINT(1 2)') }
it 'returns longitude' do
expect(point.lon).to eq(1)
end
end
describe '#lat' do
let(:point) { create(:point, lonlat: 'POINT(1 2)') }
it 'returns latitude' do
expect(point.lat).to eq(2)
end
end
end
end

View file

@ -10,8 +10,8 @@ RSpec.describe Api::SlimPointSerializer do
let(:expected_json) do
{
id: point.id,
latitude: point.lat,
longitude: point.lon,
latitude: point.lat.to_s,
longitude: point.lon.to_s,
timestamp: point.timestamp
}
end

View file

@ -18,8 +18,8 @@ RSpec.describe ExportSerializer do
user_email => {
'dawarich-export' => [
{
lat: points.first.lat,
lon: points.first.lon,
lat: points.first.lat.to_s,
lon: points.first.lon.to_s,
bs: 'u',
batt: points.first.battery,
p: points.first.ping,
@ -39,8 +39,8 @@ RSpec.describe ExportSerializer do
raw_data: points.first.raw_data
},
{
lat: points.second.lat,
lon: points.second.lon,
lat: points.second.lat.to_s,
lon: points.second.lon.to_s,
bs: 'u',
batt: points.second.battery,
p: points.second.ping,

View file

@ -15,7 +15,7 @@ RSpec.describe PointSerializer do
'tracker_id' => point.tracker_id,
'topic' => point.topic,
'altitude' => point.altitude,
'longitude' => point.lon,
'longitude' => point.lon.to_s,
'velocity' => point.velocity,
'trigger' => point.trigger,
'bssid' => point.bssid,
@ -24,7 +24,7 @@ RSpec.describe PointSerializer do
'vertical_accuracy' => point.vertical_accuracy,
'accuracy' => point.accuracy,
'timestamp' => point.timestamp,
'latitude' => point.lat,
'latitude' => point.lat.to_s,
'mode' => point.mode,
'inrids' => point.inrids,
'in_regions' => point.in_regions,

View file

@ -20,7 +20,7 @@ RSpec.describe Points::GeojsonSerializer do
type: 'Feature',
geometry: {
type: 'Point',
coordinates: [point.lon, point.lat]
coordinates: [point.lon.to_s, point.lat.to_s]
},
properties: PointSerializer.new(point).call
}

View file

@ -24,8 +24,8 @@ RSpec.describe Points::GpxSerializer do
serializer.tracks[0].points.each_with_index do |track_point, index|
point = points[index]
expect(track_point.lat.to_s).to eq(point.lat)
expect(track_point.lon.to_s).to eq(point.lon)
expect(track_point.lat.to_s).to eq(point.lat.to_s)
expect(track_point.lon.to_s).to eq(point.lon.to_s)
expect(track_point.time).to eq(point.recorded_at)
end
end

View file

@ -35,12 +35,12 @@ RSpec.describe GoogleMaps::PhoneTakeoutParser do
it 'creates points with correct data' do
parser
expect(Point.all[6].lat).to eq('27.696576')
expect(Point.all[6].lon).to eq('-97.376949')
expect(Point.all[6].lat).to eq(27.696576)
expect(Point.all[6].lon).to eq(-97.376949)
expect(Point.all[6].timestamp).to eq(1_693_180_140)
expect(Point.last.lat).to eq('27.709617')
expect(Point.last.lon).to eq('-97.375988')
expect(Point.last.lat).to eq(27.709617)
expect(Point.last.lon).to eq(-97.375988)
expect(Point.last.timestamp).to eq(1_693_180_320)
end
end

View file

@ -2,7 +2,7 @@
require 'rails_helper'
RSpec.describe Gpx::TrackParser do
RSpec.describe Gpx::TrackImporter do
describe '#call' do
subject(:parser) { described_class.new(import, user.id).call }
@ -53,8 +53,8 @@ RSpec.describe Gpx::TrackParser do
it 'creates points with correct data' do
parser
expect(Point.first.lat).to eq('37.1722103')
expect(Point.first.lon).to eq('-3.55468')
expect(Point.first.lat).to eq(37.1722103)
expect(Point.first.lon).to eq(-3.55468)
expect(Point.first.altitude).to eq(1066)
expect(Point.first.timestamp).to eq(Time.zone.parse('2024-04-21T10:19:55Z').to_i)
expect(Point.first.velocity).to eq('2.9')
@ -67,8 +67,8 @@ RSpec.describe Gpx::TrackParser do
it 'creates points with correct data' do
parser
expect(Point.first.lat).to eq('10.758321212464024')
expect(Point.first.lon).to eq('106.64234449272531')
expect(Point.first.lat).to eq(10.758321212464024)
expect(Point.first.lon).to eq(106.64234449272531)
expect(Point.first.altitude).to eq(17)
expect(Point.first.timestamp).to eq(1_730_626_211)
expect(Point.first.velocity).to eq('2.8')

View file

@ -30,8 +30,8 @@ RSpec.describe Imports::Create do
context 'when source is owntracks' do
let(:import) { create(:import, source: 'owntracks') }
it 'calls the OwnTracks::ExportParser' do
expect(OwnTracks::ExportParser).to \
it 'calls the OwnTracks::Importer' do
expect(OwnTracks::Importer).to \
receive(:new).with(import, user.id).and_return(double(call: true))
service.call
end
@ -59,7 +59,7 @@ RSpec.describe Imports::Create do
context 'when import fails' do
before do
allow(OwnTracks::ExportParser).to receive(:new).with(import, user.id).and_raise(StandardError)
allow(OwnTracks::Importer).to receive(:new).with(import, user.id).and_raise(StandardError)
end
it 'creates a failed notification' do
@ -73,8 +73,8 @@ RSpec.describe Imports::Create do
context 'when source is gpx' do
let(:import) { create(:import, source: 'gpx') }
it 'calls the Gpx::TrackParser' do
expect(Gpx::TrackParser).to \
it 'calls the Gpx::TrackImporter' do
expect(Gpx::TrackImporter).to \
receive(:new).with(import, user.id).and_return(double(call: true))
service.call
end

View file

@ -2,7 +2,7 @@
require 'rails_helper'
RSpec.describe OwnTracks::ExportParser do
RSpec.describe OwnTracks::Importer do
describe '#call' do
subject(:parser) { described_class.new(import, user.id).call }

View file

@ -33,8 +33,8 @@ RSpec.describe Visits::Prepare do
date: static_time.to_date.to_s,
visits: [
{
latitude: '0.0',
longitude: '0.0',
latitude: 0.0,
longitude: 0.0,
radius: 10,
points:,
duration: 105,