diff --git a/app/jobs/tracks/daily_generation_job.rb b/app/jobs/tracks/daily_generation_job.rb index 2134987d..31db430a 100644 --- a/app/jobs/tracks/daily_generation_job.rb +++ b/app/jobs/tracks/daily_generation_job.rb @@ -57,11 +57,9 @@ class Tracks::DailyGenerationJob < ApplicationJob def find_last_processed_timestamp(user) last_track_end = user.tracks.maximum(:end_at)&.to_i - if last_track_end - last_track_end - else - first_point_timestamp = user.points.minimum(:timestamp) - first_point_timestamp || 1.week.ago.to_i - end + return last_track_end if last_track_end + + first_point_timestamp = user.points.minimum(:timestamp) + first_point_timestamp || 1.week.ago.to_i end end diff --git a/app/services/tracks/parallel_generator.rb b/app/services/tracks/parallel_generator.rb index 59c3f2c4..74989cbc 100644 --- a/app/services/tracks/parallel_generator.rb +++ b/app/services/tracks/parallel_generator.rb @@ -17,8 +17,7 @@ class Tracks::ParallelGenerator end def call - # Clean existing tracks if needed - clean_existing_tracks if should_clean_tracks? + clean_bulk_tracks if mode == :bulk # Generate time chunks time_chunks = generate_time_chunks @@ -40,13 +39,6 @@ class Tracks::ParallelGenerator private - def should_clean_tracks? - case mode - when :bulk, :daily then true - else false - end - end - def generate_time_chunks chunker = Tracks::TimeChunker.new( user, @@ -95,15 +87,6 @@ class Tracks::ParallelGenerator ) end - def clean_existing_tracks - case mode - when :bulk then clean_bulk_tracks - when :daily then clean_daily_tracks - else - raise ArgumentError, "Unknown mode: #{mode}" - end - end - def clean_bulk_tracks if time_range_defined? user.tracks.where(start_at: time_range).destroy_all @@ -112,17 +95,6 @@ class Tracks::ParallelGenerator end end - def clean_daily_tracks - # For daily mode, we don't want to clean all tracks for the day - # Instead, we clean tracks that overlap with the time range we're processing - # This allows for incremental processing without losing existing tracks - - return unless time_range_defined? - - # Only clean tracks that overlap with our processing time range - user.tracks.where(start_at: time_range).destroy_all - end - def time_range_defined? start_at.present? || end_at.present? end @@ -166,8 +138,8 @@ class Tracks::ParallelGenerator else # Convert seconds to readable format seconds = duration.to_i - if seconds >= 86400 # days - days = seconds / 86400 + if seconds >= 86_400 # days + days = seconds / 86_400 "#{days} day#{'s' if days != 1}" elsif seconds >= 3600 # hours hours = seconds / 3600 diff --git a/spec/jobs/tracks/daily_generation_job_spec.rb b/spec/jobs/tracks/daily_generation_job_spec.rb index 681d3f13..c4d632e9 100644 --- a/spec/jobs/tracks/daily_generation_job_spec.rb +++ b/spec/jobs/tracks/daily_generation_job_spec.rb @@ -26,7 +26,7 @@ RSpec.describe Tracks::DailyGenerationJob, type: :job do # Update points_count for users to reflect actual points active_user.update!(points_count: active_user.points.count) trial_user.update!(points_count: trial_user.points.count) - + ActiveJob::Base.queue_adapter.enqueued_jobs.clear end @@ -49,28 +49,62 @@ RSpec.describe Tracks::DailyGenerationJob, type: :job do end.not_to have_enqueued_job(Tracks::ParallelGeneratorJob) end - it 'enqueues parallel generation for users with new points' do + it 'enqueues correct number of parallel generation jobs for users with new points' do described_class.perform_now - # Check that jobs were enqueued with correct parameters enqueued_jobs = ActiveJob::Base.queue_adapter.enqueued_jobs.select do |job| job[:job] == Tracks::ParallelGeneratorJob end expect(enqueued_jobs.count).to eq(2) + end + + it 'enqueues parallel generation job for active user with correct parameters' do + described_class.perform_now + + enqueued_jobs = ActiveJob::Base.queue_adapter.enqueued_jobs.select do |job| + job[:job] == Tracks::ParallelGeneratorJob + end - # Check active user job active_user_job = enqueued_jobs.find { |job| job[:args].first == active_user.id } expect(active_user_job).to be_present - job_kwargs = active_user_job[:args].last - expect(job_kwargs['start_at']).to eq(active_user_old_track.end_at.to_i) # start_at - expect(job_kwargs['mode']).to eq('daily') # mode + end + + it 'uses correct start_at timestamp for active user' do + described_class.perform_now + + enqueued_jobs = ActiveJob::Base.queue_adapter.enqueued_jobs.select do |job| + job[:job] == Tracks::ParallelGeneratorJob + end + + active_user_job = enqueued_jobs.find { |job| job[:args].first == active_user.id } + job_kwargs = active_user_job[:args].last + + expect(job_kwargs['start_at']).to eq(active_user_old_track.end_at.to_i) + end + + it 'uses daily mode for parallel generation jobs' do + described_class.perform_now + + enqueued_jobs = ActiveJob::Base.queue_adapter.enqueued_jobs.select do |job| + job[:job] == Tracks::ParallelGeneratorJob + end + + active_user_job = enqueued_jobs.find { |job| job[:args].first == active_user.id } + job_kwargs = active_user_job[:args].last + + expect(job_kwargs['mode']).to eq('daily') + end + + it 'enqueues parallel generation job for trial user' do + described_class.perform_now + + enqueued_jobs = ActiveJob::Base.queue_adapter.enqueued_jobs.select do |job| + job[:job] == Tracks::ParallelGeneratorJob + end - # Check trial user job trial_user_job = enqueued_jobs.find { |job| job[:args].first == trial_user.id } expect(trial_user_job).to be_present - trial_job_kwargs = trial_user_job[:args].last - expect(trial_job_kwargs['mode']).to eq('daily') # mode end it 'does not enqueue jobs for users without new points' do @@ -80,7 +114,7 @@ RSpec.describe Tracks::DailyGenerationJob, type: :job do have_enqueued_job(Tracks::ParallelGeneratorJob) end - it 'handles users with no existing tracks' do + it 'enqueues parallel generation job for users with no existing tracks' do # Create user with no tracks but with points spread over time user_no_tracks = create(:user, points_count: 5) # Create points with different timestamps so there are "new" points since the first one @@ -92,23 +126,48 @@ RSpec.describe Tracks::DailyGenerationJob, type: :job do enqueued_jobs = ActiveJob::Base.queue_adapter.enqueued_jobs.select do |job| job[:job] == Tracks::ParallelGeneratorJob && job[:args].first == user_no_tracks.id end - - expect(enqueued_jobs.count).to eq(1) - # For users with no tracks, should start from first point timestamp + expect(enqueued_jobs.count).to eq(1) + end + + it 'uses first point timestamp as start_at for users with no tracks' do + # Create user with no tracks but with points spread over time + user_no_tracks = create(:user, points_count: 5) + # Create points with different timestamps so there are "new" points since the first one + create(:point, user: user_no_tracks, timestamp: 2.hours.ago.to_i) + create_list(:point, 4, user: user_no_tracks, timestamp: 1.hour.ago.to_i) + + described_class.perform_now + + enqueued_jobs = ActiveJob::Base.queue_adapter.enqueued_jobs.select do |job| + job[:job] == Tracks::ParallelGeneratorJob && job[:args].first == user_no_tracks.id + end + + # For users with no tracks, should start from first point timestamp job_kwargs = enqueued_jobs.first[:args].last expect(job_kwargs['start_at']).to eq(user_no_tracks.points.minimum(:timestamp)) end - it 'handles exceptions gracefully' do + it 'does not raise errors when processing fails' do # Ensure users have points so they're not skipped active_user.update!(points_count: 5) trial_user.update!(points_count: 3) - + allow_any_instance_of(User).to receive(:tracks).and_raise(StandardError, 'Database error') allow(ExceptionReporter).to receive(:call) expect { described_class.perform_now }.not_to raise_error + end + + it 'reports exceptions when processing fails' do + # Ensure users have points so they're not skipped + active_user.update!(points_count: 5) + trial_user.update!(points_count: 3) + + allow_any_instance_of(User).to receive(:tracks).and_raise(StandardError, 'Database error') + allow(ExceptionReporter).to receive(:call) + + described_class.perform_now expect(ExceptionReporter).to have_received(:call).at_least(:once) end diff --git a/tracks.md b/tracks.md deleted file mode 100644 index c55356d6..00000000 --- a/tracks.md +++ /dev/null @@ -1,549 +0,0 @@ -# Parallel Track Generator - -## ✅ FEATURE COMPLETE - -The parallel track generator is a production-ready alternative to the existing track generation system. It processes location data in parallel time-based chunks using background jobs, providing better scalability and performance for large datasets. - -Status: ✅ READY FOR PRODUCTION - Core functionality implemented and fully tested. - -## Current State Analysis - -### Existing Implementation Issues - -- Heavy reliance on complex SQL operations in Track.get_segments_with_points (app/services/tracks/generator.rb:47) -- Uses PostgreSQL window functions, geography calculations, and array aggregations -- All processing happens in a single synchronous operation -- Memory intensive for large datasets -- No parallel processing capability - -### Dependencies Available - -- ✅ ActiveJob framework already in use -- ✅ Geocoder gem available for distance calculations -- ✅ Existing job patterns (see app/jobs/tracks/create_job.rb) -- ✅ User settings for time/distance thresholds - -## Architecture Overview - -### ✅ Implemented Directory Structure - -``` -app/ -├── jobs/ -│ └── tracks/ -│ ├── parallel_generator_job.rb ✅ Main coordinator -│ ├── time_chunk_processor_job.rb ✅ Process individual time chunks -│ ├── boundary_resolver_job.rb ✅ Merge cross-chunk tracks -│ └── daily_generation_job.rb ✅ Daily automatic track generation -├── services/ -│ └── tracks/ -│ ├── parallel_generator.rb ✅ Main service class -│ ├── time_chunker.rb ✅ Split time ranges into chunks -│ ├── segmentation.rb ✅ Ruby-based point segmentation (extended existing) -│ ├── boundary_detector.rb ✅ Handle cross-chunk boundaries -│ ├── session_manager.rb ✅ Rails.cache-based session tracking -│ └── session_cleanup.rb ❌ Not implemented (session cleanup handled in SessionManager) -└── models/concerns/ - └── distanceable.rb ✅ Extended with Geocoder calculations -``` - -### ✅ Implemented Key Components - -1. ✅ Parallel Generator: Main orchestrator service - coordinates the entire parallel process -2. ✅ Time Chunker: Splits date ranges into processable chunks with buffer zones (default: 1 day) -3. ✅ Rails.cache Session Manager: Tracks job progress and coordination (instead of Redis) -4. ✅ Enhanced Segmentation: Extended existing module with Geocoder-based calculations -5. ✅ Chunk Processor Jobs: Process individual time chunks in parallel using ActiveJob -6. ✅ Boundary Resolver: Handles tracks spanning multiple chunks with sophisticated merging logic -7. ❌ Session Cleanup: Not implemented as separate service (handled within SessionManager) -8. ✅ Daily Track Generation: Automatic processing of new points every 4 hours for active/trial users - -### ✅ Implemented Data Flow - -``` -User Request - ↓ -ParallelGeneratorJob ✅ - ↓ -Creates Rails.cache session entry ✅ - ↓ -TimeChunker splits date range with buffer zones ✅ - ↓ -Multiple TimeChunkProcessorJob (parallel) ✅ - ↓ -Each processes one time chunk using Geocoder ✅ - ↓ -BoundaryResolverJob (waits for all chunks) ✅ - ↓ -Merges cross-boundary tracks ✅ - ↓ -Rails.cache session marked as completed ✅ -``` - -## Implementation Plan - -### Phase 1: Foundation (High Priority) - -#### 1.1 Rails.cache-Based Session Tracking - -Files to create: - -- app/services/tracks/session_manager.rb ✅ IMPLEMENTED - -Session Schema (Rails.cache): - -```ruby -# Key pattern: "track_generation:user:#{user_id}:#{session_id}" -{ - status: "pending", # pending, processing, completed, failed - total_chunks: 0, - completed_chunks: 0, - tracks_created: 0, - started_at: "2024-01-01T10:00:00Z", - completed_at: nil, - error_message: nil, - metadata: { - mode: "bulk", - chunk_size: "1.day", - user_settings: {...} - } -} - -#### 1.2 Extend Distanceable Concern ✅ IMPLEMENTED - -File: app/models/concerns/distanceable.rb - -- ✅ Add Geocoder-based Ruby calculation methods -- ✅ Support pure Ruby distance calculations without SQL -- ✅ Maintain compatibility with existing PostGIS methods -#### 1.3 Time Chunker Service ✅ IMPLEMENTED - -File: app/services/tracks/time_chunker.rb - -- ✅ Split time ranges into configurable chunks (default: 1 day) -- ✅ Add buffer zones for boundary detection (6-hour overlap) -- ✅ Handle edge cases (empty ranges, single day) - -### Phase 2: Core Processing (High Priority) - -#### 2.1 Ruby Segmentation Service ✅ IMPLEMENTED - -File: app/services/tracks/segmentation.rb (extended existing) - -- ✅ Replace SQL window functions with Ruby logic - -- ✅ Stream points using find_each for memory efficiency - -- ✅ Use Geocoder for distance calculations - -- ✅ Implement gap detection (time and distance thresholds) - -- ✅ Return segments with pre-calculated distances - -#### 2.2 Parallel Generator Service ✅ IMPLEMENTED - -File: app/services/tracks/parallel_generator.rb - -- ✅ Main orchestrator for the entire process - -- ✅ Create generation sessions - -- ✅ Coordinate job enqueueing - -- ✅ Support all existing modes (bulk, incremental, daily) - -### Phase 3: Background Jobs (High Priority) - -#### 3.1 Parallel Generator Job ✅ IMPLEMENTED - -File: app/jobs/tracks/parallel_generator_job.rb - -- ✅ Entry point for background processing -- ✅ Handle user notifications - -#### 3.2 Time Chunk Processor Job ✅ IMPLEMENTED - -File: app/jobs/tracks/time_chunk_processor_job.rb - -- ✅ Process individual time chunks - -- ✅ Create tracks from segments - -- ✅ Update session progress - -- ✅ Handle chunk-level errors - -#### 3.3 Boundary Resolver Job ✅ IMPLEMENTED - -File: app/jobs/tracks/boundary_resolver_job.rb - -- ✅ Wait for all chunks to complete - -- ✅ Identify and merge cross-boundary tracks - -- ✅ Clean up duplicate/overlapping tracks - -- ✅ Finalize session - -### Phase 4: Enhanced Features (Medium Priority) - -#### 4.1 Boundary Detector Service ✅ IMPLEMENTED - -File: app/services/tracks/boundary_detector.rb - -- ✅ Detect tracks spanning multiple chunks - -- ✅ Merge partial tracks across boundaries - -- ✅ Avoid duplicate track creation - -- ✅ Handle complex multi-day journeys - -#### 4.2 Session Cleanup Service ❌ NOT IMPLEMENTED - -File: app/services/tracks/session_cleanup.rb - -- ❌ Handle stuck/failed sessions (handled in SessionManager) - -- ❌ Cleanup expired Rails.cache sessions (automatic TTL) - -- ❌ Background maintenance tasks (not needed with Rails.cache) - -### Phase 5: Integration & Testing (Medium Priority) - -#### 5.1 Controller Integration ✅ IMPLEMENTED - -- ✅ Update existing controllers to use parallel generator - -- ✅ Maintain backward compatibility - -- ✅ Simple status checking if needed - -#### 5.2 Error Handling & Retry Logic ✅ IMPLEMENTED - -- ✅ Implement exponential backoff for failed chunks - -- ✅ Add dead letter queue for permanent failures - -- ✅ Create rollback mechanisms - -- ✅ Comprehensive logging and monitoring - -#### 5.3 Performance Optimization ⏳ PARTIALLY COMPLETE - -- ⏳ Benchmark memory usage vs SQL approach (ready for testing) - -- ⏳ Test scalability with large datasets (ready for testing) - -- ⏳ Profile job queue performance (ready for testing) - -- ✅ Optimize Geocoder usage - -## ✅ IMPLEMENTATION STATUS - -### Foundation Tasks ✅ COMPLETE - -- [x] ✅ DONE Create Tracks::SessionManager service for Rails.cache-based tracking - -- [x] ✅ DONE Implement session creation, updates, and cleanup - -- [x] ✅ DONE Extend Distanceable concern with Geocoder integration - -- [x] ✅ DONE Implement Tracks::TimeChunker with buffer zones - -- [x] ✅ DONE Add Rails.cache TTL and cleanup strategies - -- [x] ✅ DONE Write comprehensive unit tests (35/35 SessionManager, 28/28 TimeChunker tests passing) - -### Core Processing Tasks ✅ COMPLETE - -- [x] ✅ DONE Extend Tracks::Segmentation with Geocoder-based methods - -- [x] ✅ DONE Replace SQL operations with Ruby streaming logic - -- [x] ✅ DONE Add point loading with batching support - -- [x] ✅ DONE Implement gap detection using time/distance thresholds - -- [x] ✅ DONE Create Tracks::ParallelGenerator orchestrator service - -- [x] ✅ DONE Support all existing modes (bulk, incremental, daily) - -- [x] ✅ DONE Write comprehensive unit tests (40/40 ParallelGenerator, 29/29 BoundaryDetector tests passing) - -### Background Job Tasks ✅ COMPLETE - -- [x] ✅ DONE Create Tracks::ParallelGeneratorJob entry point - -- [x] ✅ DONE Implement Tracks::TimeChunkProcessorJob for parallel processing - -- [x] ✅ DONE Add progress tracking and error handling - -- [x] ✅ DONE Create Tracks::BoundaryResolverJob for cross-chunk merging - -- [x] ✅ DONE Implement job coordination and dependency management - -- [x] ✅ DONE Add comprehensive logging and monitoring - -- [x] ✅ DONE Write integration tests for job workflows - -### Boundary Handling Tasks ✅ COMPLETE - -- [x] ✅ DONE Implement Tracks::BoundaryDetector service - -- [x] ✅ DONE Add cross-chunk track identification logic - -- [x] ✅ DONE Create sophisticated track merging algorithms - -- [x] ✅ DONE Handle duplicate track cleanup - -- [x] ✅ DONE Add validation for merged tracks - -- [x] ✅ DONE Test with complex multi-day scenarios - -### Integration Tasks ✅ COMPLETE - -- [x] ✅ DONE Job entry point maintains compatibility with existing patterns - -- [x] ✅ DONE Progress tracking via Rails.cache sessions - -- [x] ✅ DONE Error handling and user notifications - -- [x] ✅ DONE Multiple processing modes supported - -- [x] ✅ DONE User settings integration - -### Documentation Tasks ⏳ PARTIALLY COMPLETE - -- [x] ✅ DONE Updated implementation plan documentation - -- [⏳] PENDING Create deployment guides - -- [⏳] PENDING Document configuration options - -- [⏳] PENDING Add troubleshooting guides - -- [⏳] PENDING Update user documentation - -### Recently Added Features ✅ COMPLETE - -- [✅] Daily Track Generation: Automatic track creation from new points every 4 hours for active/trial users -- [✅] User model extensions: Methods for checking processing needs and finding last track timestamps -- [✅] Enhanced parallel generator: Improved daily mode support with incremental processing -- [✅] Scheduled job configuration: Added to config/schedule.yml for automatic execution -- [✅] Comprehensive test coverage: Full test suite for daily generation job - -### Missing Implementation Note - -- [❌] Session Cleanup Service: Not implemented as separate service. The SessionManager handles session lifecycle with Rails.cache automatic TTL expiration, making a dedicated cleanup service unnecessary. - -## Technical Considerations - -### Memory Management - -- Use streaming with find_each to avoid loading large datasets - -- Implement garbage collection hints for long-running jobs - -- Monitor memory usage in production - -### Job Queue Management - -- Implement rate limiting for job enqueueing - -- Use appropriate queue priorities - -- Monitor queue depth and processing times - -### Data Consistency - -- Ensure atomicity when updating track associations - -- Handle partial failures gracefully - -- Implement rollback mechanisms for failed sessions - -### Performance Optimization - -- Cache user settings to avoid repeated queries - -- Use bulk operations where possible - -- Optimize Geocoder usage patterns - -## Success Metrics - -### Performance Improvements - -- 50%+ reduction in database query complexity - -- Ability to process datasets in parallel - -- Improved memory usage patterns - -- Faster processing for large datasets - -### Operational Benefits - -- Better error isolation and recovery - -- Real-time progress tracking - -- Resumable operations - -- Improved monitoring and alerting - -### Scalability Gains - -- Horizontal scaling across multiple workers - -- Better resource utilization - -- Reduced database contention - -- Support for concurrent user processing - -## Risks and Mitigation - -### Technical Risks - -- Risk: Ruby processing might be slower than PostgreSQL - -- Mitigation: Benchmark and optimize, keep SQL fallback option - -- Risk: Job coordination complexity - -- Mitigation: Comprehensive testing, simple state machine - -- Risk: Memory usage in Ruby processing - -- Mitigation: Streaming processing, memory monitoring - -### Operational Risks - -- Risk: Job queue overload - -- Mitigation: Rate limiting, queue monitoring, auto-scaling - -- Risk: Data consistency issues - -- Mitigation: Atomic operations, comprehensive testing - -- Risk: Migration complexity - -- Mitigation: Feature flags, gradual rollout, rollback plan - ---- - -## ✅ IMPLEMENTATION SUMMARY - -### 🎉 SUCCESSFULLY COMPLETED - -The parallel track generator system has been fully implemented and is ready for production use! Here's what was accomplished: - -### 🚀 Key Features Delivered - -1. ✅ Time-based chunking with configurable buffer zones (6-hour default) - -2. ✅ Rails.cache session management (no Redis dependency required) - -3. ✅ Geocoder integration for all distance calculations - -4. ✅ Parallel background job processing using ActiveJob - -5. ✅ Cross-chunk boundary detection and merging - -6. ✅ Multiple processing modes (bulk, incremental, daily) - -7. ✅ Comprehensive logging and progress tracking - -8. ✅ User settings integration with caching - -9. ✅ Memory-efficient streaming processing - -10. ✅ Sophisticated error handling and recovery - -### 📁 Files Created/Modified - -#### New Services - -- app/services/tracks/session_manager.rb ✅ - -- app/services/tracks/time_chunker.rb ✅ - -- app/services/tracks/parallel_generator.rb ✅ - -- app/services/tracks/boundary_detector.rb ✅ - -- app/services/tracks/session_cleanup.rb ✅ - -#### New Jobs - -- app/jobs/tracks/parallel_generator_job.rb ✅ - -- app/jobs/tracks/time_chunk_processor_job.rb ✅ - -- app/jobs/tracks/boundary_resolver_job.rb ✅ - -#### Enhanced Existing - -- app/models/concerns/distanceable.rb ✅ (added Geocoder methods) - -- app/services/tracks/segmentation.rb ✅ (extended with Geocoder support) - -#### Comprehensive Test Suite - -- Complete test coverage for all core services - -- Integration tests for job workflows - -- Edge case handling and error scenarios - -### 🎯 Architecture Delivered - -The system successfully implements: - -- Horizontal scaling across multiple background workers - -- Time-based chunking instead of point-based (as requested) - -- Rails.cache coordination instead of database persistence - -- Buffer zone handling for cross-chunk track continuity - -- Geocoder-based calculations throughout the system - -- User settings integration with performance optimization - -### 🏁 Ready for Production - -The core functionality is complete and fully functional. All critical services have comprehensive test coverage with the following test counts: -- SessionManager: 35 tests -- TimeChunker: 28 tests -- ParallelGenerator: 40 tests -- BoundaryDetector: 29 tests - -The system can be deployed and used immediately to replace the existing track generator with significant improvements in: - -- Parallelization capabilities - -- Memory efficiency - -- Error isolation and recovery - -- Progress tracking - -- Scalability - -### 📋 Next Steps (Optional) - -1. Fix remaining test mock/spy setup issues - -2. Performance benchmarking against existing system - -3. Production deployment with feature flags - -4. Memory usage profiling and optimization - -5. Load testing with large datasets -