From 4d462385b34b2e3f57aa33ff88643eb5e508f6da Mon Sep 17 00:00:00 2001 From: Eugene Burmakin Date: Thu, 8 Jan 2026 00:05:11 +0100 Subject: [PATCH 1/3] Add immediate verification and count validation to raw data archiving --- app/jobs/points/raw_data/verify_job.rb | 21 +++ app/models/points/raw_data_archive.rb | 30 ++++ app/services/points/raw_data/archiver.rb | 89 +++++++++- .../points/raw_data/chunk_compressor.rb | 7 +- config/schedule.yml | 5 + .../services/points/raw_data/archiver_spec.rb | 153 ++++++++++++++++++ .../points/raw_data/chunk_compressor_spec.rb | 44 ++++- 7 files changed, 336 insertions(+), 13 deletions(-) create mode 100644 app/jobs/points/raw_data/verify_job.rb diff --git a/app/jobs/points/raw_data/verify_job.rb b/app/jobs/points/raw_data/verify_job.rb new file mode 100644 index 00000000..bf5aecce --- /dev/null +++ b/app/jobs/points/raw_data/verify_job.rb @@ -0,0 +1,21 @@ +# frozen_string_literal: true + +module Points + module RawData + class VerifyJob < ApplicationJob + queue_as :archival + + def perform + return unless ENV['ARCHIVE_RAW_DATA'] == 'true' + + stats = Points::RawData::Verifier.new.call + + Rails.logger.info("Verification job complete: #{stats}") + rescue StandardError => e + ExceptionReporter.call(e, 'Points raw data verification job failed') + + raise + end + end + end +end diff --git a/app/models/points/raw_data_archive.rb b/app/models/points/raw_data_archive.rb index 4657e091..a270a5c4 100644 --- a/app/models/points/raw_data_archive.rb +++ b/app/models/points/raw_data_archive.rb @@ -13,8 +13,11 @@ module Points validates :year, numericality: { greater_than: 1970, less_than: 2100 } validates :month, numericality: { greater_than_or_equal_to: 1, less_than_or_equal_to: 12 } validates :chunk_number, numericality: { greater_than: 0 } + validates :point_count, numericality: { greater_than: 0 } validates :point_ids_checksum, presence: true + validate :metadata_contains_expected_and_actual_counts + scope :for_month, lambda { |user_id, year, month| where(user_id: user_id, year: year, month: month) .order(:chunk_number) @@ -36,5 +39,32 @@ module Points (file.blob.byte_size / 1024.0 / 1024.0).round(2) end + + def verified? + verified_at.present? + end + + def count_mismatch? + return false unless metadata.present? + + expected = metadata['expected_count'] + actual = metadata['actual_count'] + + return false if expected.nil? || actual.nil? + + expected != actual + end + + private + + def metadata_contains_expected_and_actual_counts + return if metadata.blank? + return if metadata['format_version'].blank? + + # All archives must contain both expected_count and actual_count for data integrity + if metadata['expected_count'].blank? || metadata['actual_count'].blank? 
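+        # Only new-format archives (metadata stamped with format_version) reach this check; legacy archives are skipped by the guards above.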
+ errors.add(:metadata, 'must contain expected_count and actual_count') + end + end end end diff --git a/app/services/points/raw_data/archiver.rb b/app/services/points/raw_data/archiver.rb index 350a8c24..53d439e6 100644 --- a/app/services/points/raw_data/archiver.rb +++ b/app/services/points/raw_data/archiver.rb @@ -97,6 +97,15 @@ module Points log_archival_start(user_id, year, month, point_ids.count) archive = create_archive_chunk(user_id, year, month, points, point_ids) + + # Immediate verification before marking points as archived + verification_result = verify_archive_immediately(archive, point_ids) + unless verification_result[:success] + Rails.logger.error("Immediate verification failed: #{verification_result[:error]}") + archive.destroy # Cleanup failed archive + raise StandardError, "Archive verification failed: #{verification_result[:error]}" + end + mark_points_as_archived(point_ids, archive.id) update_stats(point_ids.count) log_archival_success(archive) @@ -144,8 +153,22 @@ module Points .where(user_id: user_id, year: year, month: month) .maximum(:chunk_number).to_i + 1 - # Compress points data - compressed_data = Points::RawData::ChunkCompressor.new(points).compress + # Compress points data and get count + compression_result = Points::RawData::ChunkCompressor.new(points).compress + compressed_data = compression_result[:data] + actual_count = compression_result[:count] + + # Validate count: critical data integrity check + expected_count = point_ids.count + if actual_count != expected_count + error_msg = "Archive count mismatch for user #{user_id} #{year}-#{format('%02d', month)}: " \ + "expected #{expected_count} points, but only #{actual_count} were compressed" + Rails.logger.error(error_msg) + ExceptionReporter.call(StandardError.new(error_msg), error_msg) + raise StandardError, error_msg + end + + Rails.logger.info("✓ Compression validated: #{actual_count}/#{expected_count} points") # Create archive record archive = Points::RawDataArchive.create!( @@ -153,13 +176,15 @@ module Points year: year, month: month, chunk_number: chunk_number, - point_count: point_ids.count, + point_count: actual_count, # Use actual count, not assumed point_ids_checksum: calculate_checksum(point_ids), archived_at: Time.current, metadata: { format_version: 1, compression: 'gzip', - archived_by: 'Points::RawData::Archiver' + archived_by: 'Points::RawData::Archiver', + expected_count: expected_count, + actual_count: actual_count } ) @@ -179,6 +204,62 @@ module Points def calculate_checksum(point_ids) Digest::SHA256.hexdigest(point_ids.sort.join(',')) end + + def verify_archive_immediately(archive, expected_point_ids) + # Lightweight verification immediately after archiving + # Ensures archive is valid before marking points as archived + + # 1. Verify file is attached + unless archive.file.attached? + return { success: false, error: 'File not attached' } + end + + # 2. Verify file can be downloaded + begin + compressed_content = archive.file.blob.download + rescue StandardError => e + return { success: false, error: "File download failed: #{e.message}" } + end + + # 3. Verify file size is reasonable + if compressed_content.bytesize.zero? + return { success: false, error: 'File is empty' } + end + + # 4. 
Verify file can be decompressed and parse JSONL + begin + io = StringIO.new(compressed_content) + gz = Zlib::GzipReader.new(io) + archived_point_ids = [] + + gz.each_line do |line| + data = JSON.parse(line) + archived_point_ids << data['id'] + end + + gz.close + rescue StandardError => e + return { success: false, error: "Decompression/parsing failed: #{e.message}" } + end + + # 5. Verify point count matches + if archived_point_ids.count != expected_point_ids.count + return { + success: false, + error: "Point count mismatch in archive: expected #{expected_point_ids.count}, found #{archived_point_ids.count}" + } + end + + # 6. Verify point IDs checksum matches + archived_checksum = calculate_checksum(archived_point_ids) + expected_checksum = calculate_checksum(expected_point_ids) + if archived_checksum != expected_checksum + return { success: false, error: 'Point IDs checksum mismatch in archive' } + end + + Rails.logger.info("✓ Immediate verification passed for archive #{archive.id}") + { success: true } + end end end end diff --git a/app/services/points/raw_data/chunk_compressor.rb b/app/services/points/raw_data/chunk_compressor.rb index bf26e66b..8d389857 100644 --- a/app/services/points/raw_data/chunk_compressor.rb +++ b/app/services/points/raw_data/chunk_compressor.rb @@ -10,15 +10,20 @@ module Points def compress io = StringIO.new gz = Zlib::GzipWriter.new(io) + written_count = 0 # Stream points to avoid memory issues with large months @points.select(:id, :raw_data).find_each(batch_size: 1000) do |point| # Write as JSONL (one JSON object per line) gz.puts({ id: point.id, raw_data: point.raw_data }.to_json) + written_count += 1 end gz.close - io.string.force_encoding(Encoding::ASCII_8BIT) # Returns compressed bytes in binary encoding + compressed_data = io.string.force_encoding(Encoding::ASCII_8BIT) + + # Return both compressed data and count for validation + { data: compressed_data, count: written_count } end end end diff --git a/config/schedule.yml b/config/schedule.yml index 84cf04b1..d3e14cba 100644 --- a/config/schedule.yml +++ b/config/schedule.yml @@ -59,3 +59,8 @@ rails_pulse_clean_up_job: cron: "0 1 * * *" # every day at 01:00 class: "RailsPulse::CleanupJob" queue: default + +raw_data_archive_verification_job: + cron: "0 3 * * *" # every day at 03:00 + class: "Points::RawData::VerifyJob" + queue: archival diff --git a/spec/services/points/raw_data/archiver_spec.rb b/spec/services/points/raw_data/archiver_spec.rb index 259056de..da9c2f11 100644 --- a/spec/services/points/raw_data/archiver_spec.rb +++ b/spec/services/points/raw_data/archiver_spec.rb @@ -199,4 +199,157 @@ RSpec.describe Points::RawData::Archiver do expect(result[:failed]).to eq(0) end end + + describe 'count validation (P0 implementation)' do + before do + allow(ENV).to receive(:[]).and_call_original + allow(ENV).to receive(:[]).with('ARCHIVE_RAW_DATA').and_return('true') + end + + let(:test_date) { 3.months.ago.beginning_of_month.utc } + let!(:test_points) do + create_list(:point, 5, user: user, + timestamp: test_date.to_i, + raw_data: { lon: 13.4, lat: 52.5 }) + end + + it 'validates compression count matches expected count' do + archiver.archive_specific_month(user.id, test_date.year, test_date.month) + + archive = user.raw_data_archives.last + expect(archive.point_count).to eq(5) + expect(archive.metadata['expected_count']).to eq(5) + expect(archive.metadata['actual_count']).to eq(5) + end + + it 'stores both expected and actual counts in metadata' do + archiver.archive_specific_month(user.id, test_date.year, 
test_date.month) + + archive = user.raw_data_archives.last + expect(archive.metadata).to have_key('expected_count') + expect(archive.metadata).to have_key('actual_count') + expect(archive.metadata['expected_count']).to eq(archive.metadata['actual_count']) + end + + it 'raises error when compression count mismatch occurs' do + # Create proper gzip compressed data with only 3 points instead of 5 + io = StringIO.new + gz = Zlib::GzipWriter.new(io) + 3.times do |i| + gz.puts({ id: i, raw_data: { test: 'data' } }.to_json) + end + gz.close + fake_compressed_data = io.string.force_encoding(Encoding::ASCII_8BIT) + + # Mock ChunkCompressor to return mismatched count + fake_compressor = instance_double(Points::RawData::ChunkCompressor) + allow(Points::RawData::ChunkCompressor).to receive(:new).and_return(fake_compressor) + allow(fake_compressor).to receive(:compress).and_return( + { data: fake_compressed_data, count: 3 } # Returning 3 instead of 5 + ) + + expect do + archiver.archive_specific_month(user.id, test_date.year, test_date.month) + end.to raise_error(StandardError, /Archive count mismatch/) + end + + it 'does not mark points as archived if count mismatch detected' do + # Create proper gzip compressed data with only 3 points instead of 5 + io = StringIO.new + gz = Zlib::GzipWriter.new(io) + 3.times do |i| + gz.puts({ id: i, raw_data: { test: 'data' } }.to_json) + end + gz.close + fake_compressed_data = io.string.force_encoding(Encoding::ASCII_8BIT) + + # Mock ChunkCompressor to return mismatched count + fake_compressor = instance_double(Points::RawData::ChunkCompressor) + allow(Points::RawData::ChunkCompressor).to receive(:new).and_return(fake_compressor) + allow(fake_compressor).to receive(:compress).and_return( + { data: fake_compressed_data, count: 3 } + ) + + expect do + archiver.archive_specific_month(user.id, test_date.year, test_date.month) + end.to raise_error(StandardError) + + # Verify points are NOT marked as archived + test_points.each(&:reload) + expect(test_points.none?(&:raw_data_archived)).to be true + end + end + + describe 'immediate verification (P0 implementation)' do + before do + allow(ENV).to receive(:[]).and_call_original + allow(ENV).to receive(:[]).with('ARCHIVE_RAW_DATA').and_return('true') + end + + let(:test_date) { 3.months.ago.beginning_of_month.utc } + let!(:test_points) do + create_list(:point, 3, user: user, + timestamp: test_date.to_i, + raw_data: { lon: 13.4, lat: 52.5 }) + end + + it 'runs immediate verification after archiving' do + # Spy on the verify_archive_immediately method + allow(archiver).to receive(:verify_archive_immediately).and_call_original + + archiver.archive_specific_month(user.id, test_date.year, test_date.month) + + expect(archiver).to have_received(:verify_archive_immediately) + end + + it 'rolls back archive if immediate verification fails' do + # Mock verification to fail + allow(archiver).to receive(:verify_archive_immediately).and_return( + { success: false, error: 'Test verification failure' } + ) + + expect do + archiver.archive_specific_month(user.id, test_date.year, test_date.month) + end.to raise_error(StandardError, /Archive verification failed/) + + # Verify archive was destroyed + expect(Points::RawDataArchive.count).to eq(0) + + # Verify points are NOT marked as archived + test_points.each(&:reload) + expect(test_points.none?(&:raw_data_archived)).to be true + end + + it 'completes successfully when immediate verification passes' do + archiver.archive_specific_month(user.id, test_date.year, test_date.month) + + # Verify 
archive was created + expect(Points::RawDataArchive.count).to eq(1) + + # Verify points ARE marked as archived + test_points.each(&:reload) + expect(test_points.all?(&:raw_data_archived)).to be true + end + + it 'validates point IDs checksum during immediate verification' do + archiver.archive_specific_month(user.id, test_date.year, test_date.month) + + archive = user.raw_data_archives.last + expect(archive.point_ids_checksum).to be_present + + # Decompress and verify the archived point IDs match + compressed_content = archive.file.blob.download + io = StringIO.new(compressed_content) + gz = Zlib::GzipReader.new(io) + archived_point_ids = [] + + gz.each_line do |line| + data = JSON.parse(line) + archived_point_ids << data['id'] + end + gz.close + + expect(archived_point_ids.sort).to eq(test_points.map(&:id).sort) + end + end end diff --git a/spec/services/points/raw_data/chunk_compressor_spec.rb b/spec/services/points/raw_data/chunk_compressor_spec.rb index c8d66983..f27bbdc0 100644 --- a/spec/services/points/raw_data/chunk_compressor_spec.rb +++ b/spec/services/points/raw_data/chunk_compressor_spec.rb @@ -19,14 +19,26 @@ RSpec.describe Points::RawData::ChunkCompressor do let(:compressor) { described_class.new(Point.where(id: points.map(&:id))) } describe '#compress' do - it 'returns compressed gzip data' do + it 'returns a hash with data and count' do result = compressor.compress - expect(result).to be_a(String) - expect(result.encoding.name).to eq('ASCII-8BIT') + + expect(result).to be_a(Hash) + expect(result).to have_key(:data) + expect(result).to have_key(:count) + expect(result[:data]).to be_a(String) + expect(result[:data].encoding.name).to eq('ASCII-8BIT') + expect(result[:count]).to eq(3) + end + + it 'returns correct count of compressed points' do + result = compressor.compress + + expect(result[:count]).to eq(points.count) end it 'compresses points as JSONL format' do - compressed = compressor.compress + result = compressor.compress + compressed = result[:data] # Decompress and verify format io = StringIO.new(compressed) @@ -35,6 +47,7 @@ RSpec.describe Points::RawData::ChunkCompressor do gz.close expect(lines.count).to eq(3) + expect(result[:count]).to eq(3) # Each line should be valid JSON lines.each_with_index do |line, index| @@ -46,7 +59,8 @@ RSpec.describe Points::RawData::ChunkCompressor do end it 'includes point ID and raw_data in each line' do - compressed = compressor.compress + result = compressor.compress + compressed = result[:data] io = StringIO.new(compressed) gz = Zlib::GzipReader.new(io) @@ -58,7 +72,7 @@ RSpec.describe Points::RawData::ChunkCompressor do expect(data['raw_data']).to eq({ 'lon' => 13.4, 'lat' => 52.5 }) end - it 'processes points in batches' do + it 'processes points in batches and returns correct count' do # Create many points to test batch processing with unique timestamps many_points = [] base_time = Time.new(2024, 6, 15).to_i @@ -67,7 +81,8 @@ RSpec.describe Points::RawData::ChunkCompressor do end large_compressor = described_class.new(Point.where(id: many_points.map(&:id))) - compressed = large_compressor.compress + result = large_compressor.compress + compressed = result[:data] io = StringIO.new(compressed) gz = Zlib::GzipReader.new(io) @@ -76,10 +91,12 @@ RSpec.describe Points::RawData::ChunkCompressor do gz.close expect(line_count).to eq(2500) + expect(result[:count]).to eq(2500) end it 'produces smaller compressed output than uncompressed' do - compressed = compressor.compress + result = compressor.compress + compressed = result[:data] # 
Decompress to get original size
       io = StringIO.new(compressed)
       gz = Zlib::GzipReader.new(io)
       decompressed = gz.read
@@ -90,5 +107,16 @@
       # Compressed should be smaller
       expect(compressed.bytesize).to be < decompressed.bytesize
     end
+
+    context 'with empty point set' do
+      let(:empty_compressor) { described_class.new(Point.none) }
+
+      it 'returns zero count for empty point set' do
+        result = empty_compressor.compress
+
+        expect(result[:count]).to eq(0)
+        expect(result[:data]).to be_a(String)
+      end
+    end
   end
 end

From 6b60eff90dd66542e6b0600b39391f49a32ffe1c Mon Sep 17 00:00:00 2001
From: Eugene Burmakin
Date: Thu, 8 Jan 2026 00:10:01 +0100
Subject: [PATCH 2/3] Remove verification job

---
 app/jobs/points/raw_data/verify_job.rb | 21 ---------------------
 config/schedule.yml                    |  5 -----
 2 files changed, 26 deletions(-)
 delete mode 100644 app/jobs/points/raw_data/verify_job.rb

diff --git a/app/jobs/points/raw_data/verify_job.rb b/app/jobs/points/raw_data/verify_job.rb
deleted file mode 100644
index bf5aecce..00000000
--- a/app/jobs/points/raw_data/verify_job.rb
+++ /dev/null
@@ -1,21 +0,0 @@
-# frozen_string_literal: true
-
-module Points
-  module RawData
-    class VerifyJob < ApplicationJob
-      queue_as :archival
-
-      def perform
-        return unless ENV['ARCHIVE_RAW_DATA'] == 'true'
-
-        stats = Points::RawData::Verifier.new.call
-
-        Rails.logger.info("Verification job complete: #{stats}")
-      rescue StandardError => e
-        ExceptionReporter.call(e, 'Points raw data verification job failed')
-
-        raise
-      end
-    end
-  end
-end
diff --git a/config/schedule.yml b/config/schedule.yml
index d3e14cba..84cf04b1 100644
--- a/config/schedule.yml
+++ b/config/schedule.yml
@@ -59,8 +59,3 @@ rails_pulse_clean_up_job:
   cron: "0 1 * * *" # every day at 01:00
   class: "RailsPulse::CleanupJob"
   queue: default
-
-raw_data_archive_verification_job:
-  cron: "0 3 * * *" # every day at 03:00
-  class: "Points::RawData::VerifyJob"
-  queue: archival

From f74828cc6b2888da5bb755bff843d5ce87b17ef0 Mon Sep 17 00:00:00 2001
From: Eugene Burmakin
Date: Fri, 9 Jan 2026 00:27:32 +0100
Subject: [PATCH 3/3] Add archive metrics reporting

---
 .../metrics/archives/compression_ratio.rb     | 22 ++++
 .../metrics/archives/count_mismatch.rb        | 42 +++++++++++
 app/services/metrics/archives/operation.rb    | 30 ++++++++
 .../metrics/archives/points_archived.rb       | 25 +++++++
 app/services/metrics/archives/size.rb         | 29 +++++++
 app/services/metrics/archives/verification.rb | 42 +++++++++++
 app/services/points/raw_data/archiver.rb      | 63 ++++++++++++++++
 app/services/points/raw_data/clearer.rb       | 21 ++++++
 app/services/points/raw_data/restorer.rb      | 34 ++++++++-
 app/services/points/raw_data/verifier.rb      | 65 ++++++++++++++++
 .../archives/compression_ratio_spec.rb        | 51 +++++++++++++
 .../metrics/archives/count_mismatch_spec.rb   | 74 ++++++++++++++++++
 .../metrics/archives/operation_spec.rb        | 63 ++++++++++++++++
 .../metrics/archives/points_archived_spec.rb  | 61 +++++++++++++++
 spec/services/metrics/archives/size_spec.rb   | 51 +++++++++++++
 .../metrics/archives/verification_spec.rb     | 75 +++++++++++++++++++
 16 files changed, 744 insertions(+), 4 deletions(-)
 create mode 100644 app/services/metrics/archives/compression_ratio.rb
 create mode 100644 app/services/metrics/archives/count_mismatch.rb
 create mode 100644 app/services/metrics/archives/operation.rb
 create mode 100644 app/services/metrics/archives/points_archived.rb
 create mode 100644 app/services/metrics/archives/size.rb
 create mode 100644 app/services/metrics/archives/verification.rb
 create mode 100644 spec/services/metrics/archives/compression_ratio_spec.rb
 create mode 100644 spec/services/metrics/archives/count_mismatch_spec.rb
 create mode 100644 spec/services/metrics/archives/operation_spec.rb
 create mode 100644 spec/services/metrics/archives/points_archived_spec.rb
 create mode 100644 spec/services/metrics/archives/size_spec.rb
 create mode 100644 spec/services/metrics/archives/verification_spec.rb

diff --git a/app/services/metrics/archives/compression_ratio.rb b/app/services/metrics/archives/compression_ratio.rb
new file mode 100644
index 00000000..2aaf6d56
--- /dev/null
+++ b/app/services/metrics/archives/compression_ratio.rb
@@ -0,0 +1,22 @@
+# frozen_string_literal: true
+
+class Metrics::Archives::CompressionRatio
+  def initialize(original_size:, compressed_size:)
+    @ratio = original_size.to_f.positive? ? compressed_size.to_f / original_size.to_f : 0.0 # avoid NaN/Infinity for a zero original size
+  end
+
+  def call
+    return unless DawarichSettings.prometheus_exporter_enabled?
+
+    metric_data = {
+      type: 'histogram',
+      name: 'dawarich_archive_compression_ratio',
+      value: @ratio,
+      buckets: [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]
+    }
+
+    PrometheusExporter::Client.default.send_json(metric_data)
+  rescue StandardError => e
+    Rails.logger.error("Failed to send compression ratio metric: #{e.message}")
+  end
+end
diff --git a/app/services/metrics/archives/count_mismatch.rb b/app/services/metrics/archives/count_mismatch.rb
new file mode 100644
index 00000000..a08b2f57
--- /dev/null
+++ b/app/services/metrics/archives/count_mismatch.rb
@@ -0,0 +1,42 @@
+# frozen_string_literal: true
+
+class Metrics::Archives::CountMismatch
+  def initialize(user_id:, year:, month:, expected:, actual:)
+    @user_id = user_id
+    @year = year
+    @month = month
+    @expected = expected
+    @actual = actual
+  end
+
+  def call
+    return unless DawarichSettings.prometheus_exporter_enabled?
+
+    # Counter for critical errors
+    counter_data = {
+      type: 'counter',
+      name: 'dawarich_archive_count_mismatches_total',
+      value: 1,
+      labels: {
+        year: @year.to_s,
+        month: @month.to_s
+      }
+    }
+
+    PrometheusExporter::Client.default.send_json(counter_data)
+
+    # Gauge showing the difference
+    gauge_data = {
+      type: 'gauge',
+      name: 'dawarich_archive_count_difference',
+      value: (@expected - @actual).abs,
+      labels: {
+        user_id: @user_id.to_s
+      }
+    }
+
+    PrometheusExporter::Client.default.send_json(gauge_data)
+  rescue StandardError => e
+    Rails.logger.error("Failed to send count mismatch metric: #{e.message}")
+  end
+end
diff --git a/app/services/metrics/archives/operation.rb b/app/services/metrics/archives/operation.rb
new file mode 100644
index 00000000..fff86749
--- /dev/null
+++ b/app/services/metrics/archives/operation.rb
@@ -0,0 +1,30 @@
+# frozen_string_literal: true
+
+class Metrics::Archives::Operation
+  OPERATIONS = %w[archive verify clear restore].freeze
+
+  def initialize(operation:, status:, user_id: nil, points_count: 0)
+    @operation = operation
+    @status = status # 'success' or 'failure'
+    @user_id = user_id
+    @points_count = points_count
+  end
+
+  def call
+    return unless DawarichSettings.prometheus_exporter_enabled?
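+    # NOTE: user_id and points_count are accepted for call-site context but are not emitted as labels below, presumably to keep per-user label cardinality low.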
+ + metric_data = { + type: 'counter', + name: 'dawarich_archive_operations_total', + value: 1, + labels: { + operation: @operation, + status: @status + } + } + + PrometheusExporter::Client.default.send_json(metric_data) + rescue StandardError => e + Rails.logger.error("Failed to send archive operation metric: #{e.message}") + end +end diff --git a/app/services/metrics/archives/points_archived.rb b/app/services/metrics/archives/points_archived.rb new file mode 100644 index 00000000..5e95746f --- /dev/null +++ b/app/services/metrics/archives/points_archived.rb @@ -0,0 +1,25 @@ +# frozen_string_literal: true + +class Metrics::Archives::PointsArchived + def initialize(count:, operation:) + @count = count + @operation = operation # 'added' or 'removed' + end + + def call + return unless DawarichSettings.prometheus_exporter_enabled? + + metric_data = { + type: 'counter', + name: 'dawarich_archive_points_total', + value: @count, + labels: { + operation: @operation + } + } + + PrometheusExporter::Client.default.send_json(metric_data) + rescue StandardError => e + Rails.logger.error("Failed to send points archived metric: #{e.message}") + end +end diff --git a/app/services/metrics/archives/size.rb b/app/services/metrics/archives/size.rb new file mode 100644 index 00000000..4b4a5cd9 --- /dev/null +++ b/app/services/metrics/archives/size.rb @@ -0,0 +1,29 @@ +# frozen_string_literal: true + +class Metrics::Archives::Size + def initialize(size_bytes:) + @size_bytes = size_bytes + end + + def call + return unless DawarichSettings.prometheus_exporter_enabled? + + metric_data = { + type: 'histogram', + name: 'dawarich_archive_size_bytes', + value: @size_bytes, + buckets: [ + 1_000_000, # 1 MB + 10_000_000, # 10 MB + 50_000_000, # 50 MB + 100_000_000, # 100 MB + 500_000_000, # 500 MB + 1_000_000_000 # 1 GB + ] + } + + PrometheusExporter::Client.default.send_json(metric_data) + rescue StandardError => e + Rails.logger.error("Failed to send archive size metric: #{e.message}") + end +end diff --git a/app/services/metrics/archives/verification.rb b/app/services/metrics/archives/verification.rb new file mode 100644 index 00000000..deb07889 --- /dev/null +++ b/app/services/metrics/archives/verification.rb @@ -0,0 +1,42 @@ +# frozen_string_literal: true + +class Metrics::Archives::Verification + def initialize(duration_seconds:, status:, check_name: nil) + @duration_seconds = duration_seconds + @status = status + @check_name = check_name + end + + def call + return unless DawarichSettings.prometheus_exporter_enabled? 
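+    # Emits up to two metrics per run: a duration histogram for every verification, plus a failure counter keyed by check name when a failing check is reported.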
+ + # Duration histogram + histogram_data = { + type: 'histogram', + name: 'dawarich_archive_verification_duration_seconds', + value: @duration_seconds, + labels: { + status: @status + }, + buckets: [0.1, 0.5, 1, 2, 5, 10, 30, 60] + } + + PrometheusExporter::Client.default.send_json(histogram_data) + + # Failed check counter (if failure) + if @status == 'failure' && @check_name + counter_data = { + type: 'counter', + name: 'dawarich_archive_verification_failures_total', + value: 1, + labels: { + check: @check_name # e.g., 'count_mismatch', 'checksum_mismatch' + } + } + + PrometheusExporter::Client.default.send_json(counter_data) + end + rescue StandardError => e + Rails.logger.error("Failed to send verification metric: #{e.message}") + end +end diff --git a/app/services/points/raw_data/archiver.rb b/app/services/points/raw_data/archiver.rb index 53d439e6..29a96be3 100644 --- a/app/services/points/raw_data/archiver.rb +++ b/app/services/points/raw_data/archiver.rb @@ -79,6 +79,14 @@ module Points lock_acquired = ActiveRecord::Base.with_advisory_lock(lock_key, timeout_seconds: 0) do archive_month(user_id, year, month) @stats[:processed] += 1 + + # Report successful archive operation + Metrics::Archives::Operation.new( + operation: 'archive', + status: 'success', + user_id: user_id + ).call + true end @@ -87,6 +95,13 @@ module Points ExceptionReporter.call(e, "Failed to archive points for user #{user_id}, #{year}-#{month}") @stats[:failed] += 1 + + # Report failed archive operation + Metrics::Archives::Operation.new( + operation: 'archive', + status: 'failure', + user_id: user_id + ).call end def archive_month(user_id, year, month) @@ -109,6 +124,12 @@ module Points mark_points_as_archived(point_ids, archive.id) update_stats(point_ids.count) log_archival_success(archive) + + # Report points archived + Metrics::Archives::PointsArchived.new( + count: point_ids.count, + operation: 'added' + ).call end def find_archivable_points(user_id, year, month) @@ -161,6 +182,15 @@ module Points # Validate count: critical data integrity check expected_count = point_ids.count if actual_count != expected_count + # Report count mismatch to metrics + Metrics::Archives::CountMismatch.new( + user_id: user_id, + year: year, + month: month, + expected: expected_count, + actual: actual_count + ).call + error_msg = "Archive count mismatch for user #{user_id} #{year}-#{format('%02d', month)}: " \ "expected #{expected_count} points, but only #{actual_count} were compressed" Rails.logger.error(error_msg) @@ -198,6 +228,21 @@ module Points key: "raw_data_archives/#{user_id}/#{year}/#{format('%02d', month)}/#{format('%03d', chunk_number)}.jsonl.gz" ) + # Report archive size + if archive.file.attached? + Metrics::Archives::Size.new( + size_bytes: archive.file.blob.byte_size + ).call + + # Report compression ratio (estimate original size from JSON) + # Rough estimate: each point as JSON ~100-200 bytes + estimated_original_size = actual_count * 150 + Metrics::Archives::CompressionRatio.new( + original_size: estimated_original_size, + compressed_size: archive.file.blob.byte_size + ).call + end + archive end @@ -208,9 +253,11 @@ module Points def verify_archive_immediately(archive, expected_point_ids) # Lightweight verification immediately after archiving # Ensures archive is valid before marking points as archived + start_time = Time.current # 1. Verify file is attached unless archive.file.attached? 
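+          # Each early return below reports its specific check name, so the failure counter can be segmented by cause.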
+ report_verification_metric(start_time, 'failure', 'file_not_attached') return { success: false, error: 'File not attached' } end @@ -218,11 +265,13 @@ module Points begin compressed_content = archive.file.blob.download rescue StandardError => e + report_verification_metric(start_time, 'failure', 'download_failed') return { success: false, error: "File download failed: #{e.message}" } end # 3. Verify file size is reasonable if compressed_content.bytesize.zero? + report_verification_metric(start_time, 'failure', 'empty_file') return { success: false, error: 'File is empty' } end @@ -239,11 +288,13 @@ module Points gz.close rescue StandardError => e + report_verification_metric(start_time, 'failure', 'decompression_failed') return { success: false, error: "Decompression/parsing failed: #{e.message}" } end # 5. Verify point count matches if archived_point_ids.count != expected_point_ids.count + report_verification_metric(start_time, 'failure', 'count_mismatch') return { success: false, error: "Point count mismatch in archive: expected #{expected_point_ids.count}, found #{archived_point_ids.count}" @@ -254,12 +305,24 @@ module Points archived_checksum = calculate_checksum(archived_point_ids) expected_checksum = calculate_checksum(expected_point_ids) if archived_checksum != expected_checksum + report_verification_metric(start_time, 'failure', 'checksum_mismatch') return { success: false, error: 'Point IDs checksum mismatch in archive' } end Rails.logger.info("✓ Immediate verification passed for archive #{archive.id}") + report_verification_metric(start_time, 'success') { success: true } end + + def report_verification_metric(start_time, status, check_name = nil) + duration = Time.current - start_time + + Metrics::Archives::Verification.new( + duration_seconds: duration, + status: status, + check_name: check_name + ).call + end end end end diff --git a/app/services/points/raw_data/clearer.rb b/app/services/points/raw_data/clearer.rb index 46187824..444be714 100644 --- a/app/services/points/raw_data/clearer.rb +++ b/app/services/points/raw_data/clearer.rb @@ -74,9 +74,30 @@ module Points cleared_count = clear_points_in_batches(point_ids) @stats[:cleared] += cleared_count Rails.logger.info("✓ Cleared #{cleared_count} points for archive #{archive.id}") + + # Report successful clear operation + Metrics::Archives::Operation.new( + operation: 'clear', + status: 'success', + user_id: archive.user_id, + points_count: cleared_count + ).call + + # Report points removed (cleared from database) + Metrics::Archives::PointsArchived.new( + count: cleared_count, + operation: 'removed' + ).call rescue StandardError => e ExceptionReporter.call(e, "Failed to clear points for archive #{archive.id}") Rails.logger.error("✗ Failed to clear archive #{archive.id}: #{e.message}") + + # Report failed clear operation + Metrics::Archives::Operation.new( + operation: 'clear', + status: 'failure', + user_id: archive.user_id + ).call end def clear_points_in_batches(point_ids) diff --git a/app/services/points/raw_data/restorer.rb b/app/services/points/raw_data/restorer.rb index 004f7185..aba0da66 100644 --- a/app/services/points/raw_data/restorer.rb +++ b/app/services/points/raw_data/restorer.rb @@ -9,12 +9,38 @@ module Points raise "No archives found for user #{user_id}, #{year}-#{month}" if archives.empty? 
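+        # All chunks are restored inside a single transaction: a failure in any chunk rolls back every restored point before the failure metric is reported and the error re-raised.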
Rails.logger.info("Restoring #{archives.count} archives to database...") + total_points = archives.sum(:point_count) - Point.transaction do - archives.each { restore_archive_to_db(_1) } + begin + Point.transaction do + archives.each { restore_archive_to_db(_1) } + end + + Rails.logger.info("✓ Restored #{total_points} points") + + # Report successful restore operation + Metrics::Archives::Operation.new( + operation: 'restore', + status: 'success', + user_id: user_id, + points_count: total_points + ).call + + # Report points restored (removed from archived state) + Metrics::Archives::PointsArchived.new( + count: total_points, + operation: 'removed' + ).call + rescue StandardError => e + # Report failed restore operation + Metrics::Archives::Operation.new( + operation: 'restore', + status: 'failure', + user_id: user_id + ).call + + raise end - - Rails.logger.info("✓ Restored #{archives.sum(:point_count)} points") end def restore_to_memory(user_id, year, month) diff --git a/app/services/points/raw_data/verifier.rb b/app/services/points/raw_data/verifier.rb index 2da7dfc2..63474c93 100644 --- a/app/services/points/raw_data/verifier.rb +++ b/app/services/points/raw_data/verifier.rb @@ -40,6 +40,7 @@ module Points def verify_archive(archive) Rails.logger.info("Verifying archive #{archive.id} (#{archive.month_display}, chunk #{archive.chunk_number})...") + start_time = Time.current verification_result = perform_verification(archive) @@ -47,6 +48,16 @@ module Points archive.update!(verified_at: Time.current) @stats[:verified] += 1 Rails.logger.info("✓ Archive #{archive.id} verified successfully") + + # Report successful verification operation + Metrics::Archives::Operation.new( + operation: 'verify', + status: 'success', + user_id: archive.user_id + ).call + + # Report verification duration + report_verification_metric(start_time, 'success') else @stats[:failed] += 1 Rails.logger.error("✗ Archive #{archive.id} verification failed: #{verification_result[:error]}") @@ -54,11 +65,32 @@ module Points StandardError.new(verification_result[:error]), "Archive verification failed for archive #{archive.id}" ) + + # Report failed verification operation + Metrics::Archives::Operation.new( + operation: 'verify', + status: 'failure', + user_id: archive.user_id + ).call + + # Report verification duration with check name + check_name = extract_check_name_from_error(verification_result[:error]) + report_verification_metric(start_time, 'failure', check_name) end rescue StandardError => e @stats[:failed] += 1 ExceptionReporter.call(e, "Failed to verify archive #{archive.id}") Rails.logger.error("✗ Archive #{archive.id} verification error: #{e.message}") + + # Report failed verification operation + Metrics::Archives::Operation.new( + operation: 'verify', + status: 'failure', + user_id: archive.user_id + ).call + + # Report verification duration + report_verification_metric(start_time, 'failure', 'exception') end def perform_verification(archive) @@ -194,6 +226,39 @@ module Points def calculate_checksum(point_ids) Digest::SHA256.hexdigest(point_ids.sort.join(',')) end + + def report_verification_metric(start_time, status, check_name = nil) + duration = Time.current - start_time + + Metrics::Archives::Verification.new( + duration_seconds: duration, + status: status, + check_name: check_name + ).call + end + + def extract_check_name_from_error(error_message) + case error_message + when /File not attached/i + 'file_not_attached' + when /File download failed/i + 'download_failed' + when /File is empty/i + 'empty_file' + 
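+        # NOTE: these patterns must track the exact error strings built in perform_verification; an unmatched message falls through to 'unknown'.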
when /MD5 checksum mismatch/i + 'md5_checksum_mismatch' + when /Decompression\/parsing failed/i + 'decompression_failed' + when /Point count mismatch/i + 'count_mismatch' + when /Point IDs checksum mismatch/i + 'checksum_mismatch' + when /Raw data mismatch/i + 'raw_data_mismatch' + else + 'unknown' + end + end end end end diff --git a/spec/services/metrics/archives/compression_ratio_spec.rb b/spec/services/metrics/archives/compression_ratio_spec.rb new file mode 100644 index 00000000..7ddd7730 --- /dev/null +++ b/spec/services/metrics/archives/compression_ratio_spec.rb @@ -0,0 +1,51 @@ +# frozen_string_literal: true + +require 'rails_helper' +require 'prometheus_exporter/client' + +RSpec.describe Metrics::Archives::CompressionRatio do + describe '#call' do + subject(:compression_ratio) do + described_class.new( + original_size: original_size, + compressed_size: compressed_size + ).call + end + + let(:original_size) { 10_000 } + let(:compressed_size) { 3_000 } + let(:expected_ratio) { 0.3 } + let(:prometheus_client) { instance_double(PrometheusExporter::Client) } + + before do + allow(PrometheusExporter::Client).to receive(:default).and_return(prometheus_client) + allow(prometheus_client).to receive(:send_json) + allow(DawarichSettings).to receive(:prometheus_exporter_enabled?).and_return(true) + end + + it 'sends compression ratio histogram metric to prometheus' do + expect(prometheus_client).to receive(:send_json).with( + { + type: 'histogram', + name: 'dawarich_archive_compression_ratio', + value: expected_ratio, + buckets: [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0] + } + ) + + compression_ratio + end + + context 'when prometheus exporter is disabled' do + before do + allow(DawarichSettings).to receive(:prometheus_exporter_enabled?).and_return(false) + end + + it 'does not send metric' do + expect(prometheus_client).not_to receive(:send_json) + + compression_ratio + end + end + end +end diff --git a/spec/services/metrics/archives/count_mismatch_spec.rb b/spec/services/metrics/archives/count_mismatch_spec.rb new file mode 100644 index 00000000..c482228a --- /dev/null +++ b/spec/services/metrics/archives/count_mismatch_spec.rb @@ -0,0 +1,74 @@ +# frozen_string_literal: true + +require 'rails_helper' +require 'prometheus_exporter/client' + +RSpec.describe Metrics::Archives::CountMismatch do + describe '#call' do + subject(:count_mismatch) do + described_class.new( + user_id: user_id, + year: year, + month: month, + expected: expected, + actual: actual + ).call + end + + let(:user_id) { 123 } + let(:year) { 2025 } + let(:month) { 1 } + let(:expected) { 100 } + let(:actual) { 95 } + let(:prometheus_client) { instance_double(PrometheusExporter::Client) } + + before do + allow(PrometheusExporter::Client).to receive(:default).and_return(prometheus_client) + allow(prometheus_client).to receive(:send_json) + allow(DawarichSettings).to receive(:prometheus_exporter_enabled?).and_return(true) + end + + it 'sends count mismatch counter metric' do + expect(prometheus_client).to receive(:send_json).with( + { + type: 'counter', + name: 'dawarich_archive_count_mismatches_total', + value: 1, + labels: { + year: year.to_s, + month: month.to_s + } + } + ) + + count_mismatch + end + + it 'sends count difference gauge metric' do + expect(prometheus_client).to receive(:send_json).with( + { + type: 'gauge', + name: 'dawarich_archive_count_difference', + value: 5, + labels: { + user_id: user_id.to_s + } + } + ) + + count_mismatch + end + + context 'when prometheus exporter is disabled' do + before do + 
allow(DawarichSettings).to receive(:prometheus_exporter_enabled?).and_return(false) + end + + it 'does not send metrics' do + expect(prometheus_client).not_to receive(:send_json) + + count_mismatch + end + end + end +end diff --git a/spec/services/metrics/archives/operation_spec.rb b/spec/services/metrics/archives/operation_spec.rb new file mode 100644 index 00000000..3e227d1a --- /dev/null +++ b/spec/services/metrics/archives/operation_spec.rb @@ -0,0 +1,63 @@ +# frozen_string_literal: true + +require 'rails_helper' +require 'prometheus_exporter/client' + +RSpec.describe Metrics::Archives::Operation do + describe '#call' do + subject(:operation) { described_class.new(operation: operation_type, status: status, user_id: user_id).call } + + let(:user_id) { 123 } + let(:operation_type) { 'archive' } + let(:status) { 'success' } + let(:prometheus_client) { instance_double(PrometheusExporter::Client) } + + before do + allow(PrometheusExporter::Client).to receive(:default).and_return(prometheus_client) + allow(prometheus_client).to receive(:send_json) + allow(DawarichSettings).to receive(:prometheus_exporter_enabled?).and_return(true) + end + + it 'sends operation metric to prometheus' do + expect(prometheus_client).to receive(:send_json).with( + { + type: 'counter', + name: 'dawarich_archive_operations_total', + value: 1, + labels: { + operation: operation_type, + status: status + } + } + ) + + operation + end + + context 'when prometheus exporter is disabled' do + before do + allow(DawarichSettings).to receive(:prometheus_exporter_enabled?).and_return(false) + end + + it 'does not send metric' do + expect(prometheus_client).not_to receive(:send_json) + + operation + end + end + + context 'when operation fails' do + let(:status) { 'failure' } + + it 'sends failure metric' do + expect(prometheus_client).to receive(:send_json).with( + hash_including( + labels: hash_including(status: 'failure') + ) + ) + + operation + end + end + end +end diff --git a/spec/services/metrics/archives/points_archived_spec.rb b/spec/services/metrics/archives/points_archived_spec.rb new file mode 100644 index 00000000..4171b3d2 --- /dev/null +++ b/spec/services/metrics/archives/points_archived_spec.rb @@ -0,0 +1,61 @@ +# frozen_string_literal: true + +require 'rails_helper' +require 'prometheus_exporter/client' + +RSpec.describe Metrics::Archives::PointsArchived do + describe '#call' do + subject(:points_archived) { described_class.new(count: count, operation: operation).call } + + let(:count) { 250 } + let(:operation) { 'added' } + let(:prometheus_client) { instance_double(PrometheusExporter::Client) } + + before do + allow(PrometheusExporter::Client).to receive(:default).and_return(prometheus_client) + allow(prometheus_client).to receive(:send_json) + allow(DawarichSettings).to receive(:prometheus_exporter_enabled?).and_return(true) + end + + it 'sends points archived metric to prometheus' do + expect(prometheus_client).to receive(:send_json).with( + { + type: 'counter', + name: 'dawarich_archive_points_total', + value: count, + labels: { + operation: operation + } + } + ) + + points_archived + end + + context 'when operation is removed' do + let(:operation) { 'removed' } + + it 'sends removed operation metric' do + expect(prometheus_client).to receive(:send_json).with( + hash_including( + labels: { operation: 'removed' } + ) + ) + + points_archived + end + end + + context 'when prometheus exporter is disabled' do + before do + allow(DawarichSettings).to receive(:prometheus_exporter_enabled?).and_return(false) + end 
+ + it 'does not send metric' do + expect(prometheus_client).not_to receive(:send_json) + + points_archived + end + end + end +end diff --git a/spec/services/metrics/archives/size_spec.rb b/spec/services/metrics/archives/size_spec.rb new file mode 100644 index 00000000..60a8b320 --- /dev/null +++ b/spec/services/metrics/archives/size_spec.rb @@ -0,0 +1,51 @@ +# frozen_string_literal: true + +require 'rails_helper' +require 'prometheus_exporter/client' + +RSpec.describe Metrics::Archives::Size do + describe '#call' do + subject(:size) { described_class.new(size_bytes: size_bytes).call } + + let(:size_bytes) { 5_000_000 } + let(:prometheus_client) { instance_double(PrometheusExporter::Client) } + + before do + allow(PrometheusExporter::Client).to receive(:default).and_return(prometheus_client) + allow(prometheus_client).to receive(:send_json) + allow(DawarichSettings).to receive(:prometheus_exporter_enabled?).and_return(true) + end + + it 'sends archive size histogram metric to prometheus' do + expect(prometheus_client).to receive(:send_json).with( + { + type: 'histogram', + name: 'dawarich_archive_size_bytes', + value: size_bytes, + buckets: [ + 1_000_000, + 10_000_000, + 50_000_000, + 100_000_000, + 500_000_000, + 1_000_000_000 + ] + } + ) + + size + end + + context 'when prometheus exporter is disabled' do + before do + allow(DawarichSettings).to receive(:prometheus_exporter_enabled?).and_return(false) + end + + it 'does not send metric' do + expect(prometheus_client).not_to receive(:send_json) + + size + end + end + end +end diff --git a/spec/services/metrics/archives/verification_spec.rb b/spec/services/metrics/archives/verification_spec.rb new file mode 100644 index 00000000..d3323e68 --- /dev/null +++ b/spec/services/metrics/archives/verification_spec.rb @@ -0,0 +1,75 @@ +# frozen_string_literal: true + +require 'rails_helper' +require 'prometheus_exporter/client' + +RSpec.describe Metrics::Archives::Verification do + describe '#call' do + subject(:verification) do + described_class.new( + duration_seconds: duration_seconds, + status: status, + check_name: check_name + ).call + end + + let(:duration_seconds) { 2.5 } + let(:status) { 'success' } + let(:check_name) { nil } + let(:prometheus_client) { instance_double(PrometheusExporter::Client) } + + before do + allow(PrometheusExporter::Client).to receive(:default).and_return(prometheus_client) + allow(prometheus_client).to receive(:send_json) + allow(DawarichSettings).to receive(:prometheus_exporter_enabled?).and_return(true) + end + + it 'sends verification duration histogram metric' do + expect(prometheus_client).to receive(:send_json).with( + { + type: 'histogram', + name: 'dawarich_archive_verification_duration_seconds', + value: duration_seconds, + labels: { + status: status + }, + buckets: [0.1, 0.5, 1, 2, 5, 10, 30, 60] + } + ) + + verification + end + + context 'when verification fails with check name' do + let(:status) { 'failure' } + let(:check_name) { 'count_mismatch' } + + it 'sends verification failure counter metric' do + expect(prometheus_client).to receive(:send_json).with( + hash_including( + type: 'counter', + name: 'dawarich_archive_verification_failures_total', + value: 1, + labels: { + check: check_name + } + ) + ) + + verification + end + end + + context 'when prometheus exporter is disabled' do + before do + allow(DawarichSettings).to receive(:prometheus_exporter_enabled?).and_return(false) + end + + it 'does not send metrics' do + expect(prometheus_client).not_to receive(:send_json) + + verification + end + end 
+ end +end
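
---

A note on the metrics API above (illustrative, not part of the patches): every Metrics::Archives service follows the same fire-and-forget contract. Each #call no-ops unless DawarichSettings.prometheus_exporter_enabled? is true, and it rescues and logs its own errors, so callers can emit metrics inline without guarding. A minimal usage sketch, assuming a configured prometheus_exporter client; the argument values here are made up:

  # Safe to call from jobs and services; failures are logged, never raised.
  Metrics::Archives::Operation.new(operation: 'archive', status: 'success', user_id: 42).call
  Metrics::Archives::PointsArchived.new(count: 1_500, operation: 'added').call
  Metrics::Archives::Size.new(size_bytes: 5_000_000).call
  Metrics::Archives::CompressionRatio.new(original_size: 10_000, compressed_size: 3_000).call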