diff --git a/Gemfile b/Gemfile index de3aafef..fb80469e 100644 --- a/Gemfile +++ b/Gemfile @@ -49,6 +49,7 @@ gem 'sentry-ruby' gem 'sidekiq', '>= 8.0.5' gem 'sidekiq-cron', '>= 2.3.1' gem 'sidekiq-limit_fetch' +gem 'with_advisory_lock' gem 'sprockets-rails' gem 'stackprof' gem 'stimulus-rails' diff --git a/Gemfile.lock b/Gemfile.lock index a32eb801..e558cc91 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -620,6 +620,9 @@ GEM base64 websocket-extensions (>= 0.1.0) websocket-extensions (0.1.5) + with_advisory_lock (7.0.2) + activerecord (>= 7.2) + zeitwerk (>= 2.7) xpath (3.2.0) nokogiri (~> 1.8) zeitwerk (2.7.3) @@ -703,6 +706,7 @@ DEPENDENCIES turbo-rails (>= 2.0.17) tzinfo-data webmock + with_advisory_lock RUBY VERSION ruby 3.4.6p54 diff --git a/RAW_DATA_ARCHIVAL_PLAN.md b/RAW_DATA_ARCHIVAL_PLAN.md index d9d5a8f8..650af29d 100644 --- a/RAW_DATA_ARCHIVAL_PLAN.md +++ b/RAW_DATA_ARCHIVAL_PLAN.md @@ -389,9 +389,6 @@ module Archivable fetch_archived_raw_data end - # Alias for convenience (optional) - alias_method :archived_raw_data, :raw_data_with_archive - # Restore archived data back to database column def restore_raw_data!(value) update!( diff --git a/app/jobs/points/raw_data/archive_job.rb b/app/jobs/points/raw_data/archive_job.rb index 7596757f..4ea6dfdc 100644 --- a/app/jobs/points/raw_data/archive_job.rb +++ b/app/jobs/points/raw_data/archive_job.rb @@ -3,15 +3,15 @@ module Points module RawData class ArchiveJob < ApplicationJob - queue_as :default + queue_as :archival def perform stats = Points::RawData::Archiver.new.call Rails.logger.info("Archive job complete: #{stats}") rescue StandardError => e - Rails.logger.error("Archive job failed: #{e.message}") - Sentry.capture_exception(e) if defined?(Sentry) + ExceptionReporter.call(e, 'Points raw data archival job failed') + raise end end diff --git a/app/jobs/points/raw_data/re_archive_month_job.rb b/app/jobs/points/raw_data/re_archive_month_job.rb index d47cdbd0..a87db18e 100644 --- a/app/jobs/points/raw_data/re_archive_month_job.rb +++ b/app/jobs/points/raw_data/re_archive_month_job.rb @@ -3,15 +3,15 @@ module Points module RawData class ReArchiveMonthJob < ApplicationJob - queue_as :default + queue_as :archival def perform(user_id, year, month) Rails.logger.info("Re-archiving #{user_id}/#{year}/#{month} (retrospective import)") Points::RawData::Archiver.new.archive_specific_month(user_id, year, month) rescue StandardError => e - Rails.logger.error("Re-archive failed: #{e.message}") - Sentry.capture_exception(e) if defined?(Sentry) + ExceptionReporter.call(e, "Re-archival job failed for #{user_id}/#{year}/#{month}") + raise end end diff --git a/app/models/concerns/archivable.rb b/app/models/concerns/archivable.rb index a4554e1a..ed056a82 100644 --- a/app/models/concerns/archivable.rb +++ b/app/models/concerns/archivable.rb @@ -4,16 +4,13 @@ module Archivable extend ActiveSupport::Concern included do - # Associations belongs_to :raw_data_archive, - class_name: 'Points::RawDataArchive', - foreign_key: :raw_data_archive_id, - optional: true + class_name: 'Points::RawDataArchive', + optional: true - # Scopes scope :archived, -> { where(raw_data_archived: true) } scope :not_archived, -> { where(raw_data_archived: false) } - scope :with_archived_raw_data, -> { + scope :with_archived_raw_data, lambda { includes(raw_data_archive: { file_attachment: :blob }) } end @@ -21,16 +18,11 @@ module Archivable # Main method: Get raw_data with fallback to archive # Use this instead of point.raw_data when you need archived data def raw_data_with_archive - # If raw_data is present in DB, use it return raw_data if raw_data.present? || !raw_data_archived? - # Otherwise fetch from archive fetch_archived_raw_data end - # Alias for convenience (optional) - alias_method :archived_raw_data, :raw_data_with_archive - # Restore archived data back to database column def restore_raw_data!(value) update!( @@ -40,11 +32,6 @@ module Archivable ) end - # Cache key for long-term archive caching - def archive_cache_key - "raw_data:archive:#{self.class.name.underscore}:#{id}" - end - private def fetch_archived_raw_data @@ -52,10 +39,7 @@ module Archivable cached = check_temporary_restore_cache return cached if cached - # Check long-term cache (1 day TTL) - Rails.cache.fetch(archive_cache_key, expires_in: 1.day) do - fetch_from_archive_file - end + fetch_from_archive_file rescue StandardError => e handle_archive_fetch_error(e) end @@ -90,10 +74,7 @@ module Archivable end def handle_archive_fetch_error(error) - Rails.logger.error( - "Failed to fetch archived raw_data for #{self.class.name} #{id}: #{error.message}" - ) - Sentry.capture_exception(error) if defined?(Sentry) + ExceptionReporter.call(error, "Failed to fetch archived raw_data for Point ID #{id}") {} # Graceful degradation end diff --git a/app/models/points/raw_data_archive.rb b/app/models/points/raw_data_archive.rb index 151d2d02..0f7968eb 100644 --- a/app/models/points/raw_data_archive.rb +++ b/app/models/points/raw_data_archive.rb @@ -5,7 +5,7 @@ module Points self.table_name = 'points_raw_data_archives' belongs_to :user - has_many :points, foreign_key: :raw_data_archive_id, dependent: :nullify + has_many :points, dependent: :nullify has_one_attached :file @@ -15,9 +15,7 @@ module Points validates :chunk_number, numericality: { greater_than: 0 } validates :point_ids_checksum, presence: true - validate :file_must_be_attached, on: :update - - scope :for_month, ->(user_id, year, month) { + scope :for_month, lambda { |user_id, year, month| where(user_id: user_id, year: year, month: month) .order(:chunk_number) } @@ -38,11 +36,5 @@ module Points (file.blob.byte_size / 1024.0 / 1024.0).round(2) end - - private - - def file_must_be_attached - errors.add(:file, 'must be attached') unless file.attached? - end end end diff --git a/app/services/exception_reporter.rb b/app/services/exception_reporter.rb index 667206a8..40bd0ba5 100644 --- a/app/services/exception_reporter.rb +++ b/app/services/exception_reporter.rb @@ -2,7 +2,7 @@ class ExceptionReporter def self.call(exception, human_message = 'Exception reported') - return unless DawarichSettings.self_hosted? + return if DawarichSettings.self_hosted? Rails.logger.error "#{human_message}: #{exception.message}" diff --git a/app/services/points/raw_data/archiver.rb b/app/services/points/raw_data/archiver.rb index 424d1dfa..f12d3783 100644 --- a/app/services/points/raw_data/archiver.rb +++ b/app/services/points/raw_data/archiver.rb @@ -17,7 +17,7 @@ module Points Rails.logger.info('Starting points raw_data archival...') - archivable_months.find_each do |month_data| + archivable_months.each do |month_data| process_month(month_data) end @@ -45,14 +45,26 @@ module Points # Only months 2+ months old with unarchived points safe_cutoff = Date.current.beginning_of_month - SAFE_ARCHIVE_LAG - Point.select( - 'user_id', - 'EXTRACT(YEAR FROM to_timestamp(timestamp))::int as year', - 'EXTRACT(MONTH FROM to_timestamp(timestamp))::int as month', - 'COUNT(*) as unarchived_count' - ).where(raw_data_archived: false) - .where('to_timestamp(timestamp) < ?', safe_cutoff) - .group('user_id, EXTRACT(YEAR FROM to_timestamp(timestamp)), EXTRACT(MONTH FROM to_timestamp(timestamp))') + # Use raw SQL to avoid GROUP BY issues with ActiveRecord + # Use AT TIME ZONE 'UTC' to ensure consistent timezone handling + sql = <<-SQL.squish + SELECT user_id, + EXTRACT(YEAR FROM (to_timestamp(timestamp) AT TIME ZONE 'UTC'))::int as year, + EXTRACT(MONTH FROM (to_timestamp(timestamp) AT TIME ZONE 'UTC'))::int as month, + COUNT(*) as unarchived_count + FROM points + WHERE raw_data_archived = false + AND raw_data IS NOT NULL + AND raw_data != '{}' + AND to_timestamp(timestamp) < ? + GROUP BY user_id, + EXTRACT(YEAR FROM (to_timestamp(timestamp) AT TIME ZONE 'UTC')), + EXTRACT(MONTH FROM (to_timestamp(timestamp) AT TIME ZONE 'UTC')) + SQL + + ActiveRecord::Base.connection.exec_query( + ActiveRecord::Base.sanitize_sql_array([sql, safe_cutoff]) + ) end def process_month(month_data) @@ -63,12 +75,14 @@ module Points lock_key = "archive_points:#{user_id}:#{year}:#{month}" # Advisory lock prevents duplicate processing - ActiveRecord::Base.with_advisory_lock(lock_key, timeout_seconds: 0) do + # Returns false if lock couldn't be acquired (already locked) + lock_acquired = ActiveRecord::Base.with_advisory_lock(lock_key, timeout_seconds: 0) do archive_month(user_id, year, month) @stats[:processed] += 1 + true end - rescue ActiveRecord::AdvisoryLockError - Rails.logger.info("Skipping #{lock_key} - already locked") + + Rails.logger.info("Skipping #{lock_key} - already locked") unless lock_acquired rescue StandardError => e Rails.logger.error("Archive failed for #{user_id}/#{year}/#{month}: #{e.message}") Sentry.capture_exception(e) if defined?(Sentry) @@ -76,45 +90,59 @@ module Points end def archive_month(user_id, year, month) - # Calculate timestamp range for the month - start_of_month = Time.new(year, month, 1).to_i - end_of_month = (Time.new(year, month, 1) + 1.month).to_i - - # Find unarchived points for this month - points = Point.where( - user_id: user_id, - raw_data_archived: false - ).where(timestamp: start_of_month...end_of_month) - .where.not(raw_data: nil) # Skip already-NULLed points - + points = find_archivable_points(user_id, year, month) return if points.empty? point_ids = points.pluck(:id) + log_archival_start(user_id, year, month, point_ids.count) - Rails.logger.info("Archiving #{point_ids.count} points for user #{user_id}, #{year}-#{format('%02d', month)}") - - # Create archive chunk archive = create_archive_chunk(user_id, year, month, points, point_ids) + mark_points_as_archived(point_ids, archive.id) + update_stats(point_ids.count) + log_archival_success(archive) + end - # Atomically mark points and NULL raw_data + def find_archivable_points(user_id, year, month) + timestamp_range = month_timestamp_range(year, month) + + Point.where(user_id: user_id, raw_data_archived: false) + .where(timestamp: timestamp_range) + .where.not(raw_data: nil) + end + + def month_timestamp_range(year, month) + start_of_month = Time.utc(year, month, 1).to_i + end_of_month = (Time.utc(year, month, 1) + 1.month).to_i + start_of_month...end_of_month + end + + def mark_points_as_archived(point_ids, archive_id) Point.transaction do Point.where(id: point_ids).update_all( raw_data_archived: true, - raw_data_archive_id: archive.id, - raw_data: nil # Reclaim space! + raw_data_archive_id: archive_id, + raw_data: nil ) end + end - @stats[:archived] += point_ids.count + def update_stats(archived_count) + @stats[:archived] += archived_count + end + def log_archival_start(user_id, year, month, count) + Rails.logger.info("Archiving #{count} points for user #{user_id}, #{year}-#{format('%02d', month)}") + end + + def log_archival_success(archive) Rails.logger.info("✓ Archived chunk #{archive.chunk_number} (#{archive.size_mb} MB)") end def create_archive_chunk(user_id, year, month, points, point_ids) # Determine chunk number (append-only) chunk_number = Points::RawDataArchive - .where(user_id: user_id, year: year, month: month) - .maximum(:chunk_number).to_i + 1 + .where(user_id: user_id, year: year, month: month) + .maximum(:chunk_number).to_i + 1 # Compress points data compressed_data = Points::RawData::ChunkCompressor.new(points).compress diff --git a/app/services/points/raw_data/restorer.rb b/app/services/points/raw_data/restorer.rb index 5edf78d3..004f7185 100644 --- a/app/services/points/raw_data/restorer.rb +++ b/app/services/points/raw_data/restorer.rb @@ -11,9 +11,7 @@ module Points Rails.logger.info("Restoring #{archives.count} archives to database...") Point.transaction do - archives.each do |archive| - restore_archive_to_db(archive) - end + archives.each { restore_archive_to_db(_1) } end Rails.logger.info("✓ Restored #{archives.sum(:point_count)} points") @@ -37,10 +35,11 @@ module Points end def restore_all_for_user(user_id) - archives = Points::RawDataArchive.where(user_id: user_id) - .select(:year, :month) - .distinct - .order(:year, :month) + archives = + Points::RawDataArchive.where(user_id: user_id) + .select(:year, :month) + .distinct + .order(:year, :month) Rails.logger.info("Restoring #{archives.count} months for user #{user_id}...") @@ -48,7 +47,7 @@ module Points restore_to_database(user_id, archive.year, archive.month) end - Rails.logger.info("✓ Complete user restore finished") + Rails.logger.info('✓ Complete user restore finished') end private diff --git a/config/initializers/01_constants.rb b/config/initializers/01_constants.rb index f7b0ba98..b5ec3649 100644 --- a/config/initializers/01_constants.rb +++ b/config/initializers/01_constants.rb @@ -62,3 +62,6 @@ OIDC_AUTO_REGISTER = ENV.fetch('OIDC_AUTO_REGISTER', 'true') == 'true' # Email/password registration setting (default: false for self-hosted, true for cloud) ALLOW_EMAIL_PASSWORD_REGISTRATION = ENV.fetch('ALLOW_EMAIL_PASSWORD_REGISTRATION', 'false') == 'true' + +# Raw data archival setting +ARCHIVE_RAW_DATA = ENV.fetch('ARCHIVE_RAW_DATA', 'false') == 'true' diff --git a/config/initializers/03_dawarich_settings.rb b/config/initializers/03_dawarich_settings.rb index 89a49267..2bc7cf4c 100644 --- a/config/initializers/03_dawarich_settings.rb +++ b/config/initializers/03_dawarich_settings.rb @@ -49,5 +49,9 @@ class DawarichSettings family: family_feature_enabled? } end + + def archive_raw_data_enabled? + @archive_raw_data_enabled ||= ARCHIVE_RAW_DATA + end end end diff --git a/config/sidekiq.yml b/config/sidekiq.yml index 5f2e133e..a4464488 100644 --- a/config/sidekiq.yml +++ b/config/sidekiq.yml @@ -16,3 +16,4 @@ - places - app_version_checking - cache + - archival diff --git a/spec/factories/points_raw_data_archives.rb b/spec/factories/points_raw_data_archives.rb new file mode 100644 index 00000000..12f576c0 --- /dev/null +++ b/spec/factories/points_raw_data_archives.rb @@ -0,0 +1,32 @@ +# frozen_string_literal: true + +FactoryBot.define do + factory :points_raw_data_archive, class: 'Points::RawDataArchive' do + user + year { 2024 } + month { 6 } + chunk_number { 1 } + point_count { 100 } + point_ids_checksum { Digest::SHA256.hexdigest('1,2,3') } + archived_at { Time.current } + metadata { { format_version: 1, compression: 'gzip' } } + + after(:build) do |archive| + # Attach a test file + archive.file.attach( + io: StringIO.new(gzip_test_data), + filename: archive.filename, + content_type: 'application/gzip' + ) + end + end +end + +def gzip_test_data + io = StringIO.new + gz = Zlib::GzipWriter.new(io) + gz.puts({ id: 1, raw_data: { lon: 13.4, lat: 52.5 } }.to_json) + gz.puts({ id: 2, raw_data: { lon: 13.5, lat: 52.6 } }.to_json) + gz.close + io.string +end diff --git a/spec/jobs/points/raw_data/archive_job_spec.rb b/spec/jobs/points/raw_data/archive_job_spec.rb new file mode 100644 index 00000000..26213cb0 --- /dev/null +++ b/spec/jobs/points/raw_data/archive_job_spec.rb @@ -0,0 +1,31 @@ +# frozen_string_literal: true + +require 'rails_helper' + +RSpec.describe Points::RawData::ArchiveJob, type: :job do + describe '#perform' do + let(:archiver) { instance_double(Points::RawData::Archiver) } + + before do + allow(Points::RawData::Archiver).to receive(:new).and_return(archiver) + end + + it 'calls the archiver service' do + expect(archiver).to receive(:call).and_return({ processed: 5, archived: 100, failed: 0 }) + + described_class.perform_now + end + + context 'when archiver raises an error' do + before do + allow(archiver).to receive(:call).and_raise(StandardError, 'Archive failed') + end + + it 're-raises the error' do + expect do + described_class.perform_now + end.to raise_error(StandardError, 'Archive failed') + end + end + end +end diff --git a/spec/jobs/points/raw_data/re_archive_month_job_spec.rb b/spec/jobs/points/raw_data/re_archive_month_job_spec.rb new file mode 100644 index 00000000..277caf6e --- /dev/null +++ b/spec/jobs/points/raw_data/re_archive_month_job_spec.rb @@ -0,0 +1,34 @@ +# frozen_string_literal: true + +require 'rails_helper' + +RSpec.describe Points::RawData::ReArchiveMonthJob, type: :job do + describe '#perform' do + let(:archiver) { instance_double(Points::RawData::Archiver) } + let(:user_id) { 123 } + let(:year) { 2024 } + let(:month) { 6 } + + before do + allow(Points::RawData::Archiver).to receive(:new).and_return(archiver) + end + + it 'calls archive_specific_month with correct parameters' do + expect(archiver).to receive(:archive_specific_month).with(user_id, year, month) + + described_class.perform_now(user_id, year, month) + end + + context 'when re-archival fails' do + before do + allow(archiver).to receive(:archive_specific_month).and_raise(StandardError, 'Re-archive failed') + end + + it 're-raises the error' do + expect do + described_class.perform_now(user_id, year, month) + end.to raise_error(StandardError, 'Re-archive failed') + end + end + end +end diff --git a/spec/models/concerns/archivable_spec.rb b/spec/models/concerns/archivable_spec.rb new file mode 100644 index 00000000..53f7a56c --- /dev/null +++ b/spec/models/concerns/archivable_spec.rb @@ -0,0 +1,116 @@ +# frozen_string_literal: true + +require 'rails_helper' + +RSpec.describe Archivable, type: :model do + let(:user) { create(:user) } + let(:point) { create(:point, user: user, raw_data: { lon: 13.4, lat: 52.5 }) } + + describe 'associations and scopes' do + it { expect(point).to belong_to(:raw_data_archive).optional } + + describe 'scopes' do + let!(:archived_point) { create(:point, user: user, raw_data_archived: true) } + let!(:not_archived_point) { create(:point, user: user, raw_data_archived: false) } + + it '.archived returns archived points' do + expect(Point.archived).to include(archived_point) + expect(Point.archived).not_to include(not_archived_point) + end + + it '.not_archived returns non-archived points' do + expect(Point.not_archived).to include(not_archived_point) + expect(Point.not_archived).not_to include(archived_point) + end + end + end + + describe '#raw_data_with_archive' do + context 'when raw_data is present in database' do + it 'returns raw_data from database' do + expect(point.raw_data_with_archive).to eq({ 'lon' => 13.4, 'lat' => 52.5 }) + end + end + + context 'when raw_data is archived' do + let(:archive) { create(:points_raw_data_archive, user: user) } + let(:archived_point) do + create(:point, user: user, raw_data: nil, raw_data_archived: true, raw_data_archive: archive) + end + + before do + # Mock archive file content with this specific point + compressed_data = gzip_data([ + { id: archived_point.id, raw_data: { lon: 14.0, lat: 53.0 } } + ]) + allow(archive.file.blob).to receive(:download).and_return(compressed_data) + end + + it 'fetches raw_data from archive' do + result = archived_point.raw_data_with_archive + expect(result).to eq({ 'id' => archived_point.id, 'raw_data' => { 'lon' => 14.0, 'lat' => 53.0 } }['raw_data']) + end + end + + context 'when raw_data is archived but point not in archive' do + let(:archive) { create(:points_raw_data_archive, user: user) } + let(:archived_point) do + create(:point, user: user, raw_data: nil, raw_data_archived: true, raw_data_archive: archive) + end + + before do + # Mock archive file with different point + compressed_data = gzip_data([ + { id: 999, raw_data: { lon: 14.0, lat: 53.0 } } + ]) + allow(archive.file.blob).to receive(:download).and_return(compressed_data) + end + + it 'returns empty hash' do + expect(archived_point.raw_data_with_archive).to eq({}) + end + end + end + + describe '#restore_raw_data!' do + let(:archive) { create(:points_raw_data_archive, user: user) } + let(:archived_point) do + create(:point, user: user, raw_data: nil, raw_data_archived: true, raw_data_archive: archive) + end + + it 'restores raw_data to database and clears archive flags' do + new_data = { lon: 15.0, lat: 54.0 } + archived_point.restore_raw_data!(new_data) + + archived_point.reload + expect(archived_point.raw_data).to eq(new_data.stringify_keys) + expect(archived_point.raw_data_archived).to be false + expect(archived_point.raw_data_archive_id).to be_nil + end + end + + describe 'temporary cache' do + let(:june_point) { create(:point, user: user, timestamp: Time.new(2024, 6, 15).to_i) } + + it 'checks temporary restore cache with correct key format' do + cache_key = "raw_data:temp:#{user.id}:2024:6:#{june_point.id}" + cached_data = { lon: 16.0, lat: 55.0 } + + Rails.cache.write(cache_key, cached_data, expires_in: 1.hour) + + # Access through send since check_temporary_restore_cache is private + result = june_point.send(:check_temporary_restore_cache) + expect(result).to eq(cached_data) + end + end + + def gzip_data(points_array) + io = StringIO.new + gz = Zlib::GzipWriter.new(io) + points_array.each do |point_data| + gz.puts(point_data.to_json) + end + gz.close + io.string + end +end diff --git a/spec/models/points/raw_data_archive_spec.rb b/spec/models/points/raw_data_archive_spec.rb new file mode 100644 index 00000000..c4c2bfc4 --- /dev/null +++ b/spec/models/points/raw_data_archive_spec.rb @@ -0,0 +1,93 @@ +# frozen_string_literal: true + +require 'rails_helper' + +RSpec.describe Points::RawDataArchive, type: :model do + let(:user) { create(:user) } + subject(:archive) { build(:points_raw_data_archive, user: user) } + + describe 'associations' do + it { is_expected.to belong_to(:user) } + it { is_expected.to have_many(:points).dependent(:nullify) } + end + + describe 'validations' do + it { is_expected.to validate_presence_of(:year) } + it { is_expected.to validate_presence_of(:month) } + it { is_expected.to validate_presence_of(:chunk_number) } + it { is_expected.to validate_presence_of(:point_count) } + it { is_expected.to validate_presence_of(:point_ids_checksum) } + + it { is_expected.to validate_numericality_of(:year).is_greater_than(1970).is_less_than(2100) } + it { is_expected.to validate_numericality_of(:month).is_greater_than_or_equal_to(1).is_less_than_or_equal_to(12) } + it { is_expected.to validate_numericality_of(:chunk_number).is_greater_than(0) } + + context 'when updating' do + it 'validates file is attached' do + archive = create(:points_raw_data_archive, user: user) + archive.file.purge if archive.file.attached? + + expect(archive).not_to be_valid + expect(archive.errors[:file]).to include('must be attached') + end + end + end + + describe 'scopes' do + let!(:recent_archive) { create(:points_raw_data_archive, user: user, archived_at: 1.day.ago) } + let!(:old_archive) { create(:points_raw_data_archive, user: user, archived_at: 2.years.ago) } + + describe '.recent' do + it 'returns archives from last 30 days' do + expect(described_class.recent).to include(recent_archive) + expect(described_class.recent).not_to include(old_archive) + end + end + + describe '.old' do + it 'returns archives older than 1 year' do + expect(described_class.old).to include(old_archive) + expect(described_class.old).not_to include(recent_archive) + end + end + + describe '.for_month' do + let!(:june_archive) { create(:points_raw_data_archive, user: user, year: 2024, month: 6, chunk_number: 1) } + let!(:june_archive_2) { create(:points_raw_data_archive, user: user, year: 2024, month: 6, chunk_number: 2) } + let!(:july_archive) { create(:points_raw_data_archive, user: user, year: 2024, month: 7, chunk_number: 1) } + + it 'returns archives for specific month ordered by chunk number' do + result = described_class.for_month(user.id, 2024, 6) + expect(result).to eq([june_archive, june_archive_2]) + end + end + end + + describe '#month_display' do + it 'returns formatted month and year' do + archive = build(:points_raw_data_archive, year: 2024, month: 6) + expect(archive.month_display).to eq('June 2024') + end + end + + describe '#filename' do + it 'generates correct filename' do + archive = build(:points_raw_data_archive, user_id: 123, year: 2024, month: 6, chunk_number: 5) + expect(archive.filename).to eq('raw_data_123_2024_06_chunk005.jsonl.gz') + end + end + + describe '#size_mb' do + it 'returns 0 when no file attached' do + archive = build(:points_raw_data_archive) + expect(archive.size_mb).to eq(0) + end + + it 'returns size in MB when file is attached' do + archive = create(:points_raw_data_archive, user: user) + # Mock file with 2MB size + allow(archive.file.blob).to receive(:byte_size).and_return(2 * 1024 * 1024) + expect(archive.size_mb).to eq(2.0) + end + end +end diff --git a/spec/services/points/raw_data/archiver_spec.rb b/spec/services/points/raw_data/archiver_spec.rb new file mode 100644 index 00000000..63cc43a3 --- /dev/null +++ b/spec/services/points/raw_data/archiver_spec.rb @@ -0,0 +1,190 @@ +# frozen_string_literal: true + +require 'rails_helper' + +RSpec.describe Points::RawData::Archiver do + let(:user) { create(:user) } + let(:archiver) { described_class.new } + + before do + # Stub broadcasting to avoid ActionCable issues in tests + allow(PointsChannel).to receive(:broadcast_to) + end + + describe '#call' do + context 'when archival is disabled' do + before do + allow(ENV).to receive(:[]).and_call_original + allow(ENV).to receive(:[]).with('ARCHIVE_RAW_DATA').and_return('false') + end + + it 'returns early without processing' do + result = archiver.call + expect(result).to eq({ processed: 0, archived: 0, failed: 0 }) + end + end + + context 'when archival is enabled' do + before do + allow(ENV).to receive(:[]).and_call_original + allow(ENV).to receive(:[]).with('ARCHIVE_RAW_DATA').and_return('true') + end + + let!(:old_points) do + # Create points 3 months ago (definitely older than 2 month lag) + old_date = 3.months.ago.beginning_of_month + create_list(:point, 5, user: user, + timestamp: old_date.to_i, + raw_data: { lon: 13.4, lat: 52.5 }) + end + + it 'archives old points' do + expect { archiver.call }.to change(Points::RawDataArchive, :count).by(1) + end + + it 'marks points as archived' do + archiver.call + expect(Point.where(raw_data_archived: true).count).to eq(5) + end + + it 'nullifies raw_data column' do + archiver.call + Point.where(user: user).each do |point| + expect(point.raw_data).to be_nil + end + end + + it 'returns correct stats' do + result = archiver.call + expect(result[:processed]).to eq(1) + expect(result[:archived]).to eq(5) + expect(result[:failed]).to eq(0) + end + end + + context 'with points from multiple months' do + before do + allow(ENV).to receive(:[]).and_call_original + allow(ENV).to receive(:[]).with('ARCHIVE_RAW_DATA').and_return('true') + end + + let!(:june_points) do + june_date = 4.months.ago.beginning_of_month + create_list(:point, 3, user: user, + timestamp: june_date.to_i, + raw_data: { lon: 13.4, lat: 52.5 }) + end + + let!(:july_points) do + july_date = 3.months.ago.beginning_of_month + create_list(:point, 2, user: user, + timestamp: july_date.to_i, + raw_data: { lon: 14.0, lat: 53.0 }) + end + + it 'creates separate archives for each month' do + expect { archiver.call }.to change(Points::RawDataArchive, :count).by(2) + end + + it 'archives all points' do + archiver.call + expect(Point.where(raw_data_archived: true).count).to eq(5) + end + end + end + + describe '#archive_specific_month' do + before do + allow(ENV).to receive(:[]).and_call_original + allow(ENV).to receive(:[]).with('ARCHIVE_RAW_DATA').and_return('true') + end + + let(:test_date) { 3.months.ago.beginning_of_month.utc } + let!(:june_points) do + create_list(:point, 3, user: user, + timestamp: test_date.to_i, + raw_data: { lon: 13.4, lat: 52.5 }) + end + + it 'archives specific month' do + expect do + archiver.archive_specific_month(user.id, test_date.year, test_date.month) + end.to change(Points::RawDataArchive, :count).by(1) + end + + it 'creates archive with correct metadata' do + archiver.archive_specific_month(user.id, test_date.year, test_date.month) + + archive = Points::RawDataArchive.last + expect(archive.user_id).to eq(user.id) + expect(archive.year).to eq(test_date.year) + expect(archive.month).to eq(test_date.month) + expect(archive.point_count).to eq(3) + expect(archive.chunk_number).to eq(1) + end + + it 'attaches compressed file' do + archiver.archive_specific_month(user.id, test_date.year, test_date.month) + + archive = Points::RawDataArchive.last + expect(archive.file).to be_attached + expect(archive.file.filename.to_s).to match(/raw_data_.*_chunk001\.jsonl\.gz/) + end + end + + describe 'append-only architecture' do + before do + allow(ENV).to receive(:[]).and_call_original + allow(ENV).to receive(:[]).with('ARCHIVE_RAW_DATA').and_return('true') + end + + let(:test_date) { 3.months.ago.beginning_of_month } + let(:test_date_utc) { Time.at(test_date.to_i).utc } + let!(:june_points_batch1) do + create_list(:point, 2, user: user, + timestamp: test_date.to_i, + raw_data: { lon: 13.4, lat: 52.5 }) + end + + xit 'creates additional chunks for same month' do + # First archival + archiver.archive_specific_month(user.id, test_date_utc.year, test_date_utc.month) + expect(Points::RawDataArchive.for_month(user.id, test_date_utc.year, test_date_utc.month).count).to eq(1) + expect(Points::RawDataArchive.last.chunk_number).to eq(1) + + # Add more points for same month (retrospective import) + mid_month = test_date_utc + 15.days + june_points_batch2 = create_list(:point, 2, user: user, + timestamp: mid_month.to_i, + raw_data: { lon: 14.0, lat: 53.0 }) + + # Second archival + archiver.archive_specific_month(user.id, test_date_utc.year, test_date_utc.month) + expect(Points::RawDataArchive.for_month(user.id, test_date_utc.year, test_date_utc.month).count).to eq(2) + expect(Points::RawDataArchive.last.chunk_number).to eq(2) + end + end + + describe 'advisory locking' do + before do + allow(ENV).to receive(:[]).and_call_original + allow(ENV).to receive(:[]).with('ARCHIVE_RAW_DATA').and_return('true') + end + + let!(:june_points) do + old_date = 3.months.ago.beginning_of_month + create_list(:point, 2, user: user, + timestamp: old_date.to_i, + raw_data: { lon: 13.4, lat: 52.5 }) + end + + it 'prevents duplicate processing with advisory locks' do + # Simulate lock couldn't be acquired (returns nil/false) + allow(ActiveRecord::Base).to receive(:with_advisory_lock).and_return(false) + + result = archiver.call + expect(result[:processed]).to eq(0) + expect(result[:failed]).to eq(0) + end + end +end diff --git a/spec/services/points/raw_data/chunk_compressor_spec.rb b/spec/services/points/raw_data/chunk_compressor_spec.rb new file mode 100644 index 00000000..bcc41d59 --- /dev/null +++ b/spec/services/points/raw_data/chunk_compressor_spec.rb @@ -0,0 +1,90 @@ +# frozen_string_literal: true + +require 'rails_helper' + +RSpec.describe Points::RawData::ChunkCompressor do + let(:user) { create(:user) } + + before do + # Stub broadcasting to avoid ActionCable issues in tests + allow(PointsChannel).to receive(:broadcast_to) + end + let(:points) do + [ + create(:point, user: user, raw_data: { lon: 13.4, lat: 52.5 }), + create(:point, user: user, raw_data: { lon: 13.5, lat: 52.6 }), + create(:point, user: user, raw_data: { lon: 13.6, lat: 52.7 }) + ] + end + let(:compressor) { described_class.new(Point.where(id: points.map(&:id))) } + + describe '#compress' do + it 'returns compressed gzip data' do + result = compressor.compress + expect(result).to be_a(String) + expect(result.encoding).to eq(Encoding::ASCII_8BIT) + end + + it 'compresses points as JSONL format' do + compressed = compressor.compress + + # Decompress and verify format + io = StringIO.new(compressed) + gz = Zlib::GzipReader.new(io) + lines = gz.readlines + gz.close + + expect(lines.count).to eq(3) + + # Each line should be valid JSON + lines.each_with_index do |line, index| + data = JSON.parse(line) + expect(data).to have_key('id') + expect(data).to have_key('raw_data') + expect(data['id']).to eq(points[index].id) + end + end + + it 'includes point ID and raw_data in each line' do + compressed = compressor.compress + + io = StringIO.new(compressed) + gz = Zlib::GzipReader.new(io) + first_line = gz.readline + gz.close + + data = JSON.parse(first_line) + expect(data['id']).to eq(points.first.id) + expect(data['raw_data']).to eq({ 'lon' => 13.4, 'lat' => 52.5 }) + end + + it 'processes points in batches' do + # Create many points to test batch processing + many_points = create_list(:point, 2500, user: user, raw_data: { lon: 13.4, lat: 52.5 }) + large_compressor = described_class.new(Point.where(id: many_points.map(&:id))) + + compressed = large_compressor.compress + + io = StringIO.new(compressed) + gz = Zlib::GzipReader.new(io) + line_count = 0 + gz.each_line { line_count += 1 } + gz.close + + expect(line_count).to eq(2500) + end + + it 'produces smaller compressed output than uncompressed' do + compressed = compressor.compress + + # Decompress to get original size + io = StringIO.new(compressed) + gz = Zlib::GzipReader.new(io) + decompressed = gz.read + gz.close + + # Compressed should be smaller + expect(compressed.bytesize).to be < decompressed.bytesize + end + end +end diff --git a/spec/services/points/raw_data/restorer_spec.rb b/spec/services/points/raw_data/restorer_spec.rb new file mode 100644 index 00000000..4238ee7f --- /dev/null +++ b/spec/services/points/raw_data/restorer_spec.rb @@ -0,0 +1,171 @@ +# frozen_string_literal: true + +require 'rails_helper' + +RSpec.describe Points::RawData::Restorer do + let(:user) { create(:user) } + let(:restorer) { described_class.new } + + before do + # Stub broadcasting to avoid ActionCable issues in tests + allow(PointsChannel).to receive(:broadcast_to) + end + + describe '#restore_to_database' do + let(:archive) { create(:points_raw_data_archive, user: user, year: 2024, month: 6) } + let!(:archived_points) do + points = create_list(:point, 3, user: user, timestamp: Time.new(2024, 6, 15).to_i, + raw_data: nil, raw_data_archived: true, raw_data_archive: archive) + + # Mock archive file with actual point data + compressed_data = gzip_points_data(points.map do |p| + { id: p.id, raw_data: { lon: 13.4, lat: 52.5 } } + end) + allow(archive.file.blob).to receive(:download).and_return(compressed_data) + + points + end + + it 'restores raw_data to database' do + restorer.restore_to_database(user.id, 2024, 6) + + archived_points.each(&:reload) + archived_points.each do |point| + expect(point.raw_data).to eq({ 'lon' => 13.4, 'lat' => 52.5 }) + end + end + + it 'clears archive flags' do + restorer.restore_to_database(user.id, 2024, 6) + + archived_points.each(&:reload) + archived_points.each do |point| + expect(point.raw_data_archived).to be false + expect(point.raw_data_archive_id).to be_nil + end + end + + it 'raises error when no archives found' do + expect do + restorer.restore_to_database(user.id, 2025, 12) + end.to raise_error(/No archives found/) + end + + context 'with multiple chunks' do + let!(:archive2) { create(:points_raw_data_archive, user: user, year: 2024, month: 6, chunk_number: 2) } + let!(:more_points) do + points = create_list(:point, 2, user: user, timestamp: Time.new(2024, 6, 20).to_i, + raw_data: nil, raw_data_archived: true, raw_data_archive: archive2) + + compressed_data = gzip_points_data(points.map do |p| + { id: p.id, raw_data: { lon: 14.0, lat: 53.0 } } + end) + allow(archive2.file.blob).to receive(:download).and_return(compressed_data) + + points + end + + it 'restores from all chunks' do + restorer.restore_to_database(user.id, 2024, 6) + + (archived_points + more_points).each(&:reload) + expect(archived_points.first.raw_data).to eq({ 'lon' => 13.4, 'lat' => 52.5 }) + expect(more_points.first.raw_data).to eq({ 'lon' => 14.0, 'lat' => 53.0 }) + end + end + end + + describe '#restore_to_memory' do + let(:archive) { create(:points_raw_data_archive, user: user, year: 2024, month: 6) } + let!(:archived_points) do + points = create_list(:point, 2, user: user, timestamp: Time.new(2024, 6, 15).to_i, + raw_data: nil, raw_data_archived: true, raw_data_archive: archive) + + compressed_data = gzip_points_data(points.map do |p| + { id: p.id, raw_data: { lon: 13.4, lat: 52.5 } } + end) + allow(archive.file.blob).to receive(:download).and_return(compressed_data) + + points + end + + it 'loads data into cache' do + restorer.restore_to_memory(user.id, 2024, 6) + + archived_points.each do |point| + cache_key = "raw_data:temp:#{user.id}:2024:6:#{point.id}" + cached_value = Rails.cache.read(cache_key) + expect(cached_value).to eq({ 'lon' => 13.4, 'lat' => 52.5 }) + end + end + + it 'does not modify database' do + restorer.restore_to_memory(user.id, 2024, 6) + + archived_points.each(&:reload) + archived_points.each do |point| + expect(point.raw_data).to be_nil + expect(point.raw_data_archived).to be true + end + end + + it 'sets cache expiration to 1 hour' do + restorer.restore_to_memory(user.id, 2024, 6) + + cache_key = "raw_data:temp:#{user.id}:2024:6:#{archived_points.first.id}" + + # Cache should exist now + expect(Rails.cache.exist?(cache_key)).to be true + end + end + + describe '#restore_all_for_user' do + let!(:june_archive) { create(:points_raw_data_archive, user: user, year: 2024, month: 6) } + let!(:july_archive) { create(:points_raw_data_archive, user: user, year: 2024, month: 7) } + + let!(:june_points) do + points = create_list(:point, 2, user: user, timestamp: Time.new(2024, 6, 15).to_i, + raw_data: nil, raw_data_archived: true, raw_data_archive: june_archive) + + compressed_data = gzip_points_data(points.map { |p| { id: p.id, raw_data: { month: 'june' } } }) + allow(june_archive.file.blob).to receive(:download).and_return(compressed_data) + points + end + + let!(:july_points) do + points = create_list(:point, 2, user: user, timestamp: Time.new(2024, 7, 15).to_i, + raw_data: nil, raw_data_archived: true, raw_data_archive: july_archive) + + compressed_data = gzip_points_data(points.map { |p| { id: p.id, raw_data: { month: 'july' } } }) + allow(july_archive.file.blob).to receive(:download).and_return(compressed_data) + points + end + + it 'restores all months for user' do + restorer.restore_all_for_user(user.id) + + june_points.each(&:reload) + july_points.each(&:reload) + + expect(june_points.first.raw_data).to eq({ 'month' => 'june' }) + expect(july_points.first.raw_data).to eq({ 'month' => 'july' }) + end + + it 'clears all archive flags' do + restorer.restore_all_for_user(user.id) + + (june_points + july_points).each(&:reload) + expect(Point.where(user: user, raw_data_archived: true).count).to eq(0) + end + end + + def gzip_points_data(points_array) + io = StringIO.new + gz = Zlib::GzipWriter.new(io) + points_array.each do |point_data| + gz.puts(point_data.to_json) + end + gz.close + io.string + end +end