From 0a0c333599baaebeb307f96c4ce23743f925a560 Mon Sep 17 00:00:00 2001 From: Carlos Antonio da Silva Date: Mon, 29 Apr 2024 17:11:53 -0300 Subject: [PATCH] [WIP] Add base structure for Shoryuken adapter support Metrics collector tests are mostly passing with the base implementation from the gist in place, calculating queue depth. We don't have access to queue latency easily apparently, and tracking busy jobs might be tricky -- if at all possible, but we'll continue investigating the possibility of implementing those. --- judoscale-shoryuken/Gemfile | 10 ++ judoscale-shoryuken/Rakefile | 15 ++ .../judoscale-shoryuken.gemspec | 30 ++++ .../lib/judoscale-shoryuken.rb | 3 + .../lib/judoscale/shoryuken.rb | 15 ++ .../judoscale/shoryuken/metrics_collector.rb | 45 ++++++ .../lib/judoscale/shoryuken/version.rb | 7 + .../lib/rails-autoscale-shoryuken.rb | 3 + .../rails-autoscale-shoryuken.gemspec | 30 ++++ judoscale-shoryuken/test/adapter_test.rb | 36 +++++ .../test/metrics_collector_test.rb | 148 ++++++++++++++++++ judoscale-shoryuken/test/test_helper.rb | 17 ++ 12 files changed, 359 insertions(+) create mode 100644 judoscale-shoryuken/Gemfile create mode 100644 judoscale-shoryuken/Rakefile create mode 100644 judoscale-shoryuken/judoscale-shoryuken.gemspec create mode 100644 judoscale-shoryuken/lib/judoscale-shoryuken.rb create mode 100644 judoscale-shoryuken/lib/judoscale/shoryuken.rb create mode 100644 judoscale-shoryuken/lib/judoscale/shoryuken/metrics_collector.rb create mode 100644 judoscale-shoryuken/lib/judoscale/shoryuken/version.rb create mode 100644 judoscale-shoryuken/lib/rails-autoscale-shoryuken.rb create mode 100644 judoscale-shoryuken/rails-autoscale-shoryuken.gemspec create mode 100644 judoscale-shoryuken/test/adapter_test.rb create mode 100644 judoscale-shoryuken/test/metrics_collector_test.rb create mode 100644 judoscale-shoryuken/test/test_helper.rb diff --git a/judoscale-shoryuken/Gemfile b/judoscale-shoryuken/Gemfile new file mode 100644 index 00000000..3ccd71a9 --- /dev/null +++ b/judoscale-shoryuken/Gemfile @@ -0,0 +1,10 @@ +source "https://rubygems.org" + +gemspec name: "judoscale-shoryuken" + +gem "judoscale-ruby", path: "../judoscale-ruby" +gem "shoryuken", "~> 6.2" +gem "aws-sdk-sqs" # shoryuken integration with AWS SDK v3+ +gem "rexml" # aws-sdk-core requires an XML parser. +gem "minitest" +gem "rake" diff --git a/judoscale-shoryuken/Rakefile b/judoscale-shoryuken/Rakefile new file mode 100644 index 00000000..a171694e --- /dev/null +++ b/judoscale-shoryuken/Rakefile @@ -0,0 +1,15 @@ +# frozen_string_literal: true + +require "rake/testtask" + +Rake::TestTask.new(:test) do |t| + t.libs = %w[lib test] + t.pattern = "test/**/*_test.rb" +end + +Rake::TestTask.new(:bench) do |t| + t.libs = %w[lib test] + t.pattern = "test/benchmarks/**/*_benchmark.rb" +end + +task default: :test diff --git a/judoscale-shoryuken/judoscale-shoryuken.gemspec b/judoscale-shoryuken/judoscale-shoryuken.gemspec new file mode 100644 index 00000000..06414b6c --- /dev/null +++ b/judoscale-shoryuken/judoscale-shoryuken.gemspec @@ -0,0 +1,30 @@ +lib = File.expand_path("../lib", __FILE__) +$LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib) +require "judoscale/shoryuken/version" + +Gem::Specification.new do |spec| + spec.name = "judoscale-shoryuken" + spec.version = Judoscale::Shoryuken::VERSION + spec.authors = ["Adam McCrea", "Carlos Antonio da Silva", "Jon Sullivan"] + spec.email = ["hello@judoscale.com"] + + spec.summary = "This gem provides Shoryuken integration with the Judoscale autoscaling add-on for Heroku." + spec.homepage = "https://judoscale.com" + spec.license = "MIT" + + spec.metadata = { + "homepage_uri" => "https://judoscale.com", + "bug_tracker_uri" => "https://github.com/judoscale/judoscale-ruby/issues", + "documentation_uri" => "https://judoscale.com/docs", + "changelog_uri" => "https://github.com/judoscale/judoscale-ruby/blob/main/CHANGELOG.md", + "source_code_uri" => "https://github.com/judoscale/judoscale-ruby" + } + + spec.files = `git ls-files -z`.split("\x0").reject { |f| f.match(%r{^(test|spec|features)/}) } + spec.require_paths = ["lib"] + + spec.required_ruby_version = ">= 2.6.0" + + spec.add_dependency "judoscale-ruby", Judoscale::Shoryuken::VERSION + spec.add_dependency "shoryuken", ">= 6.0" +end diff --git a/judoscale-shoryuken/lib/judoscale-shoryuken.rb b/judoscale-shoryuken/lib/judoscale-shoryuken.rb new file mode 100644 index 00000000..fc07365f --- /dev/null +++ b/judoscale-shoryuken/lib/judoscale-shoryuken.rb @@ -0,0 +1,3 @@ +# frozen_string_literal: true + +require "judoscale/shoryuken" diff --git a/judoscale-shoryuken/lib/judoscale/shoryuken.rb b/judoscale-shoryuken/lib/judoscale/shoryuken.rb new file mode 100644 index 00000000..1ef0270a --- /dev/null +++ b/judoscale-shoryuken/lib/judoscale/shoryuken.rb @@ -0,0 +1,15 @@ +# frozen_string_literal: true + +require "judoscale-ruby" +require "judoscale/config" +require "judoscale/shoryuken/version" +require "judoscale/shoryuken/metrics_collector" +require "shoryuken" + +Judoscale.add_adapter :"judoscale-shoryuken", + { + adapter_version: Judoscale::Shoryuken::VERSION, + framework_version: ::Shoryuken::VERSION + }, + metrics_collector: Judoscale::Shoryuken::MetricsCollector, + expose_config: Judoscale::Config::JobAdapterConfig.new(:shoryuken) diff --git a/judoscale-shoryuken/lib/judoscale/shoryuken/metrics_collector.rb b/judoscale-shoryuken/lib/judoscale/shoryuken/metrics_collector.rb new file mode 100644 index 00000000..1b60db67 --- /dev/null +++ b/judoscale-shoryuken/lib/judoscale/shoryuken/metrics_collector.rb @@ -0,0 +1,45 @@ +# frozen_string_literal: true + +require "judoscale/job_metrics_collector" +require "judoscale/metric" + +module Judoscale + module Shoryuken + class MetricsCollector < Judoscale::JobMetricsCollector + def self.adapter_config + Judoscale::Config.instance.shoryuken + end + + def initialize + super + self.queues |= ::Shoryuken.ungrouped_queues + end + + # TODO: support for busy jobs? + def collect + metrics = [] + # TODO: maybe cache the Shoryuken queue instances, to avoid extra requests to SQS? + queues_by_name = Hash.new { |hash, queue_name| + hash[queue_name] = ::Shoryuken::Queue.new(::Shoryuken.sqs_client, queue_name) + } + # Initialize each queue known by Shoryuken. + ::Shoryuken.ungrouped_queues.each do |queue_name| + queues_by_name[queue_name] + end + + self.queues |= queues_by_name.keys + + queues.each do |queue_name| + queue = queues_by_name[queue_name] + # TODO: use public APIs and/or SQS client directly ourselves. + depth = queue.send(:queue_attributes).attributes["ApproximateNumberOfMessages"] + + metrics.push Metric.new(:qd, depth, Time.now, queue_name) + end + + log_collection(metrics) + metrics + end + end + end +end diff --git a/judoscale-shoryuken/lib/judoscale/shoryuken/version.rb b/judoscale-shoryuken/lib/judoscale/shoryuken/version.rb new file mode 100644 index 00000000..384431fa --- /dev/null +++ b/judoscale-shoryuken/lib/judoscale/shoryuken/version.rb @@ -0,0 +1,7 @@ +# frozen_string_literal: true + +module Judoscale + module Shoryuken + VERSION = "1.6.0" + end +end diff --git a/judoscale-shoryuken/lib/rails-autoscale-shoryuken.rb b/judoscale-shoryuken/lib/rails-autoscale-shoryuken.rb new file mode 100644 index 00000000..fc07365f --- /dev/null +++ b/judoscale-shoryuken/lib/rails-autoscale-shoryuken.rb @@ -0,0 +1,3 @@ +# frozen_string_literal: true + +require "judoscale/shoryuken" diff --git a/judoscale-shoryuken/rails-autoscale-shoryuken.gemspec b/judoscale-shoryuken/rails-autoscale-shoryuken.gemspec new file mode 100644 index 00000000..e42d59a2 --- /dev/null +++ b/judoscale-shoryuken/rails-autoscale-shoryuken.gemspec @@ -0,0 +1,30 @@ +lib = File.expand_path("../lib", __FILE__) +$LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib) +require "judoscale/shoryuken/version" + +Gem::Specification.new do |spec| + spec.name = "rails-autoscale-shoryuken" + spec.version = Judoscale::Shoryuken::VERSION + spec.authors = ["Adam McCrea", "Carlos Antonio da Silva", "Jon Sullivan"] + spec.email = ["hello@judoscale.com"] + + spec.summary = "This gem provides Shoryuken integration with the Judoscale autoscaling add-on for Heroku." + spec.homepage = "https://judoscale.com" + spec.license = "MIT" + + spec.metadata = { + "homepage_uri" => "https://judoscale.com", + "bug_tracker_uri" => "https://github.com/judoscale/judoscale-ruby/issues", + "documentation_uri" => "https://judoscale.com/docs", + "changelog_uri" => "https://github.com/judoscale/judoscale-ruby/blob/main/CHANGELOG.md", + "source_code_uri" => "https://github.com/judoscale/judoscale-ruby" + } + + spec.files = `git ls-files -z`.split("\x0").reject { |f| f.match(%r{^(test|spec|features)/}) } + spec.require_paths = ["lib"] + + spec.required_ruby_version = ">= 2.6.0" + + spec.add_dependency "rails-autoscale-core", Judoscale::Shoryuken::VERSION + spec.add_dependency "shoryuken", ">= 6.0" +end diff --git a/judoscale-shoryuken/test/adapter_test.rb b/judoscale-shoryuken/test/adapter_test.rb new file mode 100644 index 00000000..55cb84dd --- /dev/null +++ b/judoscale-shoryuken/test/adapter_test.rb @@ -0,0 +1,36 @@ +# frozen_string_literal: true + +require "test_helper" +require "judoscale/report" + +module Judoscale + describe Shoryuken do + it "adds itself as an adapter with information to be reported to the Judoscale API" do + adapter = Judoscale.adapters.detect { |adapter| adapter.identifier == :"judoscale-shoryuken" } + _(adapter).wont_be_nil + _(adapter.metrics_collector).must_equal Judoscale::Shoryuken::MetricsCollector + + report = ::Judoscale::Report.new(Judoscale.adapters, Judoscale::Config.instance, []) + _(report.as_json[:adapters]).must_include(:"judoscale-shoryuken") + end + + it "sets up a config property for the library" do + config = Config.instance + _(config.shoryuken.enabled).must_equal true + _(config.shoryuken.max_queues).must_equal 20 + _(config.shoryuken.queues).must_equal [] + _(config.shoryuken.track_busy_jobs).must_equal false + + Judoscale.configure do |config| + config.shoryuken.queues = %w[test drive] + config.shoryuken.track_busy_jobs = true + end + + _(config.shoryuken.queues).must_equal %w[test drive] + _(config.shoryuken.track_busy_jobs).must_equal true + + report = ::Judoscale::Report.new(Judoscale.adapters, Judoscale::Config.instance, []) + _(report.as_json[:config]).must_include(:shoryuken) + end + end +end diff --git a/judoscale-shoryuken/test/metrics_collector_test.rb b/judoscale-shoryuken/test/metrics_collector_test.rb new file mode 100644 index 00000000..da42ae43 --- /dev/null +++ b/judoscale-shoryuken/test/metrics_collector_test.rb @@ -0,0 +1,148 @@ +# frozen_string_literal: true + +require "test_helper" +require "judoscale/shoryuken/metrics_collector" + +module Judoscale + ShoryukenQueueStub = Struct.new(:name, :latency, :size, keyword_init: true) + + describe Shoryuken::MetricsCollector do + subject { Shoryuken::MetricsCollector.new } + + def stub_sqs(queue_depths) + # Given a queue name, Shoryuken will fetch the queue URL first, then use that to + # request the queue attributes. These stubs translate to those individual calls. + ::Shoryuken.sqs_client.tap { |sqs| + sqs.stub_responses(:get_queue_url, ->(context) { + { queue_url: "https://sqs.us-east-1.amazonaws.com/12345/#{context.params[:queue_name]}" } + }) + sqs.stub_responses(:get_queue_attributes, ->(context) { + depth = queue_depths.fetch(context.params[:queue_url].split("/").last) + { attributes: { "ApproximateNumberOfMessages" => depth.to_s } } + }) + } + end + + describe "#collect" do + after { + subject.clear_queues + } + + it "collects queue depth for each queue (latency is not available yet)" do + stub_sqs "default" => 10, "high" => 5 + + metrics = ::Shoryuken.stub(:ungrouped_queues, %w[default high]) { + subject.collect + } + + _(metrics.size).must_equal 2 + _(metrics[0].value).must_equal 10 + _(metrics[0].queue_name).must_equal "default" + _(metrics[0].identifier).must_equal :qd + _(metrics[1].value).must_equal 5 + _(metrics[1].queue_name).must_equal "high" + _(metrics[1].identifier).must_equal :qd + end + + it "always collects for known queues" do + stub_sqs "default" => 0, "high" => 1 + + metrics = ::Shoryuken.stub(:ungrouped_queues, []) { + subject.collect + } + + _(metrics).must_be :empty? + + metrics = ::Shoryuken.stub(:ungrouped_queues, %w[default]) { + subject.collect + } + + _(metrics.size).must_equal 1 + _(metrics.map(&:queue_name)).must_equal %w[default] + + metrics = ::Shoryuken.stub(:ungrouped_queues, %w[high]) { + subject.collect + } + + _(metrics.size).must_equal 2 + _(metrics.map(&:queue_name)).must_equal %w[default high] + end + + it "logs debug information for each queue being collected" do + use_config log_level: :debug do + stub_sqs "default" => 10 + + ::Shoryuken.stub(:ungrouped_queues, %w[default]) { + subject.collect + } + + _(log_string).must_match %r{shoryuken-qd.default=10} + _(log_string).wont_match %r{shoryuken-qt} + _(log_string).wont_match %r{shoryuken-busy} + end + end + + it "tracks busy jobs when the configuration is enabled" do + skip "busy jobs not available yet." + end + + it "logs debug information about busy jobs being collected" do + skip "busy jobs not available yet." + end + + it "filters queues matching UUID format by default, to prevent reporting for dynamically generated queues" do + queues = %W[low-#{SecureRandom.uuid} default #{SecureRandom.uuid}-high] + stub_sqs queues.map.with_index.to_h + + metrics = ::Shoryuken.stub(:ungrouped_queues, queues) { + subject.collect + } + + _(metrics.size).must_equal 1 + _(metrics.map(&:queue_name)).must_equal %w[default] + end + + it "filters queues to collect metrics from based on the configured queue filter proc, overriding the default UUID filter" do + use_adapter_config :shoryuken, queue_filter: ->(queue_name) { queue_name.start_with? "low" } do + queues = %W[low default high low-#{SecureRandom.uuid}] + stub_sqs queues.map.with_index.to_h + + metrics = ::Shoryuken.stub(:ungrouped_queues, queues) { + subject.collect + } + + _(metrics.size).must_equal 2 + _(metrics[0].queue_name).must_equal "low" + _(metrics[1].queue_name).must_be :start_with?, "low-" + end + end + + it "collects metrics only from the configured queues if the configuration is present, ignoring the queue filter" do + use_adapter_config :shoryuken, queues: %w[low ultra], queue_filter: ->(queue_name) { queue_name != "low" } do + queues = %w[low default high] + stub_sqs "low" => 5, "ultra" => 10 + + metrics = ::Shoryuken.stub(:ungrouped_queues, queues) { + subject.collect + } + + _(metrics.map(&:queue_name)).must_equal %w[low ultra] + end + end + + it "collects metrics up to the configured number of max queues, sorting by length of the queue name" do + use_adapter_config :shoryuken, max_queues: 2 do + queues = %w[low default high] + stub_sqs queues.map.with_index.to_h + + metrics = ::Shoryuken.stub(:ungrouped_queues, queues) { + subject.collect + } + + _(metrics.map(&:queue_name)).must_equal %w[low high] + _(log_string).must_match %r{Shoryuken metrics reporting only 2 queues max, skipping the rest \(1\)} + end + end + end + end +end diff --git a/judoscale-shoryuken/test/test_helper.rb b/judoscale-shoryuken/test/test_helper.rb new file mode 100644 index 00000000..359f4004 --- /dev/null +++ b/judoscale-shoryuken/test/test_helper.rb @@ -0,0 +1,17 @@ +# frozen_string_literal: true + +$LOAD_PATH.unshift File.expand_path("../lib", __dir__) +require "judoscale-shoryuken" + +require "minitest/autorun" +require "minitest/spec" + +module Judoscale::Test +end + +Dir[File.expand_path("../../judoscale-ruby/test/support/*.rb", __dir__)].sort.each { |file| require file } + +Minitest::Test.include(Judoscale::Test) + +# Setup Shoryuken with a stub SQS Client to avoid any API hits, and facilitate testing. +::Shoryuken.sqs_client = Aws::SQS::Client.new(stub_responses: true) \ No newline at end of file