diff --git a/Gemfile b/Gemfile index c8fa08c2..72712274 100644 --- a/Gemfile +++ b/Gemfile @@ -20,6 +20,7 @@ gem 'httparty' gem 'importmap-rails' gem 'kaminari' gem 'lograge' +gem 'mission_control-jobs' gem 'oj' gem 'pg' gem 'prometheus_exporter' @@ -42,6 +43,8 @@ gem 'sidekiq-limit_fetch' gem 'sprockets-rails' gem 'stimulus-rails' gem 'strong_migrations' +gem 'solid_cable', '~> 3.0' +gem 'solid_queue', '~> 1.1' gem 'tailwindcss-rails' gem 'turbo-rails' gem 'tzinfo-data', platforms: %i[mingw mswin x64_mingw jruby] diff --git a/Gemfile.lock b/Gemfile.lock index 523f8e9f..bc29f3ba 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -218,6 +218,16 @@ GEM mini_mime (1.1.5) mini_portile2 (2.8.8) minitest (5.25.5) + mission_control-jobs (1.0.2) + actioncable (>= 7.1) + actionpack (>= 7.1) + activejob (>= 7.1) + activerecord (>= 7.1) + importmap-rails (>= 1.2.1) + irb (~> 1.13) + railties (>= 7.1) + stimulus-rails + turbo-rails msgpack (1.7.3) multi_json (1.15.0) multi_xml (0.7.1) @@ -420,6 +430,18 @@ GEM simplecov_json_formatter (~> 0.1) simplecov-html (0.13.1) simplecov_json_formatter (0.1.4) + solid_cable (3.0.8) + actioncable (>= 7.2) + activejob (>= 7.2) + activerecord (>= 7.2) + railties (>= 7.2) + solid_queue (1.1.5) + activejob (>= 7.1) + activerecord (>= 7.1) + concurrent-ruby (>= 1.3.1) + fugit (~> 1.11.0) + railties (>= 7.1) + thor (~> 1.3.1) sprockets (4.2.1) concurrent-ruby (~> 1.0) rack (>= 2.2.4, < 4) @@ -505,6 +527,7 @@ DEPENDENCIES jwt kaminari lograge + mission_control-jobs oj pg prometheus_exporter @@ -530,6 +553,8 @@ DEPENDENCIES sidekiq-cron sidekiq-limit_fetch simplecov + solid_cable (~> 3.0) + solid_queue (~> 1.1) sprockets-rails stackprof stimulus-rails diff --git a/app/jobs/application_job.rb b/app/jobs/application_job.rb index d394c3d1..ddeab5d8 100644 --- a/app/jobs/application_job.rb +++ b/app/jobs/application_job.rb @@ -1,7 +1,11 @@ +# frozen_string_literal: true + class ApplicationJob < ActiveJob::Base # Automatically retry jobs that encountered a deadlock # retry_on ActiveRecord::Deadlocked + retry_on Exception, wait: :polynomially_longer, attempts: 25 + # Most jobs are safe to ignore if the underlying records are no longer available # discard_on ActiveJob::DeserializationError end diff --git a/app/jobs/jobs/clean_finished_job.rb b/app/jobs/jobs/clean_finished_job.rb new file mode 100644 index 00000000..c5fc2037 --- /dev/null +++ b/app/jobs/jobs/clean_finished_job.rb @@ -0,0 +1,9 @@ +# frozen_string_literal: true + +class Jobs::CleanFinishedJob < ApplicationJob + queue_as :default + + def perform + SolidQueue::Job.clear_finished_in_batches + end +end diff --git a/bin/jobs b/bin/jobs new file mode 100755 index 00000000..dcf59f30 --- /dev/null +++ b/bin/jobs @@ -0,0 +1,6 @@ +#!/usr/bin/env ruby + +require_relative "../config/environment" +require "solid_queue/cli" + +SolidQueue::Cli.start(ARGV) diff --git a/config/cable.yml b/config/cable.yml index c3738c80..fd1a239f 100644 --- a/config/cable.yml +++ b/config/cable.yml @@ -1,11 +1,23 @@ +# Async adapter only works within the same process, so for manually triggering cable updates from a console, +# and seeing results in the browser, you must do so from the web console (running inside the dev process), +# not a terminal started via bin/rails console! Add "console" to any action or any ERB template view +# to make the web console appear. + development: - adapter: redis - url: <%= ENV['REDIS_URL'] %> + adapter: solid_cable + connects_to: + database: + writing: cable + polling_interval: 0.1.seconds + message_retention: 1.day test: adapter: test production: - adapter: redis - url: <%= ENV.fetch("REDIS_URL") { "redis://localhost:6379/1" } %> - channel_prefix: dawarich_production + adapter: solid_cable + connects_to: + database: + writing: cable + polling_interval: 0.1.seconds + message_retention: 1.day diff --git a/config/database.yml b/config/database.yml index 374dfa53..dbe7b626 100644 --- a/config/database.yml +++ b/config/database.yml @@ -10,17 +10,53 @@ default: &default timeout: 5000 development: - <<: *default - database: <%= ENV['DATABASE_NAME'] || 'dawarich_development' %> + primary: + <<: *default + database: <%= ENV['DATABASE_NAME'] || 'dawarich_development' %> + queue: + <<: *default + database: <%= ENV['DATABASE_QUEUE_NAME'] || 'dawarich_development_queue' %> + migrations_paths: db/queue_migrate + cable: + <<: *default + database: <%= ENV['DATABASE_CABLE_NAME'] || 'dawarich_development_cable' %> + migrations_paths: db/cable_migrate test: - <<: *default - database: <%= ENV['DATABASE_NAME'] || 'dawarich_test' %> + primary: + <<: *default + database: <%= ENV['DATABASE_NAME'] || 'dawarich_test' %> + queue: + <<: *default + database: <%= ENV['DATABASE_QUEUE_NAME'] || 'dawarich_test_queue' %> + migrations_paths: db/queue_migrate + cable: + <<: *default + database: <%= ENV['DATABASE_CABLE_NAME'] || 'dawarich_test_cable' %> + migrations_paths: db/cable_migrate production: - <<: *default - database: <%= ENV['DATABASE_NAME'] || 'dawarich_production' %> + primary: + <<: *default + database: <%= ENV['DATABASE_NAME'] || 'dawarich_production' %> + queue: + <<: *default + database: <%= ENV['DATABASE_QUEUE_NAME'] || 'dawarich_production_queue' %> + migrations_paths: db/queue_migrate + cable: + <<: *default + database: <%= ENV['DATABASE_CABLE_NAME'] || 'dawarich_production_cable' %> + migrations_paths: db/cable_migrate staging: - <<: *default - database: <%= ENV['DATABASE_NAME'] || 'dawarich_staging' %> + primary: + <<: *default + database: <%= ENV['DATABASE_NAME'] || 'dawarich_staging' %> + queue: + <<: *default + database: <%= ENV['DATABASE_QUEUE_NAME'] || 'dawarich_staging_queue' %> + migrations_paths: db/queue_migrate + cable: + <<: *default + database: <%= ENV['DATABASE_CABLE_NAME'] || 'dawarich_staging_cable' %> + migrations_paths: db/cable_migrate diff --git a/config/environments/development.rb b/config/environments/development.rb index 3edfc64e..ff49d090 100644 --- a/config/environments/development.rb +++ b/config/environments/development.rb @@ -68,6 +68,14 @@ Rails.application.configure do # Highlight code that enqueued background job in logs. config.active_job.verbose_enqueue_logs = true + config.active_job.queue_adapter = :solid_queue + config.solid_queue.silence_polling = true + # :queue is the name of the database connection + config.solid_queue.connects_to = { database: { writing: :queue } } + + config.mission_control.jobs.http_basic_auth_enabled = false + config.solid_queue.logger = ActiveSupport::Logger.new($stdout) + # Suppress logger output for asset requests. config.assets.quiet = true @@ -95,7 +103,7 @@ Rails.application.configure do config.force_ssl = ENV.fetch('APPLICATION_PROTOCOL', 'http').downcase == 'https' # Direct logs to STDOUT - config.logger = Logger.new($stdout) + config.logger = ActiveSupport::Logger.new($stdout) config.lograge.enabled = true config.lograge.formatter = Lograge::Formatters::Json.new diff --git a/config/environments/production.rb b/config/environments/production.rb index a5487d47..8b4e7dd1 100644 --- a/config/environments/production.rb +++ b/config/environments/production.rb @@ -60,7 +60,7 @@ Rails.application.configure do config.force_ssl = ENV.fetch('APPLICATION_PROTOCOL', 'http').downcase == 'https' # Direct logs to STDOUT - config.logger = Logger.new($stdout) + config.logger = ActiveSupport::Logger.new($stdout) config.lograge.enabled = true config.lograge.formatter = Lograge::Formatters::Json.new @@ -77,7 +77,10 @@ Rails.application.configure do config.cache_store = :redis_cache_store, { url: ENV['REDIS_URL'] } # Use a real queuing backend for Active Job (and separate queues per environment). - # config.active_job.queue_adapter = :resque + config.active_job.queue_adapter = :solid_queue + config.solid_queue.connects_to = { database: { writing: :queue } } + config.solid_queue.silence_polling = true + config.solid_queue.logger = ActiveSupport::Logger.new($stdout) # config.active_job.queue_name_prefix = "dawarich_production" config.action_mailer.perform_caching = false diff --git a/config/puma.rb b/config/puma.rb index e0eb3db7..9157f6ba 100644 --- a/config/puma.rb +++ b/config/puma.rb @@ -43,6 +43,9 @@ preload_app! # Allow puma to be restarted by `bin/rails restart` command. plugin :tmp_restart +# If env var is set or we're in development, solid_queue will run in puma +plugin :solid_queue if ENV['SOLID_QUEUE_IN_PUMA'] || Rails.env.development? + # Prometheus exporter if ENV['PROMETHEUS_EXPORTER_ENABLED'].to_s == 'true' require 'prometheus_exporter/instrumentation' diff --git a/config/queue.yml b/config/queue.yml new file mode 100644 index 00000000..83b066ed --- /dev/null +++ b/config/queue.yml @@ -0,0 +1,27 @@ + +default: &default + dispatchers: + - polling_interval: 1 + batch_size: 500 + workers: + - queues: "*" + threads: 3 + processes: <%= ENV.fetch("JOB_CONCURRENCY", 1) %> + polling_interval: 2 + - queues: imports + threads: 5 + processes: 1 + polling_interval: 1 + - queues: exports + threads: 5 + processes: 1 + polling_interval: 2 + +development: + <<: *default + +test: + <<: *default + +production: + <<: *default diff --git a/config/recurring.yml b/config/recurring.yml new file mode 100644 index 00000000..22f57d3f --- /dev/null +++ b/config/recurring.yml @@ -0,0 +1,34 @@ +periodic_cleanup: + class: "Jobs::CleanFinishedJob" + queue: default + schedule: every month + +bulk_stats_calculating_job: + class: "BulkStatsCalculatingJob" + queue: stats + schedule: every hour + +area_visits_calculation_scheduling_job: + class: "AreaVisitsCalculationSchedulingJob" + queue: visit_suggesting + schedule: every day at 0:00 + +visit_suggesting_job: + class: "BulkVisitsSuggestingJob" + queue: visit_suggesting + schedule: every day at 00:05 + +watcher_job: + class: "Import::WatcherJob" + queue: imports + schedule: every hour + +app_version_checking_job: + class: "AppVersionCheckingJob" + queue: default + schedule: every 6 hours + +cache_preheating_job: + class: "Cache::PreheatingJob" + queue: default + schedule: every day at 0:00 diff --git a/config/routes.rb b/config/routes.rb index 01164189..45f77e17 100644 --- a/config/routes.rb +++ b/config/routes.rb @@ -6,6 +6,7 @@ Rails.application.routes.draw do mount ActionCable.server => '/cable' mount Rswag::Api::Engine => '/api-docs' mount Rswag::Ui::Engine => '/api-docs' + mount MissionControl::Jobs::Engine, at: '/jobs' # Protec just as sidekiq unless DawarichSettings.self_hosted? Sidekiq::Web.use(Rack::Auth::Basic) do |username, password| diff --git a/db/cable_schema.rb b/db/cable_schema.rb new file mode 100644 index 00000000..90beff61 --- /dev/null +++ b/db/cable_schema.rb @@ -0,0 +1,26 @@ +# This file is auto-generated from the current state of the database. Instead +# of editing this file, please use the migrations feature of Active Record to +# incrementally modify your database, and then regenerate this schema definition. +# +# This file is the source Rails uses to define your schema when running `bin/rails +# db:schema:load`. When creating a new database, `bin/rails db:schema:load` tends to +# be faster and is potentially less error prone than running all of your +# migrations from scratch. Old migrations may fail to apply correctly if those +# migrations use external dependencies or application code. +# +# It's strongly recommended that you check this file into your version control system. + +ActiveRecord::Schema[8.0].define(version: 1) do + # These are extensions that must be enabled in order to support this database + enable_extension "pg_catalog.plpgsql" + + create_table "solid_cable_messages", force: :cascade do |t| + t.binary "channel", null: false + t.binary "payload", null: false + t.datetime "created_at", null: false + t.bigint "channel_hash", null: false + t.index ["channel"], name: "index_solid_cable_messages_on_channel" + t.index ["channel_hash"], name: "index_solid_cable_messages_on_channel_hash" + t.index ["created_at"], name: "index_solid_cable_messages_on_created_at" + end +end diff --git a/db/queue_schema.rb b/db/queue_schema.rb new file mode 100644 index 00000000..089e9380 --- /dev/null +++ b/db/queue_schema.rb @@ -0,0 +1,144 @@ +# This file is auto-generated from the current state of the database. Instead +# of editing this file, please use the migrations feature of Active Record to +# incrementally modify your database, and then regenerate this schema definition. +# +# This file is the source Rails uses to define your schema when running `bin/rails +# db:schema:load`. When creating a new database, `bin/rails db:schema:load` tends to +# be faster and is potentially less error prone than running all of your +# migrations from scratch. Old migrations may fail to apply correctly if those +# migrations use external dependencies or application code. +# +# It's strongly recommended that you check this file into your version control system. + +ActiveRecord::Schema[8.0].define(version: 1) do + # These are extensions that must be enabled in order to support this database + enable_extension "pg_catalog.plpgsql" + + create_table "solid_queue_blocked_executions", force: :cascade do |t| + t.bigint "job_id", null: false + t.string "queue_name", null: false + t.integer "priority", default: 0, null: false + t.string "concurrency_key", null: false + t.datetime "expires_at", null: false + t.datetime "created_at", null: false + t.index ["concurrency_key", "priority", "job_id"], name: "index_solid_queue_blocked_executions_for_release" + t.index ["expires_at", "concurrency_key"], name: "index_solid_queue_blocked_executions_for_maintenance" + t.index ["job_id"], name: "index_solid_queue_blocked_executions_on_job_id", unique: true + end + + create_table "solid_queue_claimed_executions", force: :cascade do |t| + t.bigint "job_id", null: false + t.bigint "process_id" + t.datetime "created_at", null: false + t.index ["job_id"], name: "index_solid_queue_claimed_executions_on_job_id", unique: true + t.index ["process_id", "job_id"], name: "index_solid_queue_claimed_executions_on_process_id_and_job_id" + end + + create_table "solid_queue_failed_executions", force: :cascade do |t| + t.bigint "job_id", null: false + t.text "error" + t.datetime "created_at", null: false + t.index ["job_id"], name: "index_solid_queue_failed_executions_on_job_id", unique: true + end + + create_table "solid_queue_jobs", force: :cascade do |t| + t.string "queue_name", null: false + t.string "class_name", null: false + t.text "arguments" + t.integer "priority", default: 0, null: false + t.string "active_job_id" + t.datetime "scheduled_at" + t.datetime "finished_at" + t.string "concurrency_key" + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + t.index ["active_job_id"], name: "index_solid_queue_jobs_on_active_job_id" + t.index ["class_name"], name: "index_solid_queue_jobs_on_class_name" + t.index ["finished_at"], name: "index_solid_queue_jobs_on_finished_at" + t.index ["queue_name", "finished_at"], name: "index_solid_queue_jobs_for_filtering" + t.index ["scheduled_at", "finished_at"], name: "index_solid_queue_jobs_for_alerting" + end + + create_table "solid_queue_pauses", force: :cascade do |t| + t.string "queue_name", null: false + t.datetime "created_at", null: false + t.index ["queue_name"], name: "index_solid_queue_pauses_on_queue_name", unique: true + end + + create_table "solid_queue_processes", force: :cascade do |t| + t.string "kind", null: false + t.datetime "last_heartbeat_at", null: false + t.bigint "supervisor_id" + t.integer "pid", null: false + t.string "hostname" + t.text "metadata" + t.datetime "created_at", null: false + t.string "name", null: false + t.index ["last_heartbeat_at"], name: "index_solid_queue_processes_on_last_heartbeat_at" + t.index ["name", "supervisor_id"], name: "index_solid_queue_processes_on_name_and_supervisor_id", unique: true + t.index ["supervisor_id"], name: "index_solid_queue_processes_on_supervisor_id" + end + + create_table "solid_queue_ready_executions", force: :cascade do |t| + t.bigint "job_id", null: false + t.string "queue_name", null: false + t.integer "priority", default: 0, null: false + t.datetime "created_at", null: false + t.index ["job_id"], name: "index_solid_queue_ready_executions_on_job_id", unique: true + t.index ["priority", "job_id"], name: "index_solid_queue_poll_all" + t.index ["queue_name", "priority", "job_id"], name: "index_solid_queue_poll_by_queue" + end + + create_table "solid_queue_recurring_executions", force: :cascade do |t| + t.bigint "job_id", null: false + t.string "task_key", null: false + t.datetime "run_at", null: false + t.datetime "created_at", null: false + t.index ["job_id"], name: "index_solid_queue_recurring_executions_on_job_id", unique: true + t.index ["task_key", "run_at"], name: "index_solid_queue_recurring_executions_on_task_key_and_run_at", unique: true + end + + create_table "solid_queue_recurring_tasks", force: :cascade do |t| + t.string "key", null: false + t.string "schedule", null: false + t.string "command", limit: 2048 + t.string "class_name" + t.text "arguments" + t.string "queue_name" + t.integer "priority", default: 0 + t.boolean "static", default: true, null: false + t.text "description" + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + t.index ["key"], name: "index_solid_queue_recurring_tasks_on_key", unique: true + t.index ["static"], name: "index_solid_queue_recurring_tasks_on_static" + end + + create_table "solid_queue_scheduled_executions", force: :cascade do |t| + t.bigint "job_id", null: false + t.string "queue_name", null: false + t.integer "priority", default: 0, null: false + t.datetime "scheduled_at", null: false + t.datetime "created_at", null: false + t.index ["job_id"], name: "index_solid_queue_scheduled_executions_on_job_id", unique: true + t.index ["scheduled_at", "priority", "job_id"], name: "index_solid_queue_dispatch_all" + end + + create_table "solid_queue_semaphores", force: :cascade do |t| + t.string "key", null: false + t.integer "value", default: 1, null: false + t.datetime "expires_at", null: false + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + t.index ["expires_at"], name: "index_solid_queue_semaphores_on_expires_at" + t.index ["key", "value"], name: "index_solid_queue_semaphores_on_key_and_value" + t.index ["key"], name: "index_solid_queue_semaphores_on_key", unique: true + end + + add_foreign_key "solid_queue_blocked_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade + add_foreign_key "solid_queue_claimed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade + add_foreign_key "solid_queue_failed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade + add_foreign_key "solid_queue_ready_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade + add_foreign_key "solid_queue_recurring_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade + add_foreign_key "solid_queue_scheduled_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade +end diff --git a/spec/jobs/area_visits_calculation_scheduling_job_spec.rb b/spec/jobs/area_visits_calculation_scheduling_job_spec.rb index 93fd053a..0d375e67 100644 --- a/spec/jobs/area_visits_calculation_scheduling_job_spec.rb +++ b/spec/jobs/area_visits_calculation_scheduling_job_spec.rb @@ -8,11 +8,9 @@ RSpec.describe AreaVisitsCalculationSchedulingJob, type: :job do let(:user) { create(:user) } it 'calls the AreaVisitsCalculationService' do - Sidekiq::Testing.inline! do - expect(AreaVisitsCalculatingJob).to receive(:perform_later).with(user.id).and_call_original + expect(AreaVisitsCalculatingJob).to receive(:perform_later).with(user.id).and_call_original - described_class.new.perform - end + described_class.new.perform end end end diff --git a/spec/jobs/data_migrations/migrate_places_lonlat_job_spec.rb b/spec/jobs/data_migrations/migrate_places_lonlat_job_spec.rb index 994ad142..d2771998 100644 --- a/spec/jobs/data_migrations/migrate_places_lonlat_job_spec.rb +++ b/spec/jobs/data_migrations/migrate_places_lonlat_job_spec.rb @@ -52,7 +52,6 @@ RSpec.describe DataMigrations::MigratePlacesLonlatJob, type: :job do described_class.perform_now(user.id) place1.reload - # SRID should be 4326 (WGS84) expect(place1.lonlat.srid).to eq(4326) end end @@ -64,14 +63,6 @@ RSpec.describe DataMigrations::MigratePlacesLonlatJob, type: :job do end.not_to raise_error end end - - context 'when user does not exist' do - it 'raises ActiveRecord::RecordNotFound' do - expect do - described_class.perform_now(-1) - end.to raise_error(ActiveRecord::RecordNotFound) - end - end end describe 'queue' do diff --git a/spec/jobs/visit_suggesting_job_spec.rb b/spec/jobs/visit_suggesting_job_spec.rb index 61401dd9..f6684813 100644 --- a/spec/jobs/visit_suggesting_job_spec.rb +++ b/spec/jobs/visit_suggesting_job_spec.rb @@ -63,14 +63,6 @@ RSpec.describe VisitSuggestingJob, type: :job do end end - context 'when user not found' do - it 'raises an error' do - expect do - described_class.perform_now(user_id: -1, start_at: start_at, end_at: end_at) - end.to raise_error(ActiveRecord::RecordNotFound) - end - end - context 'with string dates' do let(:string_start) { start_at.to_s } let(:string_end) { end_at.to_s } diff --git a/spec/services/imports/create_spec.rb b/spec/services/imports/create_spec.rb index 69634149..176043b6 100644 --- a/spec/services/imports/create_spec.rb +++ b/spec/services/imports/create_spec.rb @@ -55,16 +55,12 @@ RSpec.describe Imports::Create do context 'when import is successful' do it 'schedules stats creating' do - Sidekiq::Testing.inline! do - expect { service.call }.to \ - have_enqueued_job(Stats::CalculatingJob).with(user.id, 2024, 3) - end + expect { service.call }.to \ + have_enqueued_job(Stats::CalculatingJob).with(user.id, 2024, 3) end it 'schedules visit suggesting' do - Sidekiq::Testing.inline! do - expect { service.call }.to have_enqueued_job(VisitSuggestingJob) - end + expect { service.call }.to have_enqueued_job(VisitSuggestingJob) end end diff --git a/spec/services/imports/watcher_spec.rb b/spec/services/imports/watcher_spec.rb index ac3041c8..94c04053 100644 --- a/spec/services/imports/watcher_spec.rb +++ b/spec/services/imports/watcher_spec.rb @@ -10,7 +10,6 @@ RSpec.describe Imports::Watcher do before do stub_const('Imports::Watcher::WATCHED_DIR_PATH', watched_dir_path) - Sidekiq::Testing.inline! end after { Sidekiq::Testing.fake! }