mirror of
https://github.com/Freika/dawarich.git
synced 2026-01-10 17:21:38 -05:00
Address some CR comments
This commit is contained in:
parent
fd8f8cedd7
commit
de07511820
9 changed files with 113 additions and 75 deletions
|
|
@ -2,10 +2,6 @@
|
|||
|
||||
class HomeController < ApplicationController
|
||||
def index
|
||||
# redirect_to 'https://dawarich.app', allow_other_host: true and return unless SELF_HOSTED
|
||||
|
||||
redirect_to map_url if current_user
|
||||
|
||||
@points = current_user.points.without_raw_data if current_user
|
||||
end
|
||||
end
|
||||
|
|
|
|||
|
|
@ -45,6 +45,8 @@ class Tracks::BoundaryResolverJob < ApplicationJob
|
|||
session_data = session_manager.get_session_data
|
||||
total_tracks = session_data['tracks_created'] + boundary_tracks_resolved
|
||||
|
||||
session_manager.update_session(tracks_created: total_tracks)
|
||||
|
||||
session_manager.mark_completed
|
||||
end
|
||||
|
||||
|
|
|
|||
|
|
@ -6,40 +6,68 @@ class Tracks::DailyGenerationJob < ApplicationJob
|
|||
queue_as :tracks
|
||||
|
||||
def perform
|
||||
Rails.logger.info "Starting daily track generation for users with recent activity"
|
||||
# Compute time window once at job start to ensure consistency
|
||||
time_window = compute_time_window
|
||||
|
||||
users_with_recent_activity.find_each do |user|
|
||||
process_user_tracks(user)
|
||||
Rails.logger.info 'Starting daily track generation for users with recent activity'
|
||||
|
||||
users_processed = 0
|
||||
users_failed = 0
|
||||
|
||||
begin
|
||||
users_with_recent_activity(time_window).find_each do |user|
|
||||
if process_user_tracks(user, time_window)
|
||||
users_processed += 1
|
||||
else
|
||||
users_failed += 1
|
||||
end
|
||||
end
|
||||
rescue StandardError => e
|
||||
Rails.logger.error "Critical failure in daily track generation: #{e.message}"
|
||||
ExceptionReporter.call(e, 'Daily track generation job failed')
|
||||
raise
|
||||
end
|
||||
|
||||
Rails.logger.info "Completed daily track generation"
|
||||
Rails.logger.info "Completed daily track generation: #{users_processed} users processed, #{users_failed} users failed"
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def users_with_recent_activity
|
||||
# Users with points in last 2 days (buffer for cross-day tracks), via subquery
|
||||
def compute_time_window
|
||||
now = Time.current
|
||||
{
|
||||
activity_window_start: 2.days.ago(now),
|
||||
activity_window_end: now,
|
||||
processing_start: 3.days.ago(now).beginning_of_day,
|
||||
processing_end: now
|
||||
}
|
||||
end
|
||||
|
||||
def users_with_recent_activity(time_window)
|
||||
# Find users who have created points within the activity window
|
||||
# This gives buffer to handle cross-day tracks
|
||||
user_ids = Point.where(
|
||||
created_at: time_window[:activity_window_start]..time_window[:activity_window_end]
|
||||
).select(:user_id).distinct
|
||||
|
||||
user_ids = Point.where(created_at: 2.days.ago..Time.current).select(:user_id).distinct
|
||||
User.where(id: user_ids)
|
||||
end
|
||||
|
||||
def process_user_tracks(user)
|
||||
# Process tracks for the last 2 days with buffer
|
||||
start_at = 3.days.ago.beginning_of_day # Extra buffer for cross-day tracks
|
||||
end_at = Time.current
|
||||
|
||||
def process_user_tracks(user, time_window)
|
||||
Rails.logger.info "Enqueuing daily track generation for user #{user.id}"
|
||||
|
||||
Tracks::ParallelGeneratorJob.perform_later(
|
||||
user.id,
|
||||
start_at: start_at,
|
||||
end_at: end_at,
|
||||
start_at: time_window[:processing_start],
|
||||
end_at: time_window[:processing_end],
|
||||
mode: :daily,
|
||||
chunk_size: 6.hours # Smaller chunks for recent data processing
|
||||
chunk_size: 6.hours
|
||||
)
|
||||
|
||||
true
|
||||
rescue StandardError => e
|
||||
Rails.logger.error "Failed to enqueue daily track generation for user #{user.id}: #{e.message}"
|
||||
ExceptionReporter.call(e, "Daily track generation failed for user #{user.id}")
|
||||
false
|
||||
end
|
||||
end
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@ class Tracks::TimeChunkProcessorJob < ApplicationJob
|
|||
def perform(user_id, session_id, chunk_data)
|
||||
@user = User.find(user_id)
|
||||
@session_manager = Tracks::SessionManager.new(user_id, session_id)
|
||||
@chunk_data = chunk_data
|
||||
@chunk_data = chunk_data.with_indifferent_access
|
||||
|
||||
return unless session_exists?
|
||||
|
||||
|
|
@ -99,7 +99,7 @@ class Tracks::TimeChunkProcessorJob < ApplicationJob
|
|||
# Additional validation for the distance result
|
||||
if !distance.finite? || distance < 0
|
||||
Rails.logger.error "Invalid distance calculated (#{distance}) for #{points.size} points in chunk #{chunk_data[:chunk_id]}"
|
||||
Rails.logger.debug "Point coordinates: #{points.map { |p| [p.latitude, p.longitude] }.inspect}"
|
||||
Rails.logger.debug "Point coordinates: #{points.map { |p| [p.lat, p.lon] }.inspect}"
|
||||
return nil
|
||||
end
|
||||
|
||||
|
|
|
|||
|
|
@ -21,56 +21,24 @@ module Distanceable
|
|||
return 0 if points.length < 2
|
||||
|
||||
total_meters = points.each_cons(2).sum do |p1, p2|
|
||||
# Extract coordinates from lonlat (source of truth)
|
||||
next 0 unless valid_point_pair?(p1, p2)
|
||||
|
||||
begin
|
||||
# Check if lonlat exists and is valid
|
||||
if p1.lonlat.nil? || p2.lonlat.nil?
|
||||
next 0
|
||||
end
|
||||
|
||||
lat1, lon1 = p1.lat, p1.lon
|
||||
lat2, lon2 = p2.lat, p2.lon
|
||||
|
||||
# Check for nil coordinates extracted from lonlat
|
||||
if lat1.nil? || lon1.nil? || lat2.nil? || lon2.nil?
|
||||
next 0
|
||||
end
|
||||
|
||||
# Check for NaN or infinite coordinates
|
||||
if [lat1, lon1, lat2, lon2].any? { |coord| !coord.finite? }
|
||||
next 0
|
||||
end
|
||||
|
||||
# Check for valid latitude/longitude ranges
|
||||
if lat1.abs > 90 || lat2.abs > 90 || lon1.abs > 180 || lon2.abs > 180
|
||||
next 0
|
||||
end
|
||||
|
||||
distance_km = Geocoder::Calculations.distance_between(
|
||||
[lat1, lon1],
|
||||
[lat2, lon2],
|
||||
[p1.lat, p1.lon],
|
||||
[p2.lat, p2.lon],
|
||||
units: :km
|
||||
)
|
||||
|
||||
# Check if Geocoder returned NaN or infinite value
|
||||
if !distance_km.finite?
|
||||
next 0
|
||||
end
|
||||
|
||||
distance_km * 1000 # Convert km to meters
|
||||
rescue StandardError => e
|
||||
rescue StandardError
|
||||
next 0
|
||||
end
|
||||
|
||||
# Check if Geocoder returned valid value
|
||||
distance_km.finite? ? distance_km * 1000 : 0 # Convert km to meters
|
||||
end
|
||||
|
||||
result = total_meters.to_f / ::DISTANCE_UNITS[unit.to_sym]
|
||||
|
||||
# Final validation of result
|
||||
if !result.finite?
|
||||
return 0
|
||||
end
|
||||
|
||||
result
|
||||
result.finite? ? result : 0
|
||||
end
|
||||
|
||||
private
|
||||
|
|
@ -230,6 +198,25 @@ module Distanceable
|
|||
|
||||
private
|
||||
|
||||
def valid_point_pair?(p1, p2)
|
||||
return false if p1.lonlat.nil? || p2.lonlat.nil?
|
||||
|
||||
lat1, lon1 = p1.lat, p1.lon
|
||||
lat2, lon2 = p2.lat, p2.lon
|
||||
|
||||
# Check for nil coordinates
|
||||
return false if lat1.nil? || lon1.nil? || lat2.nil? || lon2.nil?
|
||||
|
||||
# Check for NaN or infinite coordinates
|
||||
coords = [lat1, lon1, lat2, lon2]
|
||||
return false if coords.any? { |coord| !coord.finite? }
|
||||
|
||||
# Check for valid latitude/longitude ranges
|
||||
return false if lat1.abs > 90 || lat2.abs > 90 || lon1.abs > 180 || lon2.abs > 180
|
||||
|
||||
true
|
||||
end
|
||||
|
||||
def extract_point(point)
|
||||
case point
|
||||
when Array
|
||||
|
|
|
|||
|
|
@ -34,7 +34,9 @@ class Point < ApplicationRecord
|
|||
after_create :set_country
|
||||
after_create_commit :broadcast_coordinates
|
||||
after_create_commit :trigger_incremental_track_generation, if: -> { import_id.nil? }
|
||||
after_commit :recalculate_track, on: :update, if: -> { track.present? }
|
||||
after_commit :recalculate_track,
|
||||
on: :update,
|
||||
if: -> { track_id.present? && (saved_change_to_lonlat? || saved_change_to_timestamp? || saved_change_to_track_id?) }
|
||||
|
||||
def self.without_raw_data
|
||||
select(column_names - ['raw_data'])
|
||||
|
|
|
|||
|
|
@ -1,5 +1,7 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
require 'set'
|
||||
|
||||
# Service to detect and resolve tracks that span across multiple time chunks
|
||||
# Handles merging partial tracks and cleaning up duplicates from parallel processing
|
||||
class Tracks::BoundaryDetector
|
||||
|
|
@ -17,9 +19,25 @@ class Tracks::BoundaryDetector
|
|||
boundary_candidates = find_boundary_track_candidates
|
||||
return 0 if boundary_candidates.empty?
|
||||
|
||||
processed_ids = Set.new
|
||||
resolved_count = 0
|
||||
|
||||
boundary_candidates.each do |group|
|
||||
resolved_count += 1 if merge_boundary_tracks(group)
|
||||
group_ids = group.map(&:id)
|
||||
|
||||
# Skip if all tracks in this group have already been processed
|
||||
next if group_ids.all? { |id| processed_ids.include?(id) }
|
||||
|
||||
# Filter group to only include unprocessed tracks
|
||||
unprocessed_group = group.reject { |track| processed_ids.include?(track.id) }
|
||||
next if unprocessed_group.size < 2
|
||||
|
||||
# Attempt to merge the unprocessed tracks
|
||||
if merge_boundary_tracks(unprocessed_group)
|
||||
# Add all original member IDs to processed set
|
||||
processed_ids.merge(group_ids)
|
||||
resolved_count += 1
|
||||
end
|
||||
end
|
||||
|
||||
resolved_count
|
||||
|
|
@ -94,10 +112,12 @@ class Tracks::BoundaryDetector
|
|||
return false unless track1.points.exists? && track2.points.exists?
|
||||
|
||||
# Get endpoints of both tracks
|
||||
track1_start = track1.points.order(:timestamp).first
|
||||
track1_end = track1.points.order(:timestamp).last
|
||||
track2_start = track2.points.order(:timestamp).first
|
||||
track2_end = track2.points.order(:timestamp).last
|
||||
pts1 = track1.association(:points).loaded? ? track1.points : track1.points.to_a
|
||||
pts2 = track2.association(:points).loaded? ? track2.points : track2.points.to_a
|
||||
track1_start = pts1.min_by(&:timestamp)
|
||||
track1_end = pts1.max_by(&:timestamp)
|
||||
track2_start = pts2.min_by(&:timestamp)
|
||||
track2_end = pts2.max_by(&:timestamp)
|
||||
|
||||
# Check various connection scenarios
|
||||
connection_threshold = distance_threshold_meters
|
||||
|
|
|
|||
|
|
@ -66,7 +66,7 @@ RSpec.describe Tracks::CreateJob, type: :job do
|
|||
|
||||
expect(ExceptionReporter).to have_received(:call).with(
|
||||
kind_of(StandardError),
|
||||
'Failed to create tracks for user'
|
||||
"Failed to create tracks for user #{user.id} (mode: daily, start_at: nil, end_at: nil)"
|
||||
)
|
||||
end
|
||||
end
|
||||
|
|
|
|||
|
|
@ -74,11 +74,11 @@ RSpec.describe Tracks::DailyGenerationJob, type: :job do
|
|||
expect(user_ids).not_to include(user3.id)
|
||||
end
|
||||
|
||||
it 'logs the process' do
|
||||
it 'logs the process with counts' do
|
||||
allow(Rails.logger).to receive(:info)
|
||||
|
||||
expect(Rails.logger).to receive(:info).with("Starting daily track generation for users with recent activity")
|
||||
expect(Rails.logger).to receive(:info).with("Completed daily track generation")
|
||||
expect(Rails.logger).to receive(:info).with('Starting daily track generation for users with recent activity')
|
||||
expect(Rails.logger).to receive(:info).with('Completed daily track generation: 2 users processed, 0 users failed')
|
||||
|
||||
job.perform
|
||||
end
|
||||
|
|
@ -94,11 +94,11 @@ RSpec.describe Tracks::DailyGenerationJob, type: :job do
|
|||
expect { job.perform }.not_to have_enqueued_job(Tracks::ParallelGeneratorJob)
|
||||
end
|
||||
|
||||
it 'still logs start and completion' do
|
||||
it 'still logs start and completion with zero counts' do
|
||||
allow(Rails.logger).to receive(:info)
|
||||
|
||||
expect(Rails.logger).to receive(:info).with("Starting daily track generation for users with recent activity")
|
||||
expect(Rails.logger).to receive(:info).with("Completed daily track generation")
|
||||
expect(Rails.logger).to receive(:info).with('Starting daily track generation for users with recent activity')
|
||||
expect(Rails.logger).to receive(:info).with('Completed daily track generation: 0 users processed, 0 users failed')
|
||||
|
||||
job.perform
|
||||
end
|
||||
|
|
@ -114,8 +114,11 @@ RSpec.describe Tracks::DailyGenerationJob, type: :job do
|
|||
end
|
||||
|
||||
it 'logs the error and continues processing' do
|
||||
allow(Rails.logger).to receive(:info)
|
||||
|
||||
expect(Rails.logger).to receive(:error).with("Failed to enqueue daily track generation for user #{user1.id}: Job failed")
|
||||
expect(ExceptionReporter).to receive(:call).with(instance_of(StandardError), "Daily track generation failed for user #{user1.id}")
|
||||
expect(Rails.logger).to receive(:info).with('Completed daily track generation: 0 users processed, 1 users failed')
|
||||
|
||||
expect { job.perform }.not_to raise_error
|
||||
end
|
||||
|
|
|
|||
Loading…
Reference in a new issue