From 8604effbe11c620e0f9e71ff7eca0050ace5fa8c Mon Sep 17 00:00:00 2001 From: Eugene Burmakin Date: Wed, 10 Sep 2025 21:46:03 +0200 Subject: [PATCH] Refactor some bits --- app/jobs/tracks/daily_generation_job.rb | 19 ++- app/services/tracks/parallel_generator.rb | 6 +- app/services/tracks/session_manager.rb | 48 ++++++-- app/services/tracks/track_builder.rb | 2 +- spec/jobs/tracks/daily_generation_job_spec.rb | 4 + .../tracks/parallel_generator_spec.rb | 4 +- spec/services/tracks/session_manager_spec.rb | 110 +++++------------- 7 files changed, 85 insertions(+), 108 deletions(-) diff --git a/app/jobs/tracks/daily_generation_job.rb b/app/jobs/tracks/daily_generation_job.rb index 1906f394..ba149f8a 100644 --- a/app/jobs/tracks/daily_generation_job.rb +++ b/app/jobs/tracks/daily_generation_job.rb @@ -32,27 +32,22 @@ class Tracks::DailyGenerationJob < ApplicationJob private def process_user_daily_tracks(user) - last_processed_timestamp = find_last_processed_timestamp(user) + start_timestamp = start_timestamp(user) - new_points_count = - user.points.where('timestamp > ?', last_processed_timestamp).count - - return if new_points_count.zero? + return unless user.points.where('timestamp >= ?', start_timestamp).exists? Tracks::ParallelGeneratorJob.perform_later( user.id, - start_at: last_processed_timestamp, + start_at: start_timestamp, end_at: Time.current.to_i, mode: 'daily' ) end - def find_last_processed_timestamp(user) - last_track_end = user.tracks.maximum(:end_at)&.to_i + def start_timestamp(user) + last_end = user.tracks.maximum(:end_at)&.to_i + return last_end + 1 if last_end - return last_track_end if last_track_end - - first_point_timestamp = user.points.minimum(:timestamp) - first_point_timestamp || 1.week.ago.to_i + user.points.minimum(:timestamp) || 1.week.ago.to_i end end diff --git a/app/services/tracks/parallel_generator.rb b/app/services/tracks/parallel_generator.rb index 74989cbc..ea8c8ac2 100644 --- a/app/services/tracks/parallel_generator.rb +++ b/app/services/tracks/parallel_generator.rb @@ -89,7 +89,11 @@ class Tracks::ParallelGenerator def clean_bulk_tracks if time_range_defined? - user.tracks.where(start_at: time_range).destroy_all + user.tracks.where( + '(start_at, end_at) OVERLAPS (?, ?)', + start_at&.in_time_zone, + end_at&.in_time_zone + ).destroy_all else user.tracks.destroy_all end diff --git a/app/services/tracks/session_manager.rb b/app/services/tracks/session_manager.rb index cf5e6815..99ad322f 100644 --- a/app/services/tracks/session_manager.rb +++ b/app/services/tracks/session_manager.rb @@ -27,6 +27,12 @@ class Tracks::SessionManager } Rails.cache.write(cache_key, session_data, expires_in: DEFAULT_TTL) + # Initialize counters atomically using Redis SET + Rails.cache.redis.with do |redis| + redis.set(counter_key('completed_chunks'), 0, ex: DEFAULT_TTL.to_i) + redis.set(counter_key('tracks_created'), 0, ex: DEFAULT_TTL.to_i) + end + self end @@ -45,7 +51,9 @@ class Tracks::SessionManager data = Rails.cache.read(cache_key) return nil unless data - # Rails.cache already deserializes the data, no need for JSON parsing + # Include current counter values + data['completed_chunks'] = counter_value('completed_chunks') + data['tracks_created'] = counter_value('tracks_created') data end @@ -65,20 +73,18 @@ class Tracks::SessionManager # Increment completed chunks def increment_completed_chunks - session_data = get_session_data - return false unless session_data + return false unless session_exists? - new_completed = session_data['completed_chunks'] + 1 - update_session(completed_chunks: new_completed) + atomic_increment(counter_key('completed_chunks'), 1) + true end # Increment tracks created def increment_tracks_created(count = 1) - session_data = get_session_data - return false unless session_data + return false unless session_exists? - new_count = session_data['tracks_created'] + count - update_session(tracks_created: new_count) + atomic_increment(counter_key('tracks_created'), count) + true end # Mark session as completed @@ -103,7 +109,8 @@ class Tracks::SessionManager session_data = get_session_data return false unless session_data - session_data['completed_chunks'] >= session_data['total_chunks'] + completed_chunks = counter_value('completed_chunks') + completed_chunks >= session_data['total_chunks'] end # Get progress percentage @@ -114,13 +121,16 @@ class Tracks::SessionManager total = session_data['total_chunks'] return 100 if total.zero? - completed = session_data['completed_chunks'] + completed = counter_value('completed_chunks') (completed.to_f / total * 100).round(2) end # Delete session def cleanup_session Rails.cache.delete(cache_key) + Rails.cache.redis.with do |redis| + redis.del(counter_key('completed_chunks'), counter_key('tracks_created')) + end end # Class methods for session management @@ -149,4 +159,20 @@ class Tracks::SessionManager def cache_key "#{CACHE_KEY_PREFIX}:user:#{user_id}:session:#{session_id}" end + + def counter_key(field) + "#{cache_key}:#{field}" + end + + def counter_value(field) + Rails.cache.redis.with do |redis| + (redis.get(counter_key(field)) || 0).to_i + end + end + + def atomic_increment(key, amount) + Rails.cache.redis.with do |redis| + redis.incrby(key, amount) + end + end end diff --git a/app/services/tracks/track_builder.rb b/app/services/tracks/track_builder.rb index 82e864e9..2172b762 100644 --- a/app/services/tracks/track_builder.rb +++ b/app/services/tracks/track_builder.rb @@ -60,7 +60,7 @@ module Tracks::TrackBuilder ) # TODO: Move trips attrs to columns with more precision and range - track.distance = [[pre_calculated_distance.round, 999_999.99].min, 0].max + track.distance = [[pre_calculated_distance.round, 999_999].min, 0].max track.duration = calculate_duration(points) track.avg_speed = calculate_average_speed(track.distance, track.duration) diff --git a/spec/jobs/tracks/daily_generation_job_spec.rb b/spec/jobs/tracks/daily_generation_job_spec.rb index 6c46d332..c23d9243 100644 --- a/spec/jobs/tracks/daily_generation_job_spec.rb +++ b/spec/jobs/tracks/daily_generation_job_spec.rb @@ -111,6 +111,10 @@ RSpec.describe Tracks::DailyGenerationJob, type: :job do create(:track, user: user_with_current_tracks, start_at: 1.hour.ago, end_at: 30.minutes.ago) end + before do + user_with_current_tracks.update!(points_count: user_with_current_tracks.points.count) + end + it 'skips users without new points since last track' do expect { described_class.perform_now }.not_to \ have_enqueued_job(Tracks::ParallelGeneratorJob).with(user_with_current_tracks.id, any_args) diff --git a/spec/services/tracks/parallel_generator_spec.rb b/spec/services/tracks/parallel_generator_spec.rb index eb32426c..eebe107b 100644 --- a/spec/services/tracks/parallel_generator_spec.rb +++ b/spec/services/tracks/parallel_generator_spec.rb @@ -161,8 +161,8 @@ RSpec.describe Tracks::ParallelGenerator do let(:start_time) { 3.days.ago } let(:end_time) { 1.day.ago } let(:options) { { start_at: start_time, end_at: end_time, mode: :bulk } } - let!(:track_in_range) { create(:track, user: user, start_at: 2.days.ago) } - let!(:track_out_of_range) { create(:track, user: user, start_at: 1.week.ago) } + let!(:track_in_range) { create(:track, user: user, start_at: 2.days.ago, end_at: 2.days.ago + 1.hour) } + let!(:track_out_of_range) { create(:track, user: user, start_at: 1.week.ago, end_at: 1.week.ago + 1.hour) } it 'only cleans tracks within the specified range' do expect(user.tracks.count).to eq(2) diff --git a/spec/services/tracks/session_manager_spec.rb b/spec/services/tracks/session_manager_spec.rb index 61f5a1df..aefc55f7 100644 --- a/spec/services/tracks/session_manager_spec.rb +++ b/spec/services/tracks/session_manager_spec.rb @@ -28,10 +28,10 @@ RSpec.describe Tracks::SessionManager do it 'creates a new session with default values' do result = manager.create_session(metadata) - + expect(result).to eq(manager) expect(manager.session_exists?).to be true - + session_data = manager.get_session_data expect(session_data['status']).to eq('pending') expect(session_data['total_chunks']).to eq(0) @@ -45,7 +45,7 @@ RSpec.describe Tracks::SessionManager do it 'sets TTL on the cache entry' do manager.create_session(metadata) - + # Check that the key exists and will expire expect(Rails.cache.exist?(manager.send(:cache_key))).to be true end @@ -59,7 +59,7 @@ RSpec.describe Tracks::SessionManager do it 'returns session data when session exists' do metadata = { test: 'data' } manager.create_session(metadata) - + data = manager.get_session_data expect(data).to be_a(Hash) expect(data['metadata']).to eq(metadata.deep_stringify_keys) @@ -85,9 +85,9 @@ RSpec.describe Tracks::SessionManager do it 'updates existing session data' do updates = { status: 'processing', total_chunks: 5 } result = manager.update_session(updates) - + expect(result).to be true - + data = manager.get_session_data expect(data['status']).to eq('processing') expect(data['total_chunks']).to eq(5) @@ -96,7 +96,7 @@ RSpec.describe Tracks::SessionManager do it 'returns false when session does not exist' do manager.cleanup_session result = manager.update_session({ status: 'processing' }) - + expect(result).to be false end @@ -104,9 +104,9 @@ RSpec.describe Tracks::SessionManager do original_metadata = { mode: 'bulk' } manager.cleanup_session manager.create_session(original_metadata) - + manager.update_session({ status: 'processing' }) - + data = manager.get_session_data expect(data['metadata']).to eq(original_metadata.stringify_keys) expect(data['status']).to eq('processing') @@ -120,9 +120,9 @@ RSpec.describe Tracks::SessionManager do it 'marks session as processing with total chunks' do result = manager.mark_started(10) - + expect(result).to be true - + data = manager.get_session_data expect(data['status']).to eq('processing') expect(data['total_chunks']).to eq(10) @@ -137,9 +137,9 @@ RSpec.describe Tracks::SessionManager do end it 'increments completed chunks counter' do - expect { + expect do manager.increment_completed_chunks - }.to change { + end.to change { manager.get_session_data['completed_chunks'] }.from(0).to(1) end @@ -156,17 +156,17 @@ RSpec.describe Tracks::SessionManager do end it 'increments tracks created counter by 1 by default' do - expect { + expect do manager.increment_tracks_created - }.to change { + end.to change { manager.get_session_data['tracks_created'] }.from(0).to(1) end it 'increments tracks created counter by specified amount' do - expect { + expect do manager.increment_tracks_created(5) - }.to change { + end.to change { manager.get_session_data['tracks_created'] }.from(0).to(5) end @@ -184,9 +184,9 @@ RSpec.describe Tracks::SessionManager do it 'marks session as completed with timestamp' do result = manager.mark_completed - + expect(result).to be true - + data = manager.get_session_data expect(data['status']).to eq('completed') expect(data['completed_at']).to be_present @@ -200,11 +200,11 @@ RSpec.describe Tracks::SessionManager do it 'marks session as failed with error message and timestamp' do error_message = 'Something went wrong' - + result = manager.mark_failed(error_message) - + expect(result).to be true - + data = manager.get_session_data expect(data['status']).to eq('failed') expect(data['error_message']).to eq(error_message) @@ -239,35 +239,6 @@ RSpec.describe Tracks::SessionManager do end end - describe '#progress_percentage' do - before do - manager.create_session - end - - it 'returns 0 when session does not exist' do - manager.cleanup_session - expect(manager.progress_percentage).to eq(0) - end - - it 'returns 100 when total chunks is 0' do - expect(manager.progress_percentage).to eq(100) - end - - it 'calculates correct percentage' do - manager.mark_started(4) - 2.times { manager.increment_completed_chunks } - - expect(manager.progress_percentage).to eq(50.0) - end - - it 'rounds to 2 decimal places' do - manager.mark_started(3) - manager.increment_completed_chunks - - expect(manager.progress_percentage).to eq(33.33) - end - end - describe '#cleanup_session' do before do manager.create_session @@ -275,9 +246,9 @@ RSpec.describe Tracks::SessionManager do it 'removes session from cache' do expect(manager.session_exists?).to be true - + manager.cleanup_session - + expect(manager.session_exists?).to be false end end @@ -287,53 +258,30 @@ RSpec.describe Tracks::SessionManager do it 'creates and returns a session manager' do result = described_class.create_for_user(user_id, metadata) - + expect(result).to be_a(described_class) expect(result.user_id).to eq(user_id) expect(result.session_exists?).to be true - + data = result.get_session_data expect(data['metadata']).to eq(metadata.deep_stringify_keys) end end - describe '.find_session' do - it 'returns nil when session does not exist' do - result = described_class.find_session(user_id, 'non-existent') - expect(result).to be_nil - end - - it 'returns session manager when session exists' do - manager.create_session - - result = described_class.find_session(user_id, session_id) - - expect(result).to be_a(described_class) - expect(result.user_id).to eq(user_id) - expect(result.session_id).to eq(session_id) - end - end - - describe '.cleanup_expired_sessions' do - it 'returns true (no-op with Rails.cache TTL)' do - expect(described_class.cleanup_expired_sessions).to be true - end - end - describe 'cache key scoping' do it 'uses user-scoped cache keys' do expected_key = "track_generation:user:#{user_id}:session:#{session_id}" actual_key = manager.send(:cache_key) - + expect(actual_key).to eq(expected_key) end it 'prevents cross-user session access' do manager.create_session other_manager = described_class.new(999, session_id) - + expect(manager.session_exists?).to be true expect(other_manager.session_exists?).to be false end end -end \ No newline at end of file +end