mirror of
https://github.com/Freika/dawarich.git
synced 2026-01-11 01:31:39 -05:00
Add verification step to raw data archival process (#2028)
* Add verification step to raw data archival process * Add actual verification of raw data archives after creation, and only clear raw_data for verified archives. * Fix failing specs
This commit is contained in:
parent
353837e27f
commit
88f5e2a6ea
10 changed files with 757 additions and 79 deletions
|
|
@ -120,8 +120,7 @@ module Points
|
||||||
Point.transaction do
|
Point.transaction do
|
||||||
Point.where(id: point_ids).update_all(
|
Point.where(id: point_ids).update_all(
|
||||||
raw_data_archived: true,
|
raw_data_archived: true,
|
||||||
raw_data_archive_id: archive_id,
|
raw_data_archive_id: archive_id
|
||||||
raw_data: {}
|
|
||||||
)
|
)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
||||||
96
app/services/points/raw_data/clearer.rb
Normal file
96
app/services/points/raw_data/clearer.rb
Normal file
|
|
@ -0,0 +1,96 @@
|
||||||
|
# frozen_string_literal: true
|
||||||
|
|
||||||
|
module Points
|
||||||
|
module RawData
|
||||||
|
class Clearer
|
||||||
|
BATCH_SIZE = 10_000
|
||||||
|
|
||||||
|
def initialize
|
||||||
|
@stats = { cleared: 0, skipped: 0 }
|
||||||
|
end
|
||||||
|
|
||||||
|
def call
|
||||||
|
Rails.logger.info('Starting raw_data clearing for verified archives...')
|
||||||
|
|
||||||
|
verified_archives.find_each do |archive|
|
||||||
|
clear_archive_points(archive)
|
||||||
|
end
|
||||||
|
|
||||||
|
Rails.logger.info("Clearing complete: #{@stats}")
|
||||||
|
@stats
|
||||||
|
end
|
||||||
|
|
||||||
|
def clear_specific_archive(archive_id)
|
||||||
|
archive = Points::RawDataArchive.find(archive_id)
|
||||||
|
|
||||||
|
unless archive.verified_at.present?
|
||||||
|
Rails.logger.warn("Archive #{archive_id} not verified, skipping clear")
|
||||||
|
return { cleared: 0, skipped: 0 }
|
||||||
|
end
|
||||||
|
|
||||||
|
clear_archive_points(archive)
|
||||||
|
end
|
||||||
|
|
||||||
|
def clear_month(user_id, year, month)
|
||||||
|
archives = Points::RawDataArchive.for_month(user_id, year, month)
|
||||||
|
.where.not(verified_at: nil)
|
||||||
|
|
||||||
|
Rails.logger.info("Clearing #{archives.count} verified archives for #{year}-#{format('%02d', month)}...")
|
||||||
|
|
||||||
|
archives.each { |archive| clear_archive_points(archive) }
|
||||||
|
end
|
||||||
|
|
||||||
|
private
|
||||||
|
|
||||||
|
def verified_archives
|
||||||
|
# Only archives that are verified but have points with non-empty raw_data
|
||||||
|
Points::RawDataArchive
|
||||||
|
.where.not(verified_at: nil)
|
||||||
|
.where(id: points_needing_clearing.select(:raw_data_archive_id).distinct)
|
||||||
|
end
|
||||||
|
|
||||||
|
def points_needing_clearing
|
||||||
|
Point.where(raw_data_archived: true)
|
||||||
|
.where.not(raw_data: {})
|
||||||
|
.where.not(raw_data_archive_id: nil)
|
||||||
|
end
|
||||||
|
|
||||||
|
def clear_archive_points(archive)
|
||||||
|
Rails.logger.info(
|
||||||
|
"Clearing points for archive #{archive.id} " \
|
||||||
|
"(#{archive.month_display}, chunk #{archive.chunk_number})..."
|
||||||
|
)
|
||||||
|
|
||||||
|
point_ids = Point.where(raw_data_archive_id: archive.id)
|
||||||
|
.where(raw_data_archived: true)
|
||||||
|
.where.not(raw_data: {})
|
||||||
|
.pluck(:id)
|
||||||
|
|
||||||
|
if point_ids.empty?
|
||||||
|
Rails.logger.info("No points to clear for archive #{archive.id}")
|
||||||
|
return
|
||||||
|
end
|
||||||
|
|
||||||
|
cleared_count = clear_points_in_batches(point_ids)
|
||||||
|
@stats[:cleared] += cleared_count
|
||||||
|
Rails.logger.info("✓ Cleared #{cleared_count} points for archive #{archive.id}")
|
||||||
|
rescue StandardError => e
|
||||||
|
ExceptionReporter.call(e, "Failed to clear points for archive #{archive.id}")
|
||||||
|
Rails.logger.error("✗ Failed to clear archive #{archive.id}: #{e.message}")
|
||||||
|
end
|
||||||
|
|
||||||
|
def clear_points_in_batches(point_ids)
|
||||||
|
total_cleared = 0
|
||||||
|
|
||||||
|
point_ids.each_slice(BATCH_SIZE) do |batch|
|
||||||
|
Point.transaction do
|
||||||
|
Point.where(id: batch).update_all(raw_data: {})
|
||||||
|
total_cleared += batch.size
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
total_cleared
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
194
app/services/points/raw_data/verifier.rb
Normal file
194
app/services/points/raw_data/verifier.rb
Normal file
|
|
@ -0,0 +1,194 @@
|
||||||
|
# frozen_string_literal: true
|
||||||
|
|
||||||
|
module Points
|
||||||
|
module RawData
|
||||||
|
class Verifier
|
||||||
|
def initialize
|
||||||
|
@stats = { verified: 0, failed: 0 }
|
||||||
|
end
|
||||||
|
|
||||||
|
def call
|
||||||
|
Rails.logger.info('Starting raw_data archive verification...')
|
||||||
|
|
||||||
|
unverified_archives.find_each do |archive|
|
||||||
|
verify_archive(archive)
|
||||||
|
end
|
||||||
|
|
||||||
|
Rails.logger.info("Verification complete: #{@stats}")
|
||||||
|
@stats
|
||||||
|
end
|
||||||
|
|
||||||
|
def verify_specific_archive(archive_id)
|
||||||
|
archive = Points::RawDataArchive.find(archive_id)
|
||||||
|
verify_archive(archive)
|
||||||
|
end
|
||||||
|
|
||||||
|
def verify_month(user_id, year, month)
|
||||||
|
archives = Points::RawDataArchive.for_month(user_id, year, month)
|
||||||
|
.where(verified_at: nil)
|
||||||
|
|
||||||
|
Rails.logger.info("Verifying #{archives.count} archives for #{year}-#{format('%02d', month)}...")
|
||||||
|
|
||||||
|
archives.each { |archive| verify_archive(archive) }
|
||||||
|
end
|
||||||
|
|
||||||
|
private
|
||||||
|
|
||||||
|
def unverified_archives
|
||||||
|
Points::RawDataArchive.where(verified_at: nil)
|
||||||
|
end
|
||||||
|
|
||||||
|
def verify_archive(archive)
|
||||||
|
Rails.logger.info("Verifying archive #{archive.id} (#{archive.month_display}, chunk #{archive.chunk_number})...")
|
||||||
|
|
||||||
|
verification_result = perform_verification(archive)
|
||||||
|
|
||||||
|
if verification_result[:success]
|
||||||
|
archive.update!(verified_at: Time.current)
|
||||||
|
@stats[:verified] += 1
|
||||||
|
Rails.logger.info("✓ Archive #{archive.id} verified successfully")
|
||||||
|
else
|
||||||
|
@stats[:failed] += 1
|
||||||
|
Rails.logger.error("✗ Archive #{archive.id} verification failed: #{verification_result[:error]}")
|
||||||
|
ExceptionReporter.call(
|
||||||
|
StandardError.new(verification_result[:error]),
|
||||||
|
"Archive verification failed for archive #{archive.id}"
|
||||||
|
)
|
||||||
|
end
|
||||||
|
rescue StandardError => e
|
||||||
|
@stats[:failed] += 1
|
||||||
|
ExceptionReporter.call(e, "Failed to verify archive #{archive.id}")
|
||||||
|
Rails.logger.error("✗ Archive #{archive.id} verification error: #{e.message}")
|
||||||
|
end
|
||||||
|
|
||||||
|
def perform_verification(archive)
|
||||||
|
# 1. Verify file exists and is attached
|
||||||
|
unless archive.file.attached?
|
||||||
|
return { success: false, error: 'File not attached' }
|
||||||
|
end
|
||||||
|
|
||||||
|
# 2. Verify file can be downloaded
|
||||||
|
begin
|
||||||
|
compressed_content = archive.file.blob.download
|
||||||
|
rescue StandardError => e
|
||||||
|
return { success: false, error: "File download failed: #{e.message}" }
|
||||||
|
end
|
||||||
|
|
||||||
|
# 3. Verify file size is reasonable
|
||||||
|
if compressed_content.bytesize.zero?
|
||||||
|
return { success: false, error: 'File is empty' }
|
||||||
|
end
|
||||||
|
|
||||||
|
# 4. Verify MD5 checksum (if blob has checksum)
|
||||||
|
if archive.file.blob.checksum.present?
|
||||||
|
calculated_checksum = Digest::MD5.base64digest(compressed_content)
|
||||||
|
if calculated_checksum != archive.file.blob.checksum
|
||||||
|
return { success: false, error: 'MD5 checksum mismatch' }
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
# 5. Verify file can be decompressed and is valid JSONL, extract data
|
||||||
|
begin
|
||||||
|
archived_data = decompress_and_extract_data(compressed_content)
|
||||||
|
rescue StandardError => e
|
||||||
|
return { success: false, error: "Decompression/parsing failed: #{e.message}" }
|
||||||
|
end
|
||||||
|
|
||||||
|
point_ids = archived_data.keys
|
||||||
|
|
||||||
|
# 6. Verify point count matches
|
||||||
|
if point_ids.count != archive.point_count
|
||||||
|
return {
|
||||||
|
success: false,
|
||||||
|
error: "Point count mismatch: expected #{archive.point_count}, found #{point_ids.count}"
|
||||||
|
}
|
||||||
|
end
|
||||||
|
|
||||||
|
# 7. Verify point IDs checksum matches
|
||||||
|
calculated_checksum = calculate_checksum(point_ids)
|
||||||
|
if calculated_checksum != archive.point_ids_checksum
|
||||||
|
return { success: false, error: 'Point IDs checksum mismatch' }
|
||||||
|
end
|
||||||
|
|
||||||
|
# 8. Verify all points still exist in database
|
||||||
|
existing_count = Point.where(id: point_ids).count
|
||||||
|
if existing_count != point_ids.count
|
||||||
|
return {
|
||||||
|
success: false,
|
||||||
|
error: "Missing points in database: expected #{point_ids.count}, found #{existing_count}"
|
||||||
|
}
|
||||||
|
end
|
||||||
|
|
||||||
|
# 9. Verify archived raw_data matches current database raw_data
|
||||||
|
verification_result = verify_raw_data_matches(archived_data)
|
||||||
|
return verification_result unless verification_result[:success]
|
||||||
|
|
||||||
|
{ success: true }
|
||||||
|
end
|
||||||
|
|
||||||
|
def decompress_and_extract_data(compressed_content)
|
||||||
|
io = StringIO.new(compressed_content)
|
||||||
|
gz = Zlib::GzipReader.new(io)
|
||||||
|
archived_data = {}
|
||||||
|
|
||||||
|
gz.each_line do |line|
|
||||||
|
data = JSON.parse(line)
|
||||||
|
archived_data[data['id']] = data['raw_data']
|
||||||
|
end
|
||||||
|
|
||||||
|
gz.close
|
||||||
|
archived_data
|
||||||
|
end
|
||||||
|
|
||||||
|
def verify_raw_data_matches(archived_data)
|
||||||
|
# For small archives, verify all points. For large archives, sample up to 100 points.
|
||||||
|
# Always verify all if 100 or fewer points for maximum accuracy
|
||||||
|
if archived_data.size <= 100
|
||||||
|
point_ids_to_check = archived_data.keys
|
||||||
|
else
|
||||||
|
point_ids_to_check = archived_data.keys.sample(100)
|
||||||
|
end
|
||||||
|
|
||||||
|
mismatches = []
|
||||||
|
found_points = 0
|
||||||
|
|
||||||
|
Point.where(id: point_ids_to_check).find_each do |point|
|
||||||
|
found_points += 1
|
||||||
|
archived_raw_data = archived_data[point.id]
|
||||||
|
current_raw_data = point.raw_data
|
||||||
|
|
||||||
|
# Compare the raw_data (both should be hashes)
|
||||||
|
if archived_raw_data != current_raw_data
|
||||||
|
mismatches << {
|
||||||
|
point_id: point.id,
|
||||||
|
archived: archived_raw_data,
|
||||||
|
current: current_raw_data
|
||||||
|
}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
# Check if we found all the points we were looking for
|
||||||
|
if found_points != point_ids_to_check.size
|
||||||
|
return {
|
||||||
|
success: false,
|
||||||
|
error: "Missing points during data verification: expected #{point_ids_to_check.size}, found #{found_points}"
|
||||||
|
}
|
||||||
|
end
|
||||||
|
|
||||||
|
if mismatches.any?
|
||||||
|
return {
|
||||||
|
success: false,
|
||||||
|
error: "Raw data mismatch detected in #{mismatches.count} point(s). " \
|
||||||
|
"First mismatch: Point #{mismatches.first[:point_id]}"
|
||||||
|
}
|
||||||
|
end
|
||||||
|
|
||||||
|
{ success: true }
|
||||||
|
end
|
||||||
|
|
||||||
|
def calculate_checksum(point_ids)
|
||||||
|
Digest::SHA256.hexdigest(point_ids.sort.join(','))
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
@ -50,7 +50,7 @@ nightly_family_invitations_cleanup_job:
|
||||||
class: "Family::Invitations::CleanupJob"
|
class: "Family::Invitations::CleanupJob"
|
||||||
queue: family
|
queue: family
|
||||||
|
|
||||||
raw_data_archival_job:
|
# raw_data_archival_job:
|
||||||
cron: "0 2 1 * *" # Monthly on the 1st at 2 AM
|
# cron: "0 2 1 * *" # Monthly on the 1st at 2 AM
|
||||||
class: "Points::RawData::ArchiveJob"
|
# class: "Points::RawData::ArchiveJob"
|
||||||
queue: archival
|
# queue: archival
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,5 @@
|
||||||
|
class AddVerifiedAtToPointsRawDataArchives < ActiveRecord::Migration[8.0]
|
||||||
|
def change
|
||||||
|
add_column :points_raw_data_archives, :verified_at, :datetime
|
||||||
|
end
|
||||||
|
end
|
||||||
3
db/schema.rb
generated
3
db/schema.rb
generated
|
|
@ -10,7 +10,7 @@
|
||||||
#
|
#
|
||||||
# It's strongly recommended that you check this file into your version control system.
|
# It's strongly recommended that you check this file into your version control system.
|
||||||
|
|
||||||
ActiveRecord::Schema[8.0].define(version: 2025_12_08_210410) do
|
ActiveRecord::Schema[8.0].define(version: 2025_12_10_193532) do
|
||||||
# These are extensions that must be enabled in order to support this database
|
# These are extensions that must be enabled in order to support this database
|
||||||
enable_extension "pg_catalog.plpgsql"
|
enable_extension "pg_catalog.plpgsql"
|
||||||
enable_extension "postgis"
|
enable_extension "postgis"
|
||||||
|
|
@ -264,6 +264,7 @@ ActiveRecord::Schema[8.0].define(version: 2025_12_08_210410) do
|
||||||
t.datetime "archived_at", null: false
|
t.datetime "archived_at", null: false
|
||||||
t.datetime "created_at", null: false
|
t.datetime "created_at", null: false
|
||||||
t.datetime "updated_at", null: false
|
t.datetime "updated_at", null: false
|
||||||
|
t.datetime "verified_at"
|
||||||
t.index ["archived_at"], name: "index_points_raw_data_archives_on_archived_at"
|
t.index ["archived_at"], name: "index_points_raw_data_archives_on_archived_at"
|
||||||
t.index ["user_id", "year", "month"], name: "index_points_raw_data_archives_on_user_id_and_year_and_month"
|
t.index ["user_id", "year", "month"], name: "index_points_raw_data_archives_on_user_id_and_year_and_month"
|
||||||
t.index ["user_id"], name: "index_points_raw_data_archives_on_user_id"
|
t.index ["user_id"], name: "index_points_raw_data_archives_on_user_id"
|
||||||
|
|
|
||||||
|
|
@ -95,12 +95,20 @@ namespace :points do
|
||||||
puts ''
|
puts ''
|
||||||
|
|
||||||
total_archives = Points::RawDataArchive.count
|
total_archives = Points::RawDataArchive.count
|
||||||
|
verified_archives = Points::RawDataArchive.where.not(verified_at: nil).count
|
||||||
|
unverified_archives = total_archives - verified_archives
|
||||||
|
|
||||||
total_points = Point.count
|
total_points = Point.count
|
||||||
archived_points = Point.where(raw_data_archived: true).count
|
archived_points = Point.where(raw_data_archived: true).count
|
||||||
|
cleared_points = Point.where(raw_data_archived: true, raw_data: {}).count
|
||||||
|
archived_not_cleared = archived_points - cleared_points
|
||||||
|
|
||||||
percentage = total_points.positive? ? (archived_points.to_f / total_points * 100).round(2) : 0
|
percentage = total_points.positive? ? (archived_points.to_f / total_points * 100).round(2) : 0
|
||||||
|
|
||||||
puts "Archives: #{total_archives}"
|
puts "Archives: #{total_archives} (#{verified_archives} verified, #{unverified_archives} unverified)"
|
||||||
puts "Points archived: #{archived_points} / #{total_points} (#{percentage}%)"
|
puts "Points archived: #{archived_points} / #{total_points} (#{percentage}%)"
|
||||||
|
puts "Points cleared: #{cleared_points}"
|
||||||
|
puts "Archived but not cleared: #{archived_not_cleared}"
|
||||||
puts ''
|
puts ''
|
||||||
|
|
||||||
# Storage size via ActiveStorage
|
# Storage size via ActiveStorage
|
||||||
|
|
@ -133,87 +141,88 @@ namespace :points do
|
||||||
puts ''
|
puts ''
|
||||||
end
|
end
|
||||||
|
|
||||||
desc 'Verify archive integrity for a month'
|
desc 'Verify archive integrity (all unverified archives, or specific month with args)'
|
||||||
task :verify, [:user_id, :year, :month] => :environment do |_t, args|
|
task :verify, [:user_id, :year, :month] => :environment do |_t, args|
|
||||||
validate_args!(args)
|
verifier = Points::RawData::Verifier.new
|
||||||
|
|
||||||
user_id = args[:user_id].to_i
|
if args[:user_id] && args[:year] && args[:month]
|
||||||
year = args[:year].to_i
|
# Verify specific month
|
||||||
month = args[:month].to_i
|
user_id = args[:user_id].to_i
|
||||||
|
year = args[:year].to_i
|
||||||
|
month = args[:month].to_i
|
||||||
|
|
||||||
puts '━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━'
|
puts '━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━'
|
||||||
puts ' Verifying Archives'
|
puts ' Verifying Archives'
|
||||||
puts " User: #{user_id} | Month: #{year}-#{format('%02d', month)}"
|
puts " User: #{user_id} | Month: #{year}-#{format('%02d', month)}"
|
||||||
puts '━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━'
|
puts '━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━'
|
||||||
puts ''
|
puts ''
|
||||||
|
|
||||||
archives = Points::RawDataArchive.for_month(user_id, year, month)
|
verifier.verify_month(user_id, year, month)
|
||||||
|
|
||||||
if archives.empty?
|
|
||||||
puts 'No archives found.'
|
|
||||||
exit
|
|
||||||
end
|
|
||||||
|
|
||||||
all_ok = true
|
|
||||||
|
|
||||||
archives.each do |archive|
|
|
||||||
print "Chunk #{archive.chunk_number}: "
|
|
||||||
|
|
||||||
# Check file attached
|
|
||||||
unless archive.file.attached?
|
|
||||||
puts '✗ ERROR - File not attached!'
|
|
||||||
all_ok = false
|
|
||||||
next
|
|
||||||
end
|
|
||||||
|
|
||||||
# Download and count
|
|
||||||
begin
|
|
||||||
compressed = archive.file.blob.download
|
|
||||||
io = StringIO.new(compressed)
|
|
||||||
gz = Zlib::GzipReader.new(io)
|
|
||||||
|
|
||||||
actual_count = 0
|
|
||||||
gz.each_line { actual_count += 1 }
|
|
||||||
gz.close
|
|
||||||
|
|
||||||
if actual_count == archive.point_count
|
|
||||||
puts "✓ OK (#{actual_count} points, #{archive.size_mb} MB)"
|
|
||||||
else
|
|
||||||
puts "✗ MISMATCH - Expected #{archive.point_count}, found #{actual_count}"
|
|
||||||
all_ok = false
|
|
||||||
end
|
|
||||||
rescue StandardError => e
|
|
||||||
puts "✗ ERROR - #{e.message}"
|
|
||||||
all_ok = false
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
puts ''
|
|
||||||
if all_ok
|
|
||||||
puts '✓ All archives verified successfully!'
|
|
||||||
else
|
else
|
||||||
puts '✗ Some archives have issues. Please investigate.'
|
# Verify all unverified archives
|
||||||
|
puts '━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━'
|
||||||
|
puts ' Verifying All Unverified Archives'
|
||||||
|
puts '━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━'
|
||||||
|
puts ''
|
||||||
|
|
||||||
|
stats = verifier.call
|
||||||
|
|
||||||
|
puts ''
|
||||||
|
puts "Verified: #{stats[:verified]}"
|
||||||
|
puts "Failed: #{stats[:failed]}"
|
||||||
end
|
end
|
||||||
|
|
||||||
|
puts ''
|
||||||
|
puts '✓ Verification complete!'
|
||||||
end
|
end
|
||||||
|
|
||||||
desc 'Run initial archival for old data (safe to re-run)'
|
desc 'Clear raw_data for verified archives (all verified, or specific month with args)'
|
||||||
task initial_archive: :environment do
|
task :clear_verified, [:user_id, :year, :month] => :environment do |_t, args|
|
||||||
|
clearer = Points::RawData::Clearer.new
|
||||||
|
|
||||||
|
if args[:user_id] && args[:year] && args[:month]
|
||||||
|
# Clear specific month
|
||||||
|
user_id = args[:user_id].to_i
|
||||||
|
year = args[:year].to_i
|
||||||
|
month = args[:month].to_i
|
||||||
|
|
||||||
|
puts '━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━'
|
||||||
|
puts ' Clearing Verified Archives'
|
||||||
|
puts " User: #{user_id} | Month: #{year}-#{format('%02d', month)}"
|
||||||
|
puts '━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━'
|
||||||
|
puts ''
|
||||||
|
|
||||||
|
clearer.clear_month(user_id, year, month)
|
||||||
|
else
|
||||||
|
# Clear all verified archives
|
||||||
|
puts '━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━'
|
||||||
|
puts ' Clearing All Verified Archives'
|
||||||
|
puts '━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━'
|
||||||
|
puts ''
|
||||||
|
|
||||||
|
stats = clearer.call
|
||||||
|
|
||||||
|
puts ''
|
||||||
|
puts "Points cleared: #{stats[:cleared]}"
|
||||||
|
end
|
||||||
|
|
||||||
|
puts ''
|
||||||
|
puts '✓ Clearing complete!'
|
||||||
|
puts ''
|
||||||
|
puts 'Run VACUUM ANALYZE points; to reclaim space and update statistics.'
|
||||||
|
end
|
||||||
|
|
||||||
|
desc 'Archive raw_data for old data (2+ months old, does NOT clear yet)'
|
||||||
|
task archive: :environment do
|
||||||
puts '━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━'
|
puts '━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━'
|
||||||
puts ' Initial Archival (2+ months old data)'
|
puts ' Archiving Raw Data (2+ months old data)'
|
||||||
puts '━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━'
|
puts '━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━'
|
||||||
puts ''
|
puts ''
|
||||||
puts 'This will archive points.raw_data for months 2+ months old.'
|
puts 'This will archive points.raw_data for months 2+ months old.'
|
||||||
|
puts 'Raw data will NOT be cleared yet - use verify and clear_verified tasks.'
|
||||||
puts 'This is safe to run multiple times (idempotent).'
|
puts 'This is safe to run multiple times (idempotent).'
|
||||||
puts ''
|
puts ''
|
||||||
print 'Continue? (y/N): '
|
|
||||||
|
|
||||||
response = $stdin.gets.chomp.downcase
|
|
||||||
unless response == 'y'
|
|
||||||
puts 'Cancelled.'
|
|
||||||
exit
|
|
||||||
end
|
|
||||||
|
|
||||||
puts ''
|
|
||||||
stats = Points::RawData::Archiver.new.call
|
stats = Points::RawData::Archiver.new.call
|
||||||
|
|
||||||
puts ''
|
puts ''
|
||||||
|
|
@ -229,10 +238,53 @@ namespace :points do
|
||||||
return unless stats[:archived].positive?
|
return unless stats[:archived].positive?
|
||||||
|
|
||||||
puts 'Next steps:'
|
puts 'Next steps:'
|
||||||
puts '1. Verify a sample: rake points:raw_data:verify[user_id,year,month]'
|
puts '1. Verify archives: rake points:raw_data:verify'
|
||||||
puts '2. Check stats: rake points:raw_data:status'
|
puts '2. Clear verified data: rake points:raw_data:clear_verified'
|
||||||
puts '3. (Optional) Reclaim space: VACUUM FULL points; (during maintenance)'
|
puts '3. Check stats: rake points:raw_data:status'
|
||||||
end
|
end
|
||||||
|
|
||||||
|
desc 'Full workflow: archive + verify + clear (for automated use)'
|
||||||
|
task archive_full: :environment do
|
||||||
|
puts '━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━'
|
||||||
|
puts ' Full Archive Workflow'
|
||||||
|
puts ' (Archive → Verify → Clear)'
|
||||||
|
puts '━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━'
|
||||||
|
puts ''
|
||||||
|
|
||||||
|
# Step 1: Archive
|
||||||
|
puts '▸ Step 1/3: Archiving...'
|
||||||
|
archiver_stats = Points::RawData::Archiver.new.call
|
||||||
|
puts " ✓ Archived #{archiver_stats[:archived]} points"
|
||||||
|
puts ''
|
||||||
|
|
||||||
|
# Step 2: Verify
|
||||||
|
puts '▸ Step 2/3: Verifying...'
|
||||||
|
verifier_stats = Points::RawData::Verifier.new.call
|
||||||
|
puts " ✓ Verified #{verifier_stats[:verified]} archives"
|
||||||
|
if verifier_stats[:failed].positive?
|
||||||
|
puts " ✗ Failed to verify #{verifier_stats[:failed]} archives"
|
||||||
|
puts ''
|
||||||
|
puts '⚠ Some archives failed verification. Data NOT cleared for safety.'
|
||||||
|
puts 'Please investigate failed archives before running clear_verified.'
|
||||||
|
exit 1
|
||||||
|
end
|
||||||
|
puts ''
|
||||||
|
|
||||||
|
# Step 3: Clear
|
||||||
|
puts '▸ Step 3/3: Clearing verified data...'
|
||||||
|
clearer_stats = Points::RawData::Clearer.new.call
|
||||||
|
puts " ✓ Cleared #{clearer_stats[:cleared]} points"
|
||||||
|
puts ''
|
||||||
|
|
||||||
|
puts '━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━'
|
||||||
|
puts ' ✓ Full Archive Workflow Complete!'
|
||||||
|
puts '━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━'
|
||||||
|
puts ''
|
||||||
|
puts 'Run VACUUM ANALYZE points; to reclaim space.'
|
||||||
|
end
|
||||||
|
|
||||||
|
# Alias for backward compatibility
|
||||||
|
task initial_archive: :archive
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -48,10 +48,10 @@ RSpec.describe Points::RawData::Archiver do
|
||||||
expect(Point.where(raw_data_archived: true).count).to eq(5)
|
expect(Point.where(raw_data_archived: true).count).to eq(5)
|
||||||
end
|
end
|
||||||
|
|
||||||
it 'nullifies raw_data column' do
|
it 'keeps raw_data intact (does not clear yet)' do
|
||||||
archiver.call
|
archiver.call
|
||||||
Point.where(user: user).find_each do |point|
|
Point.where(user: user).find_each do |point|
|
||||||
expect(point.raw_data).to eq({})
|
expect(point.raw_data).to eq({ 'lon' => 13.4, 'lat' => 52.5 })
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
||||||
165
spec/services/points/raw_data/clearer_spec.rb
Normal file
165
spec/services/points/raw_data/clearer_spec.rb
Normal file
|
|
@ -0,0 +1,165 @@
|
||||||
|
# frozen_string_literal: true
|
||||||
|
|
||||||
|
require 'rails_helper'
|
||||||
|
|
||||||
|
RSpec.describe Points::RawData::Clearer do
|
||||||
|
let(:user) { create(:user) }
|
||||||
|
let(:clearer) { described_class.new }
|
||||||
|
|
||||||
|
before do
|
||||||
|
allow(PointsChannel).to receive(:broadcast_to)
|
||||||
|
end
|
||||||
|
|
||||||
|
describe '#clear_specific_archive' do
|
||||||
|
let(:test_date) { 3.months.ago.beginning_of_month.utc }
|
||||||
|
let!(:points) do
|
||||||
|
create_list(:point, 5, user: user,
|
||||||
|
timestamp: test_date.to_i,
|
||||||
|
raw_data: { lon: 13.4, lat: 52.5 })
|
||||||
|
end
|
||||||
|
|
||||||
|
let(:archive) do
|
||||||
|
# Create and verify archive
|
||||||
|
archiver = Points::RawData::Archiver.new
|
||||||
|
archiver.archive_specific_month(user.id, test_date.year, test_date.month)
|
||||||
|
|
||||||
|
archive = Points::RawDataArchive.last
|
||||||
|
verifier = Points::RawData::Verifier.new
|
||||||
|
verifier.verify_specific_archive(archive.id)
|
||||||
|
|
||||||
|
archive.reload
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'clears raw_data for verified archive' do
|
||||||
|
expect(Point.where(user: user).pluck(:raw_data)).to all(eq({ 'lon' => 13.4, 'lat' => 52.5 }))
|
||||||
|
|
||||||
|
clearer.clear_specific_archive(archive.id)
|
||||||
|
|
||||||
|
expect(Point.where(user: user).pluck(:raw_data)).to all(eq({}))
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'does not clear unverified archive' do
|
||||||
|
# Create unverified archive
|
||||||
|
archiver = Points::RawData::Archiver.new
|
||||||
|
mid_month = test_date + 15.days
|
||||||
|
create_list(:point, 3, user: user,
|
||||||
|
timestamp: mid_month.to_i,
|
||||||
|
raw_data: { lon: 14.0, lat: 53.0 })
|
||||||
|
archiver.archive_specific_month(user.id, test_date.year, test_date.month)
|
||||||
|
|
||||||
|
unverified_archive = Points::RawDataArchive.where(verified_at: nil).last
|
||||||
|
|
||||||
|
result = clearer.clear_specific_archive(unverified_archive.id)
|
||||||
|
|
||||||
|
expect(result[:cleared]).to eq(0)
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'is idempotent (safe to run multiple times)' do
|
||||||
|
clearer.clear_specific_archive(archive.id)
|
||||||
|
first_result = Point.where(user: user).pluck(:raw_data)
|
||||||
|
|
||||||
|
clearer.clear_specific_archive(archive.id)
|
||||||
|
second_result = Point.where(user: user).pluck(:raw_data)
|
||||||
|
|
||||||
|
expect(first_result).to eq(second_result)
|
||||||
|
expect(first_result).to all(eq({}))
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
describe '#clear_month' do
|
||||||
|
let(:test_date) { 3.months.ago.beginning_of_month.utc }
|
||||||
|
|
||||||
|
before do
|
||||||
|
# Create points and archive
|
||||||
|
create_list(:point, 5, user: user,
|
||||||
|
timestamp: test_date.to_i,
|
||||||
|
raw_data: { lon: 13.4, lat: 52.5 })
|
||||||
|
|
||||||
|
archiver = Points::RawData::Archiver.new
|
||||||
|
archiver.archive_specific_month(user.id, test_date.year, test_date.month)
|
||||||
|
|
||||||
|
# Verify archive
|
||||||
|
verifier = Points::RawData::Verifier.new
|
||||||
|
verifier.verify_month(user.id, test_date.year, test_date.month)
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'clears all verified archives for a month' do
|
||||||
|
expect(Point.where(user: user, raw_data: {}).count).to eq(0)
|
||||||
|
|
||||||
|
clearer.clear_month(user.id, test_date.year, test_date.month)
|
||||||
|
|
||||||
|
expect(Point.where(user: user, raw_data: {}).count).to eq(5)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
describe '#call' do
|
||||||
|
let(:test_date) { 3.months.ago.beginning_of_month.utc }
|
||||||
|
|
||||||
|
before do
|
||||||
|
# Create points and archive
|
||||||
|
create_list(:point, 5, user: user,
|
||||||
|
timestamp: test_date.to_i,
|
||||||
|
raw_data: { lon: 13.4, lat: 52.5 })
|
||||||
|
|
||||||
|
archiver = Points::RawData::Archiver.new
|
||||||
|
archiver.archive_specific_month(user.id, test_date.year, test_date.month)
|
||||||
|
|
||||||
|
# Verify archive
|
||||||
|
verifier = Points::RawData::Verifier.new
|
||||||
|
verifier.verify_month(user.id, test_date.year, test_date.month)
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'clears all verified archives' do
|
||||||
|
expect(Point.where(raw_data: {}).count).to eq(0)
|
||||||
|
|
||||||
|
result = clearer.call
|
||||||
|
|
||||||
|
expect(result[:cleared]).to eq(5)
|
||||||
|
expect(Point.where(raw_data: {}).count).to eq(5)
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'skips unverified archives' do
|
||||||
|
# Create another month without verifying
|
||||||
|
new_date = 4.months.ago.beginning_of_month.utc
|
||||||
|
create_list(:point, 3, user: user,
|
||||||
|
timestamp: new_date.to_i,
|
||||||
|
raw_data: { lon: 14.0, lat: 53.0 })
|
||||||
|
|
||||||
|
archiver = Points::RawData::Archiver.new
|
||||||
|
archiver.archive_specific_month(user.id, new_date.year, new_date.month)
|
||||||
|
|
||||||
|
result = clearer.call
|
||||||
|
|
||||||
|
# Should only clear the verified month (5 points)
|
||||||
|
expect(result[:cleared]).to eq(5)
|
||||||
|
|
||||||
|
# Unverified month should still have raw_data
|
||||||
|
unverified_points = Point.where(user: user)
|
||||||
|
.where("timestamp >= ? AND timestamp < ?",
|
||||||
|
new_date.to_i,
|
||||||
|
(new_date + 1.month).to_i)
|
||||||
|
expect(unverified_points.pluck(:raw_data)).to all(eq({ 'lon' => 14.0, 'lat' => 53.0 }))
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'is idempotent (safe to run multiple times)' do
|
||||||
|
first_result = clearer.call
|
||||||
|
|
||||||
|
# Use a new instance for second call
|
||||||
|
new_clearer = Points::RawData::Clearer.new
|
||||||
|
second_result = new_clearer.call
|
||||||
|
|
||||||
|
expect(first_result[:cleared]).to eq(5)
|
||||||
|
expect(second_result[:cleared]).to eq(0) # Already cleared
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'handles large batches' do
|
||||||
|
# Stub batch size to test batching logic
|
||||||
|
stub_const('Points::RawData::Clearer::BATCH_SIZE', 2)
|
||||||
|
|
||||||
|
result = clearer.call
|
||||||
|
|
||||||
|
expect(result[:cleared]).to eq(5)
|
||||||
|
expect(Point.where(raw_data: {}).count).to eq(5)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
166
spec/services/points/raw_data/verifier_spec.rb
Normal file
166
spec/services/points/raw_data/verifier_spec.rb
Normal file
|
|
@ -0,0 +1,166 @@
|
||||||
|
# frozen_string_literal: true
|
||||||
|
|
||||||
|
require 'rails_helper'
|
||||||
|
|
||||||
|
RSpec.describe Points::RawData::Verifier do
|
||||||
|
let(:user) { create(:user) }
|
||||||
|
let(:verifier) { described_class.new }
|
||||||
|
|
||||||
|
before do
|
||||||
|
allow(PointsChannel).to receive(:broadcast_to)
|
||||||
|
end
|
||||||
|
|
||||||
|
describe '#verify_specific_archive' do
|
||||||
|
let(:test_date) { 3.months.ago.beginning_of_month.utc }
|
||||||
|
let!(:points) do
|
||||||
|
create_list(:point, 5, user: user,
|
||||||
|
timestamp: test_date.to_i,
|
||||||
|
raw_data: { lon: 13.4, lat: 52.5 })
|
||||||
|
end
|
||||||
|
|
||||||
|
let(:archive) do
|
||||||
|
# Create archive
|
||||||
|
archiver = Points::RawData::Archiver.new
|
||||||
|
archiver.archive_specific_month(user.id, test_date.year, test_date.month)
|
||||||
|
Points::RawDataArchive.last
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'verifies a valid archive successfully' do
|
||||||
|
expect(archive.verified_at).to be_nil
|
||||||
|
|
||||||
|
verifier.verify_specific_archive(archive.id)
|
||||||
|
archive.reload
|
||||||
|
|
||||||
|
expect(archive.verified_at).to be_present
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'detects missing file' do
|
||||||
|
archive.file.purge
|
||||||
|
archive.reload
|
||||||
|
|
||||||
|
expect do
|
||||||
|
verifier.verify_specific_archive(archive.id)
|
||||||
|
end.not_to change { archive.reload.verified_at }
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'detects point count mismatch' do
|
||||||
|
# Tamper with point count
|
||||||
|
archive.update_column(:point_count, 999)
|
||||||
|
|
||||||
|
expect do
|
||||||
|
verifier.verify_specific_archive(archive.id)
|
||||||
|
end.not_to change { archive.reload.verified_at }
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'detects checksum mismatch' do
|
||||||
|
# Tamper with checksum
|
||||||
|
archive.update_column(:point_ids_checksum, 'invalid')
|
||||||
|
|
||||||
|
expect do
|
||||||
|
verifier.verify_specific_archive(archive.id)
|
||||||
|
end.not_to change { archive.reload.verified_at }
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'detects deleted points' do
|
||||||
|
# Force archive creation first
|
||||||
|
archive_id = archive.id
|
||||||
|
|
||||||
|
# Then delete one point from database
|
||||||
|
points.first.destroy
|
||||||
|
|
||||||
|
expect do
|
||||||
|
verifier.verify_specific_archive(archive_id)
|
||||||
|
end.not_to change { archive.reload.verified_at }
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'detects raw_data mismatch between archive and database' do
|
||||||
|
# Force archive creation first
|
||||||
|
archive_id = archive.id
|
||||||
|
|
||||||
|
# Then modify raw_data in database after archiving
|
||||||
|
points.first.update_column(:raw_data, { lon: 999.0, lat: 999.0 })
|
||||||
|
|
||||||
|
expect do
|
||||||
|
verifier.verify_specific_archive(archive_id)
|
||||||
|
end.not_to change { archive.reload.verified_at }
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'verifies raw_data matches between archive and database' do
|
||||||
|
# Ensure data hasn't changed
|
||||||
|
expect(points.first.raw_data).to eq({ 'lon' => 13.4, 'lat' => 52.5 })
|
||||||
|
|
||||||
|
verifier.verify_specific_archive(archive.id)
|
||||||
|
|
||||||
|
expect(archive.reload.verified_at).to be_present
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
describe '#verify_month' do
|
||||||
|
let(:test_date) { 3.months.ago.beginning_of_month.utc }
|
||||||
|
|
||||||
|
before do
|
||||||
|
# Create points
|
||||||
|
create_list(:point, 5, user: user,
|
||||||
|
timestamp: test_date.to_i,
|
||||||
|
raw_data: { lon: 13.4, lat: 52.5 })
|
||||||
|
|
||||||
|
# Archive them
|
||||||
|
archiver = Points::RawData::Archiver.new
|
||||||
|
archiver.archive_specific_month(user.id, test_date.year, test_date.month)
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'verifies all archives for a month' do
|
||||||
|
expect(Points::RawDataArchive.where(verified_at: nil).count).to eq(1)
|
||||||
|
|
||||||
|
verifier.verify_month(user.id, test_date.year, test_date.month)
|
||||||
|
|
||||||
|
expect(Points::RawDataArchive.where(verified_at: nil).count).to eq(0)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
describe '#call' do
|
||||||
|
let(:test_date) { 3.months.ago.beginning_of_month.utc }
|
||||||
|
|
||||||
|
before do
|
||||||
|
# Create points and archive
|
||||||
|
create_list(:point, 5, user: user,
|
||||||
|
timestamp: test_date.to_i,
|
||||||
|
raw_data: { lon: 13.4, lat: 52.5 })
|
||||||
|
|
||||||
|
archiver = Points::RawData::Archiver.new
|
||||||
|
archiver.archive_specific_month(user.id, test_date.year, test_date.month)
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'verifies all unverified archives' do
|
||||||
|
expect(Points::RawDataArchive.where(verified_at: nil).count).to eq(1)
|
||||||
|
|
||||||
|
result = verifier.call
|
||||||
|
|
||||||
|
expect(result[:verified]).to eq(1)
|
||||||
|
expect(result[:failed]).to eq(0)
|
||||||
|
expect(Points::RawDataArchive.where(verified_at: nil).count).to eq(0)
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'reports failures' do
|
||||||
|
# Tamper with archive
|
||||||
|
Points::RawDataArchive.last.update_column(:point_count, 999)
|
||||||
|
|
||||||
|
result = verifier.call
|
||||||
|
|
||||||
|
expect(result[:verified]).to eq(0)
|
||||||
|
expect(result[:failed]).to eq(1)
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'skips already verified archives' do
|
||||||
|
# Verify once
|
||||||
|
verifier.call
|
||||||
|
|
||||||
|
# Try to verify again with a new verifier instance
|
||||||
|
new_verifier = Points::RawData::Verifier.new
|
||||||
|
result = new_verifier.call
|
||||||
|
|
||||||
|
expect(result[:verified]).to eq(0)
|
||||||
|
expect(result[:failed]).to eq(0)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
Loading…
Reference in a new issue