Refactor some bits

This commit is contained in:
Eugene Burmakin 2025-09-10 21:46:03 +02:00
parent fb4d5a94b5
commit 8604effbe1
7 changed files with 85 additions and 108 deletions

View file

@ -32,27 +32,22 @@ class Tracks::DailyGenerationJob < ApplicationJob
private private
def process_user_daily_tracks(user) def process_user_daily_tracks(user)
last_processed_timestamp = find_last_processed_timestamp(user) start_timestamp = start_timestamp(user)
new_points_count = return unless user.points.where('timestamp >= ?', start_timestamp).exists?
user.points.where('timestamp > ?', last_processed_timestamp).count
return if new_points_count.zero?
Tracks::ParallelGeneratorJob.perform_later( Tracks::ParallelGeneratorJob.perform_later(
user.id, user.id,
start_at: last_processed_timestamp, start_at: start_timestamp,
end_at: Time.current.to_i, end_at: Time.current.to_i,
mode: 'daily' mode: 'daily'
) )
end end
def find_last_processed_timestamp(user) def start_timestamp(user)
last_track_end = user.tracks.maximum(:end_at)&.to_i last_end = user.tracks.maximum(:end_at)&.to_i
return last_end + 1 if last_end
return last_track_end if last_track_end user.points.minimum(:timestamp) || 1.week.ago.to_i
first_point_timestamp = user.points.minimum(:timestamp)
first_point_timestamp || 1.week.ago.to_i
end end
end end

View file

@ -89,7 +89,11 @@ class Tracks::ParallelGenerator
def clean_bulk_tracks def clean_bulk_tracks
if time_range_defined? if time_range_defined?
user.tracks.where(start_at: time_range).destroy_all user.tracks.where(
'(start_at, end_at) OVERLAPS (?, ?)',
start_at&.in_time_zone,
end_at&.in_time_zone
).destroy_all
else else
user.tracks.destroy_all user.tracks.destroy_all
end end

View file

@ -27,6 +27,12 @@ class Tracks::SessionManager
} }
Rails.cache.write(cache_key, session_data, expires_in: DEFAULT_TTL) Rails.cache.write(cache_key, session_data, expires_in: DEFAULT_TTL)
# Initialize counters atomically using Redis SET
Rails.cache.redis.with do |redis|
redis.set(counter_key('completed_chunks'), 0, ex: DEFAULT_TTL.to_i)
redis.set(counter_key('tracks_created'), 0, ex: DEFAULT_TTL.to_i)
end
self self
end end
@ -45,7 +51,9 @@ class Tracks::SessionManager
data = Rails.cache.read(cache_key) data = Rails.cache.read(cache_key)
return nil unless data return nil unless data
# Rails.cache already deserializes the data, no need for JSON parsing # Include current counter values
data['completed_chunks'] = counter_value('completed_chunks')
data['tracks_created'] = counter_value('tracks_created')
data data
end end
@ -65,20 +73,18 @@ class Tracks::SessionManager
# Increment completed chunks # Increment completed chunks
def increment_completed_chunks def increment_completed_chunks
session_data = get_session_data return false unless session_exists?
return false unless session_data
new_completed = session_data['completed_chunks'] + 1 atomic_increment(counter_key('completed_chunks'), 1)
update_session(completed_chunks: new_completed) true
end end
# Increment tracks created # Increment tracks created
def increment_tracks_created(count = 1) def increment_tracks_created(count = 1)
session_data = get_session_data return false unless session_exists?
return false unless session_data
new_count = session_data['tracks_created'] + count atomic_increment(counter_key('tracks_created'), count)
update_session(tracks_created: new_count) true
end end
# Mark session as completed # Mark session as completed
@ -103,7 +109,8 @@ class Tracks::SessionManager
session_data = get_session_data session_data = get_session_data
return false unless session_data return false unless session_data
session_data['completed_chunks'] >= session_data['total_chunks'] completed_chunks = counter_value('completed_chunks')
completed_chunks >= session_data['total_chunks']
end end
# Get progress percentage # Get progress percentage
@ -114,13 +121,16 @@ class Tracks::SessionManager
total = session_data['total_chunks'] total = session_data['total_chunks']
return 100 if total.zero? return 100 if total.zero?
completed = session_data['completed_chunks'] completed = counter_value('completed_chunks')
(completed.to_f / total * 100).round(2) (completed.to_f / total * 100).round(2)
end end
# Delete session # Delete session
def cleanup_session def cleanup_session
Rails.cache.delete(cache_key) Rails.cache.delete(cache_key)
Rails.cache.redis.with do |redis|
redis.del(counter_key('completed_chunks'), counter_key('tracks_created'))
end
end end
# Class methods for session management # Class methods for session management
@ -149,4 +159,20 @@ class Tracks::SessionManager
def cache_key def cache_key
"#{CACHE_KEY_PREFIX}:user:#{user_id}:session:#{session_id}" "#{CACHE_KEY_PREFIX}:user:#{user_id}:session:#{session_id}"
end end
def counter_key(field)
"#{cache_key}:#{field}"
end
def counter_value(field)
Rails.cache.redis.with do |redis|
(redis.get(counter_key(field)) || 0).to_i
end
end
def atomic_increment(key, amount)
Rails.cache.redis.with do |redis|
redis.incrby(key, amount)
end
end
end end

View file

@ -60,7 +60,7 @@ module Tracks::TrackBuilder
) )
# TODO: Move trips attrs to columns with more precision and range # TODO: Move trips attrs to columns with more precision and range
track.distance = [[pre_calculated_distance.round, 999_999.99].min, 0].max track.distance = [[pre_calculated_distance.round, 999_999].min, 0].max
track.duration = calculate_duration(points) track.duration = calculate_duration(points)
track.avg_speed = calculate_average_speed(track.distance, track.duration) track.avg_speed = calculate_average_speed(track.distance, track.duration)

View file

@ -111,6 +111,10 @@ RSpec.describe Tracks::DailyGenerationJob, type: :job do
create(:track, user: user_with_current_tracks, start_at: 1.hour.ago, end_at: 30.minutes.ago) create(:track, user: user_with_current_tracks, start_at: 1.hour.ago, end_at: 30.minutes.ago)
end end
before do
user_with_current_tracks.update!(points_count: user_with_current_tracks.points.count)
end
it 'skips users without new points since last track' do it 'skips users without new points since last track' do
expect { described_class.perform_now }.not_to \ expect { described_class.perform_now }.not_to \
have_enqueued_job(Tracks::ParallelGeneratorJob).with(user_with_current_tracks.id, any_args) have_enqueued_job(Tracks::ParallelGeneratorJob).with(user_with_current_tracks.id, any_args)

View file

@ -161,8 +161,8 @@ RSpec.describe Tracks::ParallelGenerator do
let(:start_time) { 3.days.ago } let(:start_time) { 3.days.ago }
let(:end_time) { 1.day.ago } let(:end_time) { 1.day.ago }
let(:options) { { start_at: start_time, end_at: end_time, mode: :bulk } } let(:options) { { start_at: start_time, end_at: end_time, mode: :bulk } }
let!(:track_in_range) { create(:track, user: user, start_at: 2.days.ago) } let!(:track_in_range) { create(:track, user: user, start_at: 2.days.ago, end_at: 2.days.ago + 1.hour) }
let!(:track_out_of_range) { create(:track, user: user, start_at: 1.week.ago) } let!(:track_out_of_range) { create(:track, user: user, start_at: 1.week.ago, end_at: 1.week.ago + 1.hour) }
it 'only cleans tracks within the specified range' do it 'only cleans tracks within the specified range' do
expect(user.tracks.count).to eq(2) expect(user.tracks.count).to eq(2)

View file

@ -137,9 +137,9 @@ RSpec.describe Tracks::SessionManager do
end end
it 'increments completed chunks counter' do it 'increments completed chunks counter' do
expect { expect do
manager.increment_completed_chunks manager.increment_completed_chunks
}.to change { end.to change {
manager.get_session_data['completed_chunks'] manager.get_session_data['completed_chunks']
}.from(0).to(1) }.from(0).to(1)
end end
@ -156,17 +156,17 @@ RSpec.describe Tracks::SessionManager do
end end
it 'increments tracks created counter by 1 by default' do it 'increments tracks created counter by 1 by default' do
expect { expect do
manager.increment_tracks_created manager.increment_tracks_created
}.to change { end.to change {
manager.get_session_data['tracks_created'] manager.get_session_data['tracks_created']
}.from(0).to(1) }.from(0).to(1)
end end
it 'increments tracks created counter by specified amount' do it 'increments tracks created counter by specified amount' do
expect { expect do
manager.increment_tracks_created(5) manager.increment_tracks_created(5)
}.to change { end.to change {
manager.get_session_data['tracks_created'] manager.get_session_data['tracks_created']
}.from(0).to(5) }.from(0).to(5)
end end
@ -239,35 +239,6 @@ RSpec.describe Tracks::SessionManager do
end end
end end
describe '#progress_percentage' do
before do
manager.create_session
end
it 'returns 0 when session does not exist' do
manager.cleanup_session
expect(manager.progress_percentage).to eq(0)
end
it 'returns 100 when total chunks is 0' do
expect(manager.progress_percentage).to eq(100)
end
it 'calculates correct percentage' do
manager.mark_started(4)
2.times { manager.increment_completed_chunks }
expect(manager.progress_percentage).to eq(50.0)
end
it 'rounds to 2 decimal places' do
manager.mark_started(3)
manager.increment_completed_chunks
expect(manager.progress_percentage).to eq(33.33)
end
end
describe '#cleanup_session' do describe '#cleanup_session' do
before do before do
manager.create_session manager.create_session
@ -297,29 +268,6 @@ RSpec.describe Tracks::SessionManager do
end end
end end
describe '.find_session' do
it 'returns nil when session does not exist' do
result = described_class.find_session(user_id, 'non-existent')
expect(result).to be_nil
end
it 'returns session manager when session exists' do
manager.create_session
result = described_class.find_session(user_id, session_id)
expect(result).to be_a(described_class)
expect(result.user_id).to eq(user_id)
expect(result.session_id).to eq(session_id)
end
end
describe '.cleanup_expired_sessions' do
it 'returns true (no-op with Rails.cache TTL)' do
expect(described_class.cleanup_expired_sessions).to be true
end
end
describe 'cache key scoping' do describe 'cache key scoping' do
it 'uses user-scoped cache keys' do it 'uses user-scoped cache keys' do
expected_key = "track_generation:user:#{user_id}:session:#{session_id}" expected_key = "track_generation:user:#{user_id}:session:#{session_id}"