From de07511820957d485ec7b4194f412a4d255718a4 Mon Sep 17 00:00:00 2001 From: Eugene Burmakin Date: Fri, 29 Aug 2025 18:01:36 +0200 Subject: [PATCH] Address some CR comments --- app/controllers/home_controller.rb | 4 -- app/jobs/tracks/boundary_resolver_job.rb | 2 + app/jobs/tracks/daily_generation_job.rb | 58 ++++++++++++---- app/jobs/tracks/time_chunk_processor_job.rb | 4 +- app/models/concerns/distanceable.rb | 69 ++++++++----------- app/models/point.rb | 4 +- app/services/tracks/boundary_detector.rb | 30 ++++++-- spec/jobs/tracks/create_job_spec.rb | 2 +- spec/jobs/tracks/daily_generation_job_spec.rb | 15 ++-- 9 files changed, 113 insertions(+), 75 deletions(-) diff --git a/app/controllers/home_controller.rb b/app/controllers/home_controller.rb index 27453c76..f19d4fb4 100644 --- a/app/controllers/home_controller.rb +++ b/app/controllers/home_controller.rb @@ -2,10 +2,6 @@ class HomeController < ApplicationController def index - # redirect_to 'https://dawarich.app', allow_other_host: true and return unless SELF_HOSTED - redirect_to map_url if current_user - - @points = current_user.points.without_raw_data if current_user end end diff --git a/app/jobs/tracks/boundary_resolver_job.rb b/app/jobs/tracks/boundary_resolver_job.rb index 92023649..4b6d6a9e 100644 --- a/app/jobs/tracks/boundary_resolver_job.rb +++ b/app/jobs/tracks/boundary_resolver_job.rb @@ -45,6 +45,8 @@ class Tracks::BoundaryResolverJob < ApplicationJob session_data = session_manager.get_session_data total_tracks = session_data['tracks_created'] + boundary_tracks_resolved + session_manager.update_session(tracks_created: total_tracks) + session_manager.mark_completed end diff --git a/app/jobs/tracks/daily_generation_job.rb b/app/jobs/tracks/daily_generation_job.rb index 84d0f2d8..dbfa2723 100644 --- a/app/jobs/tracks/daily_generation_job.rb +++ b/app/jobs/tracks/daily_generation_job.rb @@ -6,40 +6,68 @@ class Tracks::DailyGenerationJob < ApplicationJob queue_as :tracks def perform - Rails.logger.info
"Starting daily track generation for users with recent activity" + # Compute time window once at job start to ensure consistency + time_window = compute_time_window - users_with_recent_activity.find_each do |user| - process_user_tracks(user) + Rails.logger.info 'Starting daily track generation for users with recent activity' + + users_processed = 0 + users_failed = 0 + + begin + users_with_recent_activity(time_window).find_each do |user| + if process_user_tracks(user, time_window) + users_processed += 1 + else + users_failed += 1 + end + end + rescue StandardError => e + Rails.logger.error "Critical failure in daily track generation: #{e.message}" + ExceptionReporter.call(e, 'Daily track generation job failed') + raise end - Rails.logger.info "Completed daily track generation" + Rails.logger.info "Completed daily track generation: #{users_processed} users processed, #{users_failed} users failed" end private - def users_with_recent_activity - # Users with points in last 2 days (buffer for cross-day tracks), via subquery + def compute_time_window + now = Time.current + { + activity_window_start: 2.days.ago(now), + activity_window_end: now, + processing_start: 3.days.ago(now).beginning_of_day, + processing_end: now + } + end + + def users_with_recent_activity(time_window) + # Find users who have created points within the activity window + # This gives buffer to handle cross-day tracks + user_ids = Point.where( + created_at: time_window[:activity_window_start]..time_window[:activity_window_end] + ).select(:user_id).distinct - user_ids = Point.where(created_at: 2.days.ago..Time.current).select(:user_id).distinct User.where(id: user_ids) end - def process_user_tracks(user) - # Process tracks for the last 2 days with buffer - start_at = 3.days.ago.beginning_of_day # Extra buffer for cross-day tracks - end_at = Time.current - + def process_user_tracks(user, time_window) Rails.logger.info "Enqueuing daily track generation for user #{user.id}" 
Tracks::ParallelGeneratorJob.perform_later( user.id, - start_at: start_at, - end_at: end_at, + start_at: time_window[:processing_start], + end_at: time_window[:processing_end], mode: :daily, - chunk_size: 6.hours # Smaller chunks for recent data processing + chunk_size: 6.hours ) + + true rescue StandardError => e Rails.logger.error "Failed to enqueue daily track generation for user #{user.id}: #{e.message}" ExceptionReporter.call(e, "Daily track generation failed for user #{user.id}") + false end end diff --git a/app/jobs/tracks/time_chunk_processor_job.rb b/app/jobs/tracks/time_chunk_processor_job.rb index 15952fca..47f0b589 100644 --- a/app/jobs/tracks/time_chunk_processor_job.rb +++ b/app/jobs/tracks/time_chunk_processor_job.rb @@ -11,7 +11,7 @@ class Tracks::TimeChunkProcessorJob < ApplicationJob def perform(user_id, session_id, chunk_data) @user = User.find(user_id) @session_manager = Tracks::SessionManager.new(user_id, session_id) - @chunk_data = chunk_data + @chunk_data = chunk_data.with_indifferent_access return unless session_exists? @@ -99,7 +99,7 @@ class Tracks::TimeChunkProcessorJob < ApplicationJob # Additional validation for the distance result if !distance.finite? 
|| distance < 0 Rails.logger.error "Invalid distance calculated (#{distance}) for #{points.size} points in chunk #{chunk_data[:chunk_id]}" - Rails.logger.debug "Point coordinates: #{points.map { |p| [p.latitude, p.longitude] }.inspect}" + Rails.logger.debug "Point coordinates: #{points.map { |p| [p.lat, p.lon] }.inspect}" return nil end diff --git a/app/models/concerns/distanceable.rb b/app/models/concerns/distanceable.rb index 2bae9978..7e4b49be 100644 --- a/app/models/concerns/distanceable.rb +++ b/app/models/concerns/distanceable.rb @@ -21,56 +21,24 @@ module Distanceable return 0 if points.length < 2 total_meters = points.each_cons(2).sum do |p1, p2| - # Extract coordinates from lonlat (source of truth) + next 0 unless valid_point_pair?(p1, p2) + begin - # Check if lonlat exists and is valid - if p1.lonlat.nil? || p2.lonlat.nil? - next 0 - end - - lat1, lon1 = p1.lat, p1.lon - lat2, lon2 = p2.lat, p2.lon - - # Check for nil coordinates extracted from lonlat - if lat1.nil? || lon1.nil? || lat2.nil? || lon2.nil? - next 0 - end - - # Check for NaN or infinite coordinates - if [lat1, lon1, lat2, lon2].any? { |coord| !coord.finite? } - next 0 - end - - # Check for valid latitude/longitude ranges - if lat1.abs > 90 || lat2.abs > 90 || lon1.abs > 180 || lon2.abs > 180 - next 0 - end - distance_km = Geocoder::Calculations.distance_between( - [lat1, lon1], - [lat2, lon2], + [p1.lat, p1.lon], + [p2.lat, p2.lon], units: :km ) - - # Check if Geocoder returned NaN or infinite value - if !distance_km.finite? - next 0 - end - - distance_km * 1000 # Convert km to meters - rescue StandardError => e + rescue StandardError next 0 end + + # Check if Geocoder returned valid value + distance_km.finite? ? distance_km * 1000 : 0 # Convert km to meters end result = total_meters.to_f / ::DISTANCE_UNITS[unit.to_sym] - - # Final validation of result - if !result.finite? - return 0 - end - - result + result.finite? ? 
result : 0 end private @@ -230,6 +198,25 @@ module Distanceable private + def valid_point_pair?(p1, p2) + return false if p1.lonlat.nil? || p2.lonlat.nil? + + lat1, lon1 = p1.lat, p1.lon + lat2, lon2 = p2.lat, p2.lon + + # Check for nil coordinates + return false if lat1.nil? || lon1.nil? || lat2.nil? || lon2.nil? + + # Check for NaN or infinite coordinates + coords = [lat1, lon1, lat2, lon2] + return false if coords.any? { |coord| !coord.finite? } + + # Check for valid latitude/longitude ranges + return false if lat1.abs > 90 || lat2.abs > 90 || lon1.abs > 180 || lon2.abs > 180 + + true + end + def extract_point(point) case point when Array diff --git a/app/models/point.rb b/app/models/point.rb index 157c82bc..ce147c99 100644 --- a/app/models/point.rb +++ b/app/models/point.rb @@ -34,7 +34,9 @@ class Point < ApplicationRecord after_create :set_country after_create_commit :broadcast_coordinates after_create_commit :trigger_incremental_track_generation, if: -> { import_id.nil? } - after_commit :recalculate_track, on: :update, if: -> { track.present? } + after_commit :recalculate_track, + on: :update, + if: -> { track_id.present? && (saved_change_to_lonlat? || saved_change_to_timestamp? || saved_change_to_track_id?) } def self.without_raw_data select(column_names - ['raw_data']) diff --git a/app/services/tracks/boundary_detector.rb b/app/services/tracks/boundary_detector.rb index b6a7fa61..03aff2a6 100644 --- a/app/services/tracks/boundary_detector.rb +++ b/app/services/tracks/boundary_detector.rb @@ -1,5 +1,7 @@ # frozen_string_literal: true +require 'set' + # Service to detect and resolve tracks that span across multiple time chunks # Handles merging partial tracks and cleaning up duplicates from parallel processing class Tracks::BoundaryDetector @@ -17,9 +19,25 @@ class Tracks::BoundaryDetector boundary_candidates = find_boundary_track_candidates return 0 if boundary_candidates.empty? 
+ processed_ids = Set.new resolved_count = 0 + boundary_candidates.each do |group| - resolved_count += 1 if merge_boundary_tracks(group) + group_ids = group.map(&:id) + + # Skip if all tracks in this group have already been processed + next if group_ids.all? { |id| processed_ids.include?(id) } + + # Filter group to only include unprocessed tracks + unprocessed_group = group.reject { |track| processed_ids.include?(track.id) } + next if unprocessed_group.size < 2 + + # Attempt to merge the unprocessed tracks + if merge_boundary_tracks(unprocessed_group) + # Add all original member IDs to processed set + processed_ids.merge(group_ids) + resolved_count += 1 + end end resolved_count @@ -94,10 +112,12 @@ class Tracks::BoundaryDetector return false unless track1.points.exists? && track2.points.exists? # Get endpoints of both tracks - track1_start = track1.points.order(:timestamp).first - track1_end = track1.points.order(:timestamp).last - track2_start = track2.points.order(:timestamp).first - track2_end = track2.points.order(:timestamp).last + pts1 = track1.association(:points).loaded? ? track1.points : track1.points.to_a + pts2 = track2.association(:points).loaded? ? 
track2.points : track2.points.to_a + track1_start = pts1.min_by(&:timestamp) + track1_end = pts1.max_by(&:timestamp) + track2_start = pts2.min_by(&:timestamp) + track2_end = pts2.max_by(&:timestamp) # Check various connection scenarios connection_threshold = distance_threshold_meters diff --git a/spec/jobs/tracks/create_job_spec.rb b/spec/jobs/tracks/create_job_spec.rb index 693da3ec..d3467185 100644 --- a/spec/jobs/tracks/create_job_spec.rb +++ b/spec/jobs/tracks/create_job_spec.rb @@ -66,7 +66,7 @@ RSpec.describe Tracks::CreateJob, type: :job do expect(ExceptionReporter).to have_received(:call).with( kind_of(StandardError), - 'Failed to create tracks for user' + "Failed to create tracks for user #{user.id} (mode: daily, start_at: nil, end_at: nil)" ) end end diff --git a/spec/jobs/tracks/daily_generation_job_spec.rb b/spec/jobs/tracks/daily_generation_job_spec.rb index 2b0bb990..871fe0ce 100644 --- a/spec/jobs/tracks/daily_generation_job_spec.rb +++ b/spec/jobs/tracks/daily_generation_job_spec.rb @@ -74,11 +74,11 @@ RSpec.describe Tracks::DailyGenerationJob, type: :job do expect(user_ids).not_to include(user3.id) end - it 'logs the process' do + it 'logs the process with counts' do allow(Rails.logger).to receive(:info) - expect(Rails.logger).to receive(:info).with("Starting daily track generation for users with recent activity") - expect(Rails.logger).to receive(:info).with("Completed daily track generation") + expect(Rails.logger).to receive(:info).with('Starting daily track generation for users with recent activity') + expect(Rails.logger).to receive(:info).with('Completed daily track generation: 2 users processed, 0 users failed') job.perform end @@ -94,11 +94,11 @@ RSpec.describe Tracks::DailyGenerationJob, type: :job do expect { job.perform }.not_to have_enqueued_job(Tracks::ParallelGeneratorJob) end - it 'still logs start and completion' do + it 'still logs start and completion with zero counts' do allow(Rails.logger).to receive(:info) - 
expect(Rails.logger).to receive(:info).with("Starting daily track generation for users with recent activity") - expect(Rails.logger).to receive(:info).with("Completed daily track generation") + expect(Rails.logger).to receive(:info).with('Starting daily track generation for users with recent activity') + expect(Rails.logger).to receive(:info).with('Completed daily track generation: 0 users processed, 0 users failed') job.perform end @@ -114,8 +114,11 @@ RSpec.describe Tracks::DailyGenerationJob, type: :job do end it 'logs the error and continues processing' do + allow(Rails.logger).to receive(:info) + expect(Rails.logger).to receive(:error).with("Failed to enqueue daily track generation for user #{user1.id}: Job failed") expect(ExceptionReporter).to receive(:call).with(instance_of(StandardError), "Daily track generation failed for user #{user1.id}") + expect(Rails.logger).to receive(:info).with('Completed daily track generation: 0 users processed, 1 users failed') expect { job.perform }.not_to raise_error end