From 4d462385b34b2e3f57aa33ff88643eb5e508f6da Mon Sep 17 00:00:00 2001
From: Eugene Burmakin
Date: Thu, 8 Jan 2026 00:05:11 +0100
Subject: [PATCH] Add immediate verification and count validation to raw data
 archiving

---
 app/jobs/points/raw_data/verify_job.rb        |  21 +++
 app/models/points/raw_data_archive.rb         |  30 ++++
 app/services/points/raw_data/archiver.rb      |  89 +++++++++-
 .../points/raw_data/chunk_compressor.rb       |   7 +-
 config/schedule.yml                           |   5 +
 .../services/points/raw_data/archiver_spec.rb | 153 ++++++++++++++++++
 .../points/raw_data/chunk_compressor_spec.rb  |  44 ++++-
 7 files changed, 336 insertions(+), 13 deletions(-)
 create mode 100644 app/jobs/points/raw_data/verify_job.rb

diff --git a/app/jobs/points/raw_data/verify_job.rb b/app/jobs/points/raw_data/verify_job.rb
new file mode 100644
index 00000000..bf5aecce
--- /dev/null
+++ b/app/jobs/points/raw_data/verify_job.rb
@@ -0,0 +1,21 @@
+# frozen_string_literal: true
+
+module Points
+  module RawData
+    class VerifyJob < ApplicationJob
+      queue_as :archival
+
+      def perform
+        return unless ENV['ARCHIVE_RAW_DATA'] == 'true'
+
+        stats = Points::RawData::Verifier.new.call
+
+        Rails.logger.info("Verification job complete: #{stats}")
+      rescue StandardError => e
+        ExceptionReporter.call(e, 'Points raw data verification job failed')
+
+        raise
+      end
+    end
+  end
+end
diff --git a/app/models/points/raw_data_archive.rb b/app/models/points/raw_data_archive.rb
index 4657e091..a270a5c4 100644
--- a/app/models/points/raw_data_archive.rb
+++ b/app/models/points/raw_data_archive.rb
@@ -13,8 +13,11 @@ module Points
     validates :year, numericality: { greater_than: 1970, less_than: 2100 }
     validates :month, numericality: { greater_than_or_equal_to: 1, less_than_or_equal_to: 12 }
     validates :chunk_number, numericality: { greater_than: 0 }
+    validates :point_count, numericality: { greater_than: 0 }
     validates :point_ids_checksum, presence: true
 
+    validate :metadata_contains_expected_and_actual_counts
+
     scope :for_month, lambda { |user_id, year, month|
       where(user_id: user_id, year: year, month: month)
         .order(:chunk_number)
@@ -36,5 +39,32 @@ module Points
 
       (file.blob.byte_size / 1024.0 / 1024.0).round(2)
     end
+
+    def verified?
+      verified_at.present?
+    end
+
+    def count_mismatch?
+      return false unless metadata.present?
+
+      expected = metadata['expected_count']
+      actual = metadata['actual_count']
+
+      return false if expected.nil? || actual.nil?
+
+      expected != actual
+    end
+
+    private
+
+    def metadata_contains_expected_and_actual_counts
+      return if metadata.blank?
+      return if metadata['format_version'].blank?
+
+      # All archives must contain both expected_count and actual_count for data integrity
+      if metadata['expected_count'].blank? || metadata['actual_count'].blank?
+        errors.add(:metadata, 'must contain expected_count and actual_count')
+      end
+    end
   end
 end
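
For illustration, a minimal console sketch of how the new model helpers are meant to behave (hypothetical record; verified_at is assumed to be an existing column stamped by the verifier):

    archive = Points::RawDataArchive.last
    archive.verified?       # => true once verified_at has been stamped
    archive.count_mismatch? # => true only when metadata holds both counts and they differ;
                            #    blank metadata or a missing count returns false
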
diff --git a/app/services/points/raw_data/archiver.rb b/app/services/points/raw_data/archiver.rb
index 350a8c24..53d439e6 100644
--- a/app/services/points/raw_data/archiver.rb
+++ b/app/services/points/raw_data/archiver.rb
@@ -97,6 +97,15 @@ module Points
         log_archival_start(user_id, year, month, point_ids.count)
         archive = create_archive_chunk(user_id, year, month, points, point_ids)
+
+        # Immediate verification before marking points as archived
+        verification_result = verify_archive_immediately(archive, point_ids)
+        unless verification_result[:success]
+          Rails.logger.error("Immediate verification failed: #{verification_result[:error]}")
+          archive.destroy # Clean up the failed archive
+          raise StandardError, "Archive verification failed: #{verification_result[:error]}"
+        end
+
         mark_points_as_archived(point_ids, archive.id)
         update_stats(point_ids.count)
         log_archival_success(archive)

@@ -144,8 +153,22 @@ module Points
           .where(user_id: user_id, year: year, month: month)
           .maximum(:chunk_number).to_i + 1
 
-        # Compress points data
-        compressed_data = Points::RawData::ChunkCompressor.new(points).compress
+        # Compress points data and get count
+        compression_result = Points::RawData::ChunkCompressor.new(points).compress
+        compressed_data = compression_result[:data]
+        actual_count = compression_result[:count]
+
+        # Validate count: critical data integrity check
+        expected_count = point_ids.count
+        if actual_count != expected_count
+          error_msg = "Archive count mismatch for user #{user_id} #{year}-#{format('%02d', month)}: " \
+                      "expected #{expected_count} points, but only #{actual_count} were compressed"
+          Rails.logger.error(error_msg)
+          ExceptionReporter.call(StandardError.new(error_msg), error_msg)
+          raise StandardError, error_msg
+        end
+
+        Rails.logger.info("✓ Compression validated: #{actual_count}/#{expected_count} points")
 
         # Create archive record
         archive = Points::RawDataArchive.create!(
@@ -153,13 +176,15 @@ module Points
           year: year,
           month: month,
           chunk_number: chunk_number,
-          point_count: point_ids.count,
+          point_count: actual_count, # Use the actual count, not the assumed one
           point_ids_checksum: calculate_checksum(point_ids),
           archived_at: Time.current,
           metadata: {
             format_version: 1,
             compression: 'gzip',
-            archived_by: 'Points::RawData::Archiver'
+            archived_by: 'Points::RawData::Archiver',
+            expected_count: expected_count,
+            actual_count: actual_count
           }
         )

@@ -179,6 +204,62 @@ module Points
       def calculate_checksum(point_ids)
         Digest::SHA256.hexdigest(point_ids.sort.join(','))
       end
+
+      def verify_archive_immediately(archive, expected_point_ids)
+        # Lightweight verification immediately after archiving
+        # Ensures archive is valid before marking points as archived
+
+        # 1. Verify file is attached
+        unless archive.file.attached?
+          return { success: false, error: 'File not attached' }
+        end
+
+        # 2. Verify file can be downloaded
+        begin
+          compressed_content = archive.file.blob.download
+        rescue StandardError => e
+          return { success: false, error: "File download failed: #{e.message}" }
+        end
+
+        # 3. Verify file is not empty
+        if compressed_content.bytesize.zero?
+          return { success: false, error: 'File is empty' }
+        end
+
+        # 4. Verify file can be decompressed and parsed as JSONL
+        begin
+          io = StringIO.new(compressed_content)
+          gz = Zlib::GzipReader.new(io)
+          archived_point_ids = []
+
+          gz.each_line do |line|
+            data = JSON.parse(line)
+            archived_point_ids << data['id']
+          end
+
+          gz.close
+        rescue StandardError => e
+          return { success: false, error: "Decompression/parsing failed: #{e.message}" }
+        end
+
+        # 5. Verify point count matches
+        if archived_point_ids.count != expected_point_ids.count
+          return {
+            success: false,
+            error: "Point count mismatch in archive: expected #{expected_point_ids.count}, found #{archived_point_ids.count}"
+          }
+        end
+
+        # 6. Verify point IDs checksum matches
+        archived_checksum = calculate_checksum(archived_point_ids)
+        expected_checksum = calculate_checksum(expected_point_ids)
+        if archived_checksum != expected_checksum
+          return { success: false, error: 'Point IDs checksum mismatch in archive' }
+        end
+
+        Rails.logger.info("✓ Immediate verification passed for archive #{archive.id}")
+        { success: true }
+      end
     end
   end
 end
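
For illustration, verify_archive_immediately reports failures through a result hash rather than raising, and returns the first failure it finds; cleanup is left to the caller shown in the first hunk. A sketch of the contract (failure value hypothetical):

    result = verify_archive_immediately(archive, point_ids)
    result # => { success: true }
           # => { success: false, error: 'File is empty' }, after which the caller
           #    destroys the archive and raises, so no points are marked archived
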
diff --git a/app/services/points/raw_data/chunk_compressor.rb b/app/services/points/raw_data/chunk_compressor.rb
index bf26e66b..8d389857 100644
--- a/app/services/points/raw_data/chunk_compressor.rb
+++ b/app/services/points/raw_data/chunk_compressor.rb
@@ -10,15 +10,20 @@ module Points
       def compress
         io = StringIO.new
         gz = Zlib::GzipWriter.new(io)
+        written_count = 0
 
         # Stream points to avoid memory issues with large months
         @points.select(:id, :raw_data).find_each(batch_size: 1000) do |point|
           # Write as JSONL (one JSON object per line)
           gz.puts({ id: point.id, raw_data: point.raw_data }.to_json)
+          written_count += 1
         end
 
         gz.close
-        io.string.force_encoding(Encoding::ASCII_8BIT) # Returns compressed bytes in binary encoding
+        compressed_data = io.string.force_encoding(Encoding::ASCII_8BIT)
+
+        # Return both compressed data and count for validation
+        { data: compressed_data, count: written_count }
       end
     end
   end
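
Note that compress now returns a hash instead of the raw gzip string, so any other call sites must be updated along with the archiver; a minimal sketch of the new shape:

    result = Points::RawData::ChunkCompressor.new(points).compress
    result[:data]  # gzip-compressed JSONL as an ASCII-8BIT string
    result[:count] # number of points actually written to the stream
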
diff --git a/config/schedule.yml b/config/schedule.yml
index 84cf04b1..d3e14cba 100644
--- a/config/schedule.yml
+++ b/config/schedule.yml
@@ -59,3 +59,8 @@ rails_pulse_clean_up_job:
   cron: "0 1 * * *" # every day at 01:00
   class: "RailsPulse::CleanupJob"
   queue: default
+
+raw_data_archive_verification_job:
+  cron: "0 3 * * *" # every day at 03:00
+  class: "Points::RawData::VerifyJob"
+  queue: archival
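
Alongside the 03:00 schedule, the job can be exercised by hand; a console sketch (ARCHIVE_RAW_DATA=true is required, since perform returns early without it):

    Points::RawData::VerifyJob.perform_later # enqueue on the :archival queue
    Points::RawData::VerifyJob.perform_now   # or run synchronously
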
diff --git a/spec/services/points/raw_data/archiver_spec.rb b/spec/services/points/raw_data/archiver_spec.rb
index 259056de..da9c2f11 100644
--- a/spec/services/points/raw_data/archiver_spec.rb
+++ b/spec/services/points/raw_data/archiver_spec.rb
@@ -199,4 +199,157 @@ RSpec.describe Points::RawData::Archiver do
       expect(result[:failed]).to eq(0)
     end
   end
+
+  describe 'count validation (P0 implementation)' do
+    before do
+      allow(ENV).to receive(:[]).and_call_original
+      allow(ENV).to receive(:[]).with('ARCHIVE_RAW_DATA').and_return('true')
+    end
+
+    let(:test_date) { 3.months.ago.beginning_of_month.utc }
+    let!(:test_points) do
+      create_list(:point, 5, user: user,
+                             timestamp: test_date.to_i,
+                             raw_data: { lon: 13.4, lat: 52.5 })
+    end
+
+    it 'validates compression count matches expected count' do
+      archiver.archive_specific_month(user.id, test_date.year, test_date.month)
+
+      archive = user.raw_data_archives.last
+      expect(archive.point_count).to eq(5)
+      expect(archive.metadata['expected_count']).to eq(5)
+      expect(archive.metadata['actual_count']).to eq(5)
+    end
+
+    it 'stores both expected and actual counts in metadata' do
+      archiver.archive_specific_month(user.id, test_date.year, test_date.month)
+
+      archive = user.raw_data_archives.last
+      expect(archive.metadata).to have_key('expected_count')
+      expect(archive.metadata).to have_key('actual_count')
+      expect(archive.metadata['expected_count']).to eq(archive.metadata['actual_count'])
+    end
+
+    it 'raises error when compression count mismatch occurs' do
+      # Create proper gzip compressed data with only 3 points instead of 5
+      io = StringIO.new
+      gz = Zlib::GzipWriter.new(io)
+      3.times do |i|
+        gz.puts({ id: i, raw_data: { test: 'data' } }.to_json)
+      end
+      gz.close
+      fake_compressed_data = io.string.force_encoding(Encoding::ASCII_8BIT)
+
+      # Mock ChunkCompressor to return mismatched count
+      fake_compressor = instance_double(Points::RawData::ChunkCompressor)
+      allow(Points::RawData::ChunkCompressor).to receive(:new).and_return(fake_compressor)
+      allow(fake_compressor).to receive(:compress).and_return(
+        { data: fake_compressed_data, count: 3 } # Returning 3 instead of 5
+      )
+
+      expect do
+        archiver.archive_specific_month(user.id, test_date.year, test_date.month)
+      end.to raise_error(StandardError, /Archive count mismatch/)
+    end
+
+    it 'does not mark points as archived if count mismatch detected' do
+      # Create proper gzip compressed data with only 3 points instead of 5
+      io = StringIO.new
+      gz = Zlib::GzipWriter.new(io)
+      3.times do |i|
+        gz.puts({ id: i, raw_data: { test: 'data' } }.to_json)
+      end
+      gz.close
+      fake_compressed_data = io.string.force_encoding(Encoding::ASCII_8BIT)
+
+      # Mock ChunkCompressor to return mismatched count
+      fake_compressor = instance_double(Points::RawData::ChunkCompressor)
+      allow(Points::RawData::ChunkCompressor).to receive(:new).and_return(fake_compressor)
+      allow(fake_compressor).to receive(:compress).and_return(
+        { data: fake_compressed_data, count: 3 }
+      )
+
+      expect do
+        archiver.archive_specific_month(user.id, test_date.year, test_date.month)
+      end.to raise_error(StandardError)
+
+      # Verify points are NOT marked as archived
+      test_points.each(&:reload)
+      expect(test_points.none?(&:raw_data_archived)).to be true
+    end
+  end
+
+  describe 'immediate verification (P0 implementation)' do
+    before do
+      allow(ENV).to receive(:[]).and_call_original
+      allow(ENV).to receive(:[]).with('ARCHIVE_RAW_DATA').and_return('true')
+    end
+
+    let(:test_date) { 3.months.ago.beginning_of_month.utc }
+    let!(:test_points) do
+      create_list(:point, 3, user: user,
+                             timestamp: test_date.to_i,
+                             raw_data: { lon: 13.4, lat: 52.5 })
+    end
+
+    it 'runs immediate verification after archiving' do
+      # Spy on the verify_archive_immediately method
+      allow(archiver).to receive(:verify_archive_immediately).and_call_original
+
+      archiver.archive_specific_month(user.id, test_date.year, test_date.month)
+
+      expect(archiver).to have_received(:verify_archive_immediately)
+    end
+
+    it 'rolls back archive if immediate verification fails' do
+      # Mock verification to fail
+      allow(archiver).to receive(:verify_archive_immediately).and_return(
+        { success: false, error: 'Test verification failure' }
+      )
+
+      expect do
+        archiver.archive_specific_month(user.id, test_date.year, test_date.month)
+      end.to raise_error(StandardError, /Archive verification failed/)
+
+      # Verify archive was destroyed
+      expect(Points::RawDataArchive.count).to eq(0)
+
+      # Verify points are NOT marked as archived
+      test_points.each(&:reload)
+      expect(test_points.none?(&:raw_data_archived)).to be true
+    end
+
+    it 'completes successfully when immediate verification passes' do
+      archiver.archive_specific_month(user.id, test_date.year, test_date.month)
+
+      # Verify archive was created
+      expect(Points::RawDataArchive.count).to eq(1)
+
+      # Verify points ARE marked as archived
+      test_points.each(&:reload)
+      expect(test_points.all?(&:raw_data_archived)).to be true
+    end
+
+    it 'validates point IDs checksum during immediate verification' do
+      archiver.archive_specific_month(user.id, test_date.year, test_date.month)
+
+      archive = user.raw_data_archives.last
+      expect(archive.point_ids_checksum).to be_present
+
+      # Decompress and verify the archived point IDs match
+      compressed_content = archive.file.blob.download
+      io = StringIO.new(compressed_content)
+      gz = Zlib::GzipReader.new(io)
+      archived_point_ids = []
+
+      gz.each_line do |line|
+        data = JSON.parse(line)
+        archived_point_ids << data['id']
+      end
+      gz.close
+
+      expect(archived_point_ids.sort).to eq(test_points.map(&:id).sort)
+    end
+  end
 end
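
The new examples can be run on their own with a standard RSpec invocation, e.g.:

    bundle exec rspec spec/services/points/raw_data/archiver_spec.rb \
      -e 'count validation' -e 'immediate verification'
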
diff --git a/spec/services/points/raw_data/chunk_compressor_spec.rb b/spec/services/points/raw_data/chunk_compressor_spec.rb
index c8d66983..f27bbdc0 100644
--- a/spec/services/points/raw_data/chunk_compressor_spec.rb
+++ b/spec/services/points/raw_data/chunk_compressor_spec.rb
@@ -19,14 +19,26 @@ RSpec.describe Points::RawData::ChunkCompressor do
   let(:compressor) { described_class.new(Point.where(id: points.map(&:id))) }
 
   describe '#compress' do
-    it 'returns compressed gzip data' do
+    it 'returns a hash with data and count' do
       result = compressor.compress
-      expect(result).to be_a(String)
-      expect(result.encoding.name).to eq('ASCII-8BIT')
+
+      expect(result).to be_a(Hash)
+      expect(result).to have_key(:data)
+      expect(result).to have_key(:count)
+      expect(result[:data]).to be_a(String)
+      expect(result[:data].encoding.name).to eq('ASCII-8BIT')
+      expect(result[:count]).to eq(3)
+    end
+
+    it 'returns correct count of compressed points' do
+      result = compressor.compress
+
+      expect(result[:count]).to eq(points.count)
     end
 
     it 'compresses points as JSONL format' do
-      compressed = compressor.compress
+      result = compressor.compress
+      compressed = result[:data]
 
       # Decompress and verify format
       io = StringIO.new(compressed)
@@ -35,6 +47,7 @@ RSpec.describe Points::RawData::ChunkCompressor do
       gz.close
 
       expect(lines.count).to eq(3)
+      expect(result[:count]).to eq(3)
 
       # Each line should be valid JSON
       lines.each_with_index do |line, index|
@@ -46,7 +59,8 @@ RSpec.describe Points::RawData::ChunkCompressor do
     end
 
     it 'includes point ID and raw_data in each line' do
-      compressed = compressor.compress
+      result = compressor.compress
+      compressed = result[:data]
 
       io = StringIO.new(compressed)
       gz = Zlib::GzipReader.new(io)
@@ -58,7 +72,7 @@ RSpec.describe Points::RawData::ChunkCompressor do
       expect(data['raw_data']).to eq({ 'lon' => 13.4, 'lat' => 52.5 })
     end
 
-    it 'processes points in batches' do
+    it 'processes points in batches and returns correct count' do
       # Create many points to test batch processing with unique timestamps
       many_points = []
       base_time = Time.new(2024, 6, 15).to_i
@@ -67,7 +81,8 @@ RSpec.describe Points::RawData::ChunkCompressor do
       end
 
       large_compressor = described_class.new(Point.where(id: many_points.map(&:id)))
-      compressed = large_compressor.compress
+      result = large_compressor.compress
+      compressed = result[:data]
 
       io = StringIO.new(compressed)
       gz = Zlib::GzipReader.new(io)
@@ -76,10 +91,12 @@ RSpec.describe Points::RawData::ChunkCompressor do
       gz.close
 
       expect(line_count).to eq(2500)
+      expect(result[:count]).to eq(2500)
     end
 
     it 'produces smaller compressed output than uncompressed' do
-      compressed = compressor.compress
+      result = compressor.compress
+      compressed = result[:data]
 
       # Decompress to get original size
       io = StringIO.new(compressed)
@@ -90,5 +107,16 @@ RSpec.describe Points::RawData::ChunkCompressor do
       # Compressed should be smaller
       expect(compressed.bytesize).to be < decompressed.bytesize
     end
+
+    context 'with empty point set' do
+      let(:empty_compressor) { described_class.new(Point.none) }
+
+      it 'returns zero count for empty point set' do
+        result = empty_compressor.compress
+
+        expect(result[:count]).to eq(0)
+        expect(result[:data]).to be_a(String)
+      end
+    end
   end
 end
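
As a manual spot-check of a stored chunk, the same decode path the specs use also works from a Rails console (sketch; assumes at least one archive exists):

    archive = Points::RawDataArchive.last
    gz = Zlib::GzipReader.new(StringIO.new(archive.file.blob.download))
    ids = gz.each_line.map { |line| JSON.parse(line)['id'] }
    gz.close
    ids.size == archive.point_count # => true for a healthy archive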