diff --git a/.rubocop.yml b/.rubocop.yml index f6ab1c1c..e09d4b7b 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -17,6 +17,7 @@ Metrics/BlockLength: - 'spec/**/*.rb' Metrics/MethodLength: + Max: 15 Exclude: - 'spec/**/*.rb' @@ -24,6 +25,9 @@ Metrics/ModuleLength: Exclude: - 'spec/**/*.rb' +Metrics/ClassLength: + Max: 150 + Naming/FileName: Exclude: - 'lib/aws-sdk-rails.rb' diff --git a/CHANGELOG.md b/CHANGELOG.md index e6af52e8..e4452864 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,8 @@ Unreleased Changes ------------------ +* Feature - Support async job processing in Elastic Beanstalk middleware. (#167) + 5.0.0 (2024-11-21) ------------------ diff --git a/README.md b/README.md index 0b44af16..916a920d 100644 --- a/README.md +++ b/README.md @@ -126,7 +126,7 @@ the `AWS_PROCESS_BEANSTALK_WORKER_REQUESTS` environment variable to `true` in the worker environment configuration. The [SQS Daemon](https://docs.aws.amazon.com/elasticbeanstalk/latest/dg/using-features-managing-env-tiers.html#worker-daemon) running on the worker sends messages as a POST request to `http://localhost/`. -The aws-sdk-rails middleware will forward each request and parameters to their +The ElasticBeanstalkSQSD middleware will forward each request and parameters to their appropriate jobs. The middleware will only process requests from the SQS daemon and will pass on others and so will not interfere with other routes in your application. @@ -134,7 +134,34 @@ application. To protect against forgeries, daemon requests will only be processed if they originate from localhost or the Docker host. -Periodic (scheduled) jobs are also supported with this approach. Elastic +#### Running Jobs Async +By default the ElasticBeanstalkSQSD middleware will process jobs synchronously +and will not complete the request until the job has finished executing. For +long running jobs (exceeding the configured nginx timeout on the worker) this +may cause timeouts and incomplete executions.
+ +To run jobs asynchronously, set the `AWS_PROCESS_BEANSTALK_WORKER_JOBS_ASYNC` environment variable to `true` in your worker environment. Jobs will be queued in a ThreadPoolExecutor and the request will return a 200 OK immediately and the SQS message will be deleted and the job will be executed in the background. + +By default the executor will use the available processor count as the max_threads. You can configure the max threads for the executor by setting the `AWS_PROCESS_BEANSTALK_WORKER_THREADS` environment variable. + +When there is no additional capacity to execute a task, the middleware returns a 429 (too many requests) response which will result in the sqsd NOT deleting the message. The message will be retried again once its visibility timeout is reached. + +Periodic (scheduled) tasks will also be run asynchronously in the same way. Elastic Beanstalk queues a message for the periodic task and if there is no capacity to execute the task, it will be retried again once the message's visibility timeout is reached. + +#### Periodic (scheduled) jobs +[Periodic (scheduled) tasks](https://docs.aws.amazon.com/elasticbeanstalk/latest/dg/using-features-managing-env-tiers.html#worker-periodictasks) +are also supported with this approach. Elastic Beanstalk workers support the addition of a `cron.yaml` file in the application root to configure this.
You can call your jobs from your controller actions or if you name your cron job the same as your job class and set the URL to diff --git a/lib/aws/rails/middleware/elastic_beanstalk_sqsd.rb b/lib/aws/rails/middleware/elastic_beanstalk_sqsd.rb index 5f75493e..991f8cc7 100644 --- a/lib/aws/rails/middleware/elastic_beanstalk_sqsd.rb +++ b/lib/aws/rails/middleware/elastic_beanstalk_sqsd.rb @@ -8,6 +8,10 @@ class ElasticBeanstalkSQSD def initialize(app) @app = app @logger = ::Rails.logger + + return unless ENV['AWS_PROCESS_BEANSTALK_WORKER_JOBS_ASYNC'] + + @executor = init_executor end def call(env) @@ -25,48 +29,108 @@ def call(env) end # Execute job or periodic task based on HTTP request context - periodic_task?(request) ? execute_periodic_task(request) : execute_job(request) + execute(request) + end + + def shutdown(timeout = nil) + return unless @executor + + @logger.info("Shutting down SQS EBS background job executor. Timeout: #{timeout}") + @executor.shutdown + clean_shutdown = @executor.wait_for_termination(timeout) + @logger.info("SQS EBS background executor shutdown complete. Clean: #{clean_shutdown}") end private + def init_executor + threads = Integer(ENV.fetch('AWS_PROCESS_BEANSTALK_WORKER_THREADS', + Concurrent.available_processor_count || Concurrent.processor_count)) + options = { + max_threads: threads, + max_queue: 1, + auto_terminate: false, # register our own at_exit to gracefully shutdown + fallback_policy: :abort # Concurrent::RejectedExecutionError must be handled + } + at_exit { shutdown } + + Concurrent::ThreadPoolExecutor.new(options) + end + + def execute(request) + if periodic_task?(request) + execute_periodic_task(request) + else + execute_job(request) + end + end + def execute_job(request) + if @executor + _execute_job_background(request) + else + _execute_job_now(request) + end + end + + # Execute a job in the current thread + def _execute_job_now(request) # Jobs queued from the SQS adapter contain the JSON message in the request body. 
job = ::ActiveSupport::JSON.decode(request.body.string) job_name = job['job_class'] @logger.debug("Executing job: #{job_name}") - _execute_job(job, job_name) - [200, { 'Content-Type' => 'text/plain' }, ["Successfully ran job #{job_name}."]] - rescue NameError - internal_error_response - end - - def _execute_job(job, job_name) ::ActiveJob::Base.execute(job) + [200, { 'Content-Type' => 'text/plain' }, ["Successfully ran job #{job_name}."]] rescue NameError => e @logger.error("Job #{job_name} could not resolve to a class that inherits from Active Job.") @logger.error("Error: #{e}") - raise e + internal_error_response + end + + # Execute a job using the thread pool executor + def _execute_job_background(request) + job_data = ::ActiveSupport::JSON.decode(request.body.string) + @logger.debug("Queuing background job: #{job_data['job_class']}") + @executor.post(job_data) do |job| + ::ActiveJob::Base.execute(job) + end + [200, { 'Content-Type' => 'text/plain' }, ["Successfully queued job #{job_data['job_class']}"]] + rescue Concurrent::RejectedExecutionError + msg = 'No capacity to execute job.' + @logger.info(msg) + [429, { 'Content-Type' => 'text/plain' }, [msg]] end def execute_periodic_task(request) # The beanstalk worker SQS Daemon will add the 'X-Aws-Sqsd-Taskname' for periodic tasks set in cron.yaml. 
job_name = request.headers['X-Aws-Sqsd-Taskname'] - @logger.debug("Creating and executing periodic task: #{job_name}") - _execute_periodic_task(job_name) - [200, { 'Content-Type' => 'text/plain' }, ["Successfully ran periodic task #{job_name}."]] - rescue NameError - internal_error_response - end - - def _execute_periodic_task(job_name) job = job_name.constantize.new - job.perform_now + if @executor + _execute_periodic_task_background(job) + else + _execute_periodic_task_now(job) + end rescue NameError => e @logger.error("Periodic task #{job_name} could not resolve to an Active Job class " \ '- check the cron name spelling and set the path as / in cron.yaml.') @logger.error("Error: #{e}.") - raise e + internal_error_response + end + + def _execute_periodic_task_now(job) + @logger.debug("Executing periodic task: #{job.class}") + job.perform_now + [200, { 'Content-Type' => 'text/plain' }, ["Successfully ran periodic task #{job.class}."]] + end + + def _execute_periodic_task_background(job) + @logger.debug("Queuing bakground periodic task: #{job.class}") + @executor.post(job, &:perform_now) + [200, { 'Content-Type' => 'text/plain' }, ["Successfully queued periodic task #{job.class}"]] + rescue Concurrent::RejectedExecutionError + msg = 'No capacity to execute periodic task.' 
+ @logger.info(msg) + [429, { 'Content-Type' => 'text/plain' }, [msg]] end def internal_error_response diff --git a/spec/aws/rails/middleware/elastic_beanstalk_sqsd_spec.rb b/spec/aws/rails/middleware/elastic_beanstalk_sqsd_spec.rb index e7861372..f41dfdde 100644 --- a/spec/aws/rails/middleware/elastic_beanstalk_sqsd_spec.rb +++ b/spec/aws/rails/middleware/elastic_beanstalk_sqsd_spec.rb @@ -219,6 +219,50 @@ module Middleware include_examples 'is valid in either cgroup1 or cgroup2' end + context 'when AWS_PROCESS_BEANSTALK_WORKER_JOBS_ASYNC' do + before(:each) do + ENV['AWS_PROCESS_BEANSTALK_WORKER_JOBS_ASYNC'] = 'true' + end + + after(:each) do + ENV.delete('AWS_PROCESS_BEANSTALK_WORKER_JOBS_ASYNC') + end + + it 'queues job' do + expect_any_instance_of(Concurrent::ThreadPoolExecutor).to receive(:post) + expect(response[0]).to eq(200) + expect(response[2]).to eq(['Successfully queued job ElasticBeanstalkJob']) + end + + context 'no capacity' do + it 'returns too many requests error' do + allow_any_instance_of(Concurrent::ThreadPoolExecutor).to receive(:post) + .and_raise Concurrent::RejectedExecutionError + + expect(response[0]).to eq(429) + end + end + + context 'periodic task' do + let(:is_periodic_task) { true } + + it 'queues job' do + expect_any_instance_of(Concurrent::ThreadPoolExecutor).to receive(:post) + expect(response[0]).to eq(200) + expect(response[2]).to eq(['Successfully queued periodic task ElasticBeanstalkPeriodicTask']) + end + + context 'no capacity' do + it 'returns too many requests error' do + allow_any_instance_of(Concurrent::ThreadPoolExecutor).to receive(:post) + .and_raise Concurrent::RejectedExecutionError + + expect(response[0]).to eq(429) + end + end + end + end + def stub_runs_in_neither_docker_container proc_1_cgroup = <<~CONTENT 0::/ diff --git a/tasks/release b/tasks/release index 109aae07..15cde91f 160000 --- a/tasks/release +++ b/tasks/release @@ -1 +1 @@ -Subproject commit 109aae0714a9f965031a79af9e59247d25da7c92 +Subproject 
commit 15cde91f275e893955756336e6fda0bc90d413e7