From d71a1891a255dc3fa35e16b3c69688f6ba738b4f Mon Sep 17 00:00:00 2001 From: Dino Maric Date: Mon, 16 Sep 2024 09:43:02 +0200 Subject: [PATCH 1/7] Add `ActiveJob` adapter for `delayed_job` Implements adapter for `ActiveJob` inside the gem. This adapter is defined only if rails version is >=8, for earlier rails versions adapter is definied inside the Rails itself --- Gemfile | 6 +- .../queue_adapters/delayed_job_adapter.rb | 60 +++++++++++++++++++ 2 files changed, 65 insertions(+), 1 deletion(-) create mode 100644 lib/active_job/queue_adapters/delayed_job_adapter.rb diff --git a/Gemfile b/Gemfile index ec269986..b593b4bc 100644 --- a/Gemfile +++ b/Gemfile @@ -50,10 +50,14 @@ group :test do if ENV['RAILS_VERSION'] == 'edge' gem 'actionmailer', :github => 'rails/rails' gem 'activerecord', :github => 'rails/rails' + gem 'activejob', :github => 'rails/rails' elsif ENV['RAILS_VERSION'] gem 'actionmailer', "~> #{ENV['RAILS_VERSION']}" gem 'activerecord', "~> #{ENV['RAILS_VERSION']}" - if ENV['RAILS_VERSION'] < '5.1' + + if ENV['RAILS_VERSION'] >= '8.0' + gem 'activejob', "~> #{ENV['RAILS_VERSION']}" + elsif ENV['RAILS_VERSION'] < '5.1' gem 'loofah', '2.3.1' gem 'nokogiri', '< 1.11.0' gem 'rails-html-sanitizer', '< 1.4.0' diff --git a/lib/active_job/queue_adapters/delayed_job_adapter.rb b/lib/active_job/queue_adapters/delayed_job_adapter.rb new file mode 100644 index 00000000..cb0d1cf6 --- /dev/null +++ b/lib/active_job/queue_adapters/delayed_job_adapter.rb @@ -0,0 +1,60 @@ +if defined?(ActiveJob) && ActiveJob.version >= "8.0.0.alpha" + module ActiveJob + module QueueAdapters + # = Delayed Job adapter for Active Job + # + # To use Delayed Job, set the queue_adapter config to +:delayed_job+. + # + # Rails.application.config.active_job.queue_adapter = :delayed_job + class DelayedJobAdapter + def initialize(enqueue_after_transaction_commit: false) + @enqueue_after_transaction_commit = enqueue_after_transaction_commit + end + + def enqueue_after_transaction_commit? # :nodoc: + @enqueue_after_transaction_commit + end + + def enqueue(job) + delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, priority: job.priority) + job.provider_job_id = delayed_job.id + delayed_job + end + + def enqueue_at(job, timestamp) + delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, priority: job.priority, run_at: Time.at(timestamp)) + job.provider_job_id = delayed_job.id + delayed_job + end + + class JobWrapper + attr_accessor :job_data + + def initialize(job_data) + @job_data = job_data + end + + def display_name + base_name = "#{job_data["job_class"]} [#{job_data["job_id"]}] from DelayedJob(#{job_data["queue_name"]})" + + return base_name unless log_arguments? + + "#{base_name} with arguments: #{job_data["arguments"]}" + end + + def perform + Base.execute(job_data) + end + + private + def log_arguments? + job_data["job_class"].constantize.log_arguments? + rescue NameError + false + end + end + end + end + end +end + From 0c582baa6f9ebc43258db37e5c936cda68f6c9bb Mon Sep 17 00:00:00 2001 From: Dino Maric Date: Sun, 13 Oct 2024 11:52:30 +0200 Subject: [PATCH 2/7] Use local adapter --- .../queue_adapters/delayed_job_adapter.rb | 91 ++++++++++--------- lib/delayed/railtie.rb | 1 + 2 files changed, 47 insertions(+), 45 deletions(-) diff --git a/lib/active_job/queue_adapters/delayed_job_adapter.rb b/lib/active_job/queue_adapters/delayed_job_adapter.rb index cb0d1cf6..6ff6d2ef 100644 --- a/lib/active_job/queue_adapters/delayed_job_adapter.rb +++ b/lib/active_job/queue_adapters/delayed_job_adapter.rb @@ -1,58 +1,59 @@ -if defined?(ActiveJob) && ActiveJob.version >= "8.0.0.alpha" - module ActiveJob - module QueueAdapters - # = Delayed Job adapter for Active Job - # - # To use Delayed Job, set the queue_adapter config to +:delayed_job+. - # - # Rails.application.config.active_job.queue_adapter = :delayed_job - class DelayedJobAdapter - def initialize(enqueue_after_transaction_commit: false) - @enqueue_after_transaction_commit = enqueue_after_transaction_commit - end - - def enqueue_after_transaction_commit? # :nodoc: - @enqueue_after_transaction_commit - end +module ActiveJob + module QueueAdapters + # Explicitly remove the implementation existing in older rails'. + remove_const(:DelayedJobAdapter) if defined?(:DelayedJobAdapter) + + # = Delayed Job adapter for Active Job + # + # To use Delayed Job, set the queue_adapter config to +:delayed_job+. + # + # Rails.application.config.active_job.queue_adapter = :delayed_job + class DelayedJobAdapter < ::ActiveJob::QueueAdapters::AbstractAdapter + def initialize(enqueue_after_transaction_commit: false) + @enqueue_after_transaction_commit = enqueue_after_transaction_commit + end - def enqueue(job) - delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, priority: job.priority) - job.provider_job_id = delayed_job.id - delayed_job - end + def enqueue_after_transaction_commit? # :nodoc: + @enqueue_after_transaction_commit + end - def enqueue_at(job, timestamp) - delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, priority: job.priority, run_at: Time.at(timestamp)) - job.provider_job_id = delayed_job.id - delayed_job - end + def enqueue(job) + delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, priority: job.priority) + job.provider_job_id = delayed_job.id + delayed_job + end - class JobWrapper - attr_accessor :job_data + def enqueue_at(job, timestamp) + delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, priority: job.priority, run_at: Time.at(timestamp)) + job.provider_job_id = delayed_job.id + delayed_job + end - def initialize(job_data) - @job_data = job_data - end + class JobWrapper + attr_accessor :job_data - def display_name - base_name = "#{job_data["job_class"]} [#{job_data["job_id"]}] from DelayedJob(#{job_data["queue_name"]})" + def initialize(job_data) + @job_data = job_data + end - return base_name unless log_arguments? + def display_name + base_name = "#{job_data["job_class"]} [#{job_data["job_id"]}] from DelayedJob(#{job_data["queue_name"]})" - "#{base_name} with arguments: #{job_data["arguments"]}" - end + return base_name unless log_arguments? - def perform - Base.execute(job_data) - end + "#{base_name} with arguments: #{job_data["arguments"]}" + end - private - def log_arguments? - job_data["job_class"].constantize.log_arguments? - rescue NameError - false - end + def perform + Base.execute(job_data) end + + private + def log_arguments? + job_data["job_class"].constantize.log_arguments? + rescue NameError + false + end end end end diff --git a/lib/delayed/railtie.rb b/lib/delayed/railtie.rb index a50ca1b4..c25f7dde 100644 --- a/lib/delayed/railtie.rb +++ b/lib/delayed/railtie.rb @@ -1,5 +1,6 @@ require 'delayed_job' require 'rails' +require 'active_job/queue_adapters/delayed_job_adapter' module Delayed class Railtie < Rails::Railtie From 4fa9befb58ecc94c122ee93e2e0b4f62a843f562 Mon Sep 17 00:00:00 2001 From: Dino Maric Date: Sun, 13 Oct 2024 12:56:01 +0200 Subject: [PATCH 3/7] Start writing tests --- Gemfile | 4 +- .../queue_adapters/delayed_job_adapter.rb | 2 + spec/active_job_adapter_spec.rb | 58 +++++++++++++++++++ 3 files changed, 61 insertions(+), 3 deletions(-) create mode 100644 spec/active_job_adapter_spec.rb diff --git a/Gemfile b/Gemfile index b593b4bc..536a22ed 100644 --- a/Gemfile +++ b/Gemfile @@ -55,9 +55,7 @@ group :test do gem 'actionmailer', "~> #{ENV['RAILS_VERSION']}" gem 'activerecord', "~> #{ENV['RAILS_VERSION']}" - if ENV['RAILS_VERSION'] >= '8.0' - gem 'activejob', "~> #{ENV['RAILS_VERSION']}" - elsif ENV['RAILS_VERSION'] < '5.1' + if ENV['RAILS_VERSION'] < '5.1' gem 'loofah', '2.3.1' gem 'nokogiri', '< 1.11.0' gem 'rails-html-sanitizer', '< 1.4.0' diff --git a/lib/active_job/queue_adapters/delayed_job_adapter.rb b/lib/active_job/queue_adapters/delayed_job_adapter.rb index 6ff6d2ef..56626dab 100644 --- a/lib/active_job/queue_adapters/delayed_job_adapter.rb +++ b/lib/active_job/queue_adapters/delayed_job_adapter.rb @@ -1,3 +1,5 @@ +require "byebug" + module ActiveJob module QueueAdapters # Explicitly remove the implementation existing in older rails'. diff --git a/spec/active_job_adapter_spec.rb b/spec/active_job_adapter_spec.rb new file mode 100644 index 00000000..f4002ec9 --- /dev/null +++ b/spec/active_job_adapter_spec.rb @@ -0,0 +1,58 @@ +require 'helper' +require 'active_job' +require "byebug" + +describe 'a Rails active job backend' do + module JobBuffer + class << self + def clear + values.clear + end + + def add(value) + values << value + end + + def values + @values ||= [] + end + end + end + + class TestJob < ActiveJob::Base + queue_as :integration_tests + + def perform(message) + JobBuffer.add(message) + end + end + + let(:worker) { Delayed::Worker.new(sleep_delay: 0.5, queues: %w(integration_tests)) } + + it 'enqueus and executes the job' do + thread = Thread.new { worker.start } + + ActiveJob::Base.queue_adapter = :delayed_job + job = TestJob.perform_later('hello') + sleep 2 + + expect(JobBuffer.values).to eq(['hello']) + ensure + worker.stop + thread.join + end + + it 'runs multiple queued jobs' do + JobBuffer.clear + thread = Thread.new { worker.start } + + ActiveJob::Base.queue_adapter = :delayed_job + ActiveJob.perform_all_later(TestJob.new('Rails'), TestJob.new('World')) + sleep 2 + + expect(JobBuffer.values).to eq(['Rails', 'World']) + ensure + worker.stop + thread.join + end +end From b86b3eeb3767cce59922550c6153fb98d319f300 Mon Sep 17 00:00:00 2001 From: Dino Maric Date: Sun, 13 Oct 2024 13:12:37 +0200 Subject: [PATCH 4/7] More testing --- spec/active_job_adapter_spec.rb | 44 +++++++++++++++++++-------------- 1 file changed, 25 insertions(+), 19 deletions(-) diff --git a/spec/active_job_adapter_spec.rb b/spec/active_job_adapter_spec.rb index f4002ec9..a5585823 100644 --- a/spec/active_job_adapter_spec.rb +++ b/spec/active_job_adapter_spec.rb @@ -29,30 +29,36 @@ def perform(message) let(:worker) { Delayed::Worker.new(sleep_delay: 0.5, queues: %w(integration_tests)) } - it 'enqueus and executes the job' do - thread = Thread.new { worker.start } - + before do ActiveJob::Base.queue_adapter = :delayed_job - job = TestJob.perform_later('hello') - sleep 2 - - expect(JobBuffer.values).to eq(['hello']) - ensure - worker.stop - thread.join end - it 'runs multiple queued jobs' do + after do JobBuffer.clear - thread = Thread.new { worker.start } + end - ActiveJob::Base.queue_adapter = :delayed_job - ActiveJob.perform_all_later(TestJob.new('Rails'), TestJob.new('World')) - sleep 2 + it 'enqueus and executes the job' do + start_worker do + job = TestJob.perform_later('hello') + sleep 2 + expect(JobBuffer.values).to eq(['hello']) + end + end - expect(JobBuffer.values).to eq(['Rails', 'World']) - ensure - worker.stop - thread.join + it 'runs multiple queued jobs' do + start_worker do + ActiveJob.perform_all_later(TestJob.new('Rails'), TestJob.new('World')) + sleep 2 + expect(JobBuffer.values).to eq(['Rails', 'World']) + end end + + private + def start_worker(&) + thread = Thread.new { worker.start } + yield + ensure + worker.stop + thread.join + end end From 631868d4165b57cc476144f7c02edb4e8febf5a3 Mon Sep 17 00:00:00 2001 From: Dino Maric Date: Sun, 13 Oct 2024 13:16:46 +0200 Subject: [PATCH 5/7] Test scheduled jobs --- spec/active_job_adapter_spec.rb | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/spec/active_job_adapter_spec.rb b/spec/active_job_adapter_spec.rb index a5585823..68cc4f24 100644 --- a/spec/active_job_adapter_spec.rb +++ b/spec/active_job_adapter_spec.rb @@ -39,7 +39,7 @@ def perform(message) it 'enqueus and executes the job' do start_worker do - job = TestJob.perform_later('hello') + TestJob.perform_later('hello') sleep 2 expect(JobBuffer.values).to eq(['hello']) end @@ -53,6 +53,22 @@ def perform(message) end end + it 'should not run job enqueued in the future' do + start_worker do + TestJob.set(wait: 5.seconds).perform_later('hello') + sleep 2 + expect(JobBuffer.values.empty?).to eq true + end + end + + it 'should run job enqueued in the future at the specified time' do + start_worker do + TestJob.set(wait: 5.seconds).perform_later('hello') + sleep 10 + expect(JobBuffer.values).to eq(['hello']) + end + end + private def start_worker(&) thread = Thread.new { worker.start } From 41f2b77ef505a87cfd1484c4b7ceceb237d471a8 Mon Sep 17 00:00:00 2001 From: Dino Maric Date: Sun, 13 Oct 2024 18:38:40 +0200 Subject: [PATCH 6/7] More testing cases --- Gemfile | 1 + .../queue_adapters/delayed_job_adapter.rb | 2 - spec/active_job_adapter_spec.rb | 65 +++++++++++++++---- 3 files changed, 54 insertions(+), 14 deletions(-) diff --git a/Gemfile b/Gemfile index 536a22ed..d88ba639 100644 --- a/Gemfile +++ b/Gemfile @@ -83,6 +83,7 @@ group :test do if ENV['RAILS_VERSION'].nil? || ENV['RAILS_VERSION'] >= '6.0.0' gem 'zeitwerk', :require => false end + gem 'concurrent-ruby' end group :rubocop do diff --git a/lib/active_job/queue_adapters/delayed_job_adapter.rb b/lib/active_job/queue_adapters/delayed_job_adapter.rb index 56626dab..6ff6d2ef 100644 --- a/lib/active_job/queue_adapters/delayed_job_adapter.rb +++ b/lib/active_job/queue_adapters/delayed_job_adapter.rb @@ -1,5 +1,3 @@ -require "byebug" - module ActiveJob module QueueAdapters # Explicitly remove the implementation existing in older rails'. diff --git a/spec/active_job_adapter_spec.rb b/spec/active_job_adapter_spec.rb index 68cc4f24..fd4bb94e 100644 --- a/spec/active_job_adapter_spec.rb +++ b/spec/active_job_adapter_spec.rb @@ -1,20 +1,22 @@ require 'helper' require 'active_job' -require "byebug" +require 'concurrent' describe 'a Rails active job backend' do module JobBuffer + @values = Concurrent::Array.new + class << self def clear - values.clear + @values.clear end def add(value) - values << value + @values << value end def values - @values ||= [] + @values.dup end end end @@ -27,21 +29,41 @@ def perform(message) end end - let(:worker) { Delayed::Worker.new(sleep_delay: 0.5, queues: %w(integration_tests)) } + let(:worker) { Delayed::Worker.new(sleep_delay: 0.5, queues: %w[integration_tests]) } before do + JobBuffer.clear + Delayed::Job.delete_all ActiveJob::Base.queue_adapter = :delayed_job + ActiveJob::Base.logger = nil end - after do - JobBuffer.clear + it "should supply a wrapped class name to DelayedJob" do + TestJob.perform_later + job = Delayed::Job.all.last + expect(job.name).to match(/TestJob \[[0-9a-f-]+\] from DelayedJob\(integration_tests\) with arguments: \[\]/) end it 'enqueus and executes the job' do start_worker do - TestJob.perform_later('hello') + TestJob.perform_later('Rails') sleep 2 - expect(JobBuffer.values).to eq(['hello']) + expect(JobBuffer.values).to eq(['Rails']) + end + end + + it "should not run jobs queued on a non-listening queue" do + start_worker do + old_queue = TestJob.queue_name + + begin + TestJob.queue_as :some_other_queue + TestJob.perform_later "Rails" + sleep 2 + expect(JobBuffer.values.empty?).to eq true + ensure + TestJob.queue_name = old_queue + end end end @@ -55,7 +77,7 @@ def perform(message) it 'should not run job enqueued in the future' do start_worker do - TestJob.set(wait: 5.seconds).perform_later('hello') + TestJob.set(wait: 5.seconds).perform_later('Rails') sleep 2 expect(JobBuffer.values.empty?).to eq true end @@ -63,9 +85,28 @@ def perform(message) it 'should run job enqueued in the future at the specified time' do start_worker do - TestJob.set(wait: 5.seconds).perform_later('hello') + TestJob.set(wait: 5.seconds).perform_later('Rails') + sleep 10 + expect(JobBuffer.values).to eq(['Rails']) + end + end + + it "should run job bulk enqueued in the future at the specified time" do + start_worker do + ActiveJob.perform_all_later([TestJob.new("Rails").set(wait: 5.seconds)]) + sleep 10 + expect(JobBuffer.values).to eq(['Rails']) + end + end + + it "should run job with higher priority first" do + start_worker do + wait_until = Time.now + 3.seconds + TestJob.set(wait_until: wait_until, priority: 20).perform_later "1" + TestJob.set(wait_until: wait_until, priority: 10).perform_later "2" sleep 10 - expect(JobBuffer.values).to eq(['hello']) + + expect(JobBuffer.values).to eq(['2', '1']) end end From 1f3b958b270e45ff07ec528e2d13d8e82938dcb8 Mon Sep 17 00:00:00 2001 From: Dino Maric Date: Sat, 16 Nov 2024 12:03:18 +0100 Subject: [PATCH 7/7] Fix loading adapters with `const_defined?` --- lib/active_job/queue_adapters/delayed_job_adapter.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/active_job/queue_adapters/delayed_job_adapter.rb b/lib/active_job/queue_adapters/delayed_job_adapter.rb index 6ff6d2ef..513a0154 100644 --- a/lib/active_job/queue_adapters/delayed_job_adapter.rb +++ b/lib/active_job/queue_adapters/delayed_job_adapter.rb @@ -1,7 +1,7 @@ module ActiveJob module QueueAdapters # Explicitly remove the implementation existing in older rails'. - remove_const(:DelayedJobAdapter) if defined?(:DelayedJobAdapter) + remove_const(:DelayedJobAdapter) if const_defined?(:DelayedJobAdapter) # = Delayed Job adapter for Active Job #