Skip to content

Commit

Permalink
[WIP] Add base structure for Shoryuken adapter support
Browse files Browse the repository at this point in the history
Metrics collector tests are mostly passing with the base implementation
from the gist in place, calculating queue depth.

We don't have access to queue latency easily apparently, and tracking
busy jobs might be tricky -- if at all possible, but we'll continue
investigating the possibility of implementing those.
  • Loading branch information
carlosantoniodasilva committed Apr 30, 2024
1 parent d6ee914 commit 4b71d0d
Show file tree
Hide file tree
Showing 12 changed files with 359 additions and 0 deletions.
10 changes: 10 additions & 0 deletions judoscale-shoryuken/Gemfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
source "https://rubygems.org"

gemspec name: "judoscale-shoryuken"

gem "judoscale-ruby", path: "../judoscale-ruby"
gem "shoryuken", "~> 6.2"
gem "aws-sdk-sqs" # shoryuken integration with AWS SDK v3+
gem "rexml" # aws-sdk-core requires an XML parser.
gem "minitest"
gem "rake"
15 changes: 15 additions & 0 deletions judoscale-shoryuken/Rakefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# frozen_string_literal: true

require "rake/testtask"

Rake::TestTask.new(:test) do |t|
t.libs = %w[lib test]
t.pattern = "test/**/*_test.rb"
end

Rake::TestTask.new(:bench) do |t|
t.libs = %w[lib test]
t.pattern = "test/benchmarks/**/*_benchmark.rb"
end

task default: :test
30 changes: 30 additions & 0 deletions judoscale-shoryuken/judoscale-shoryuken.gemspec
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
lib = File.expand_path("../lib", __FILE__)
$LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib)
require "judoscale/shoryuken/version"

Gem::Specification.new do |spec|
spec.name = "judoscale-shoryuken"
spec.version = Judoscale::Shoryuken::VERSION
spec.authors = ["Adam McCrea", "Carlos Antonio da Silva", "Jon Sullivan"]
spec.email = ["[email protected]"]

spec.summary = "This gem provides Shoryuken integration with the Judoscale autoscaling add-on for Heroku."
spec.homepage = "https://judoscale.com"
spec.license = "MIT"

spec.metadata = {
"homepage_uri" => "https://judoscale.com",
"bug_tracker_uri" => "https://github.com/judoscale/judoscale-ruby/issues",
"documentation_uri" => "https://judoscale.com/docs",
"changelog_uri" => "https://github.com/judoscale/judoscale-ruby/blob/main/CHANGELOG.md",
"source_code_uri" => "https://github.com/judoscale/judoscale-ruby"
}

spec.files = `git ls-files -z`.split("\x0").reject { |f| f.match(%r{^(test|spec|features)/}) }
spec.require_paths = ["lib"]

spec.required_ruby_version = ">= 2.6.0"

spec.add_dependency "judoscale-ruby", Judoscale::Shoryuken::VERSION
spec.add_dependency "shoryuken", ">= 6.0"
end
3 changes: 3 additions & 0 deletions judoscale-shoryuken/lib/judoscale-shoryuken.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# frozen_string_literal: true

require "judoscale/shoryuken"
15 changes: 15 additions & 0 deletions judoscale-shoryuken/lib/judoscale/shoryuken.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# frozen_string_literal: true

require "judoscale-ruby"
require "judoscale/config"
require "judoscale/shoryuken/version"
require "judoscale/shoryuken/metrics_collector"
require "shoryuken"

Judoscale.add_adapter :"judoscale-shoryuken",
{
adapter_version: Judoscale::Shoryuken::VERSION,
framework_version: ::Shoryuken::VERSION
},
metrics_collector: Judoscale::Shoryuken::MetricsCollector,
expose_config: Judoscale::Config::JobAdapterConfig.new(:shoryuken)
45 changes: 45 additions & 0 deletions judoscale-shoryuken/lib/judoscale/shoryuken/metrics_collector.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# frozen_string_literal: true

require "judoscale/job_metrics_collector"
require "judoscale/metric"

module Judoscale
module Shoryuken
class MetricsCollector < Judoscale::JobMetricsCollector
def self.adapter_config
Judoscale::Config.instance.shoryuken
end

def initialize
super
self.queues |= ::Shoryuken.ungrouped_queues
end

# TODO: support for busy jobs?
def collect
metrics = []
# TODO: maybe cache the Shoryuken queue instances, to avoid extra requests to SQS?
queues_by_name = Hash.new { |hash, queue_name|
hash[queue_name] = ::Shoryuken::Queue.new(::Shoryuken.sqs_client, queue_name)
}
# Initialize each queue known by Shoryuken.
::Shoryuken.ungrouped_queues.each do |queue_name|
queues_by_name[queue_name]
end

self.queues |= queues_by_name.keys

queues.each do |queue_name|
queue = queues_by_name[queue_name]
# TODO: use public APIs and/or SQS client directly ourselves.
depth = queue.send(:queue_attributes).attributes["ApproximateNumberOfMessages"]

metrics.push Metric.new(:qd, depth, Time.now, queue_name)
end

log_collection(metrics)
metrics
end
end
end
end
7 changes: 7 additions & 0 deletions judoscale-shoryuken/lib/judoscale/shoryuken/version.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# frozen_string_literal: true

module Judoscale
module Shoryuken
VERSION = "1.6.0"
end
end
3 changes: 3 additions & 0 deletions judoscale-shoryuken/lib/rails-autoscale-shoryuken.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# frozen_string_literal: true

require "judoscale/shoryuken"
30 changes: 30 additions & 0 deletions judoscale-shoryuken/rails-autoscale-shoryuken.gemspec
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
lib = File.expand_path("../lib", __FILE__)
$LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib)
require "judoscale/shoryuken/version"

Gem::Specification.new do |spec|
spec.name = "rails-autoscale-shoryuken"
spec.version = Judoscale::Shoryuken::VERSION
spec.authors = ["Adam McCrea", "Carlos Antonio da Silva", "Jon Sullivan"]
spec.email = ["[email protected]"]

spec.summary = "This gem provides Shoryuken integration with the Judoscale autoscaling add-on for Heroku."
spec.homepage = "https://judoscale.com"
spec.license = "MIT"

spec.metadata = {
"homepage_uri" => "https://judoscale.com",
"bug_tracker_uri" => "https://github.com/judoscale/judoscale-ruby/issues",
"documentation_uri" => "https://judoscale.com/docs",
"changelog_uri" => "https://github.com/judoscale/judoscale-ruby/blob/main/CHANGELOG.md",
"source_code_uri" => "https://github.com/judoscale/judoscale-ruby"
}

spec.files = `git ls-files -z`.split("\x0").reject { |f| f.match(%r{^(test|spec|features)/}) }
spec.require_paths = ["lib"]

spec.required_ruby_version = ">= 2.6.0"

spec.add_dependency "rails-autoscale-core", Judoscale::Shoryuken::VERSION
spec.add_dependency "shoryuken", ">= 6.0"
end
36 changes: 36 additions & 0 deletions judoscale-shoryuken/test/adapter_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# frozen_string_literal: true

require "test_helper"
require "judoscale/report"

module Judoscale
describe Shoryuken do
it "adds itself as an adapter with information to be reported to the Judoscale API" do
adapter = Judoscale.adapters.detect { |adapter| adapter.identifier == :"judoscale-shoryuken" }
_(adapter).wont_be_nil
_(adapter.metrics_collector).must_equal Judoscale::Shoryuken::MetricsCollector

report = ::Judoscale::Report.new(Judoscale.adapters, Judoscale::Config.instance, [])
_(report.as_json[:adapters]).must_include(:"judoscale-shoryuken")
end

it "sets up a config property for the library" do
config = Config.instance
_(config.shoryuken.enabled).must_equal true
_(config.shoryuken.max_queues).must_equal 20
_(config.shoryuken.queues).must_equal []
_(config.shoryuken.track_busy_jobs).must_equal false

Judoscale.configure do |config|
config.shoryuken.queues = %w[test drive]
config.shoryuken.track_busy_jobs = true
end

_(config.shoryuken.queues).must_equal %w[test drive]
_(config.shoryuken.track_busy_jobs).must_equal true

report = ::Judoscale::Report.new(Judoscale.adapters, Judoscale::Config.instance, [])
_(report.as_json[:config]).must_include(:shoryuken)
end
end
end
148 changes: 148 additions & 0 deletions judoscale-shoryuken/test/metrics_collector_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
# frozen_string_literal: true

require "test_helper"
require "judoscale/shoryuken/metrics_collector"

module Judoscale
ShoryukenQueueStub = Struct.new(:name, :latency, :size, keyword_init: true)

describe Shoryuken::MetricsCollector do
subject { Shoryuken::MetricsCollector.new }

def stub_sqs(queue_depths)
# Given a queue name, Shoryuken will fetch the queue URL first, then use that to
# request the queue attributes. These stubs translate to those individual calls.
::Shoryuken.sqs_client.tap { |sqs|
sqs.stub_responses(:get_queue_url, ->(context) {
{queue_url: "https://sqs.us-east-1.amazonaws.com/12345/#{context.params[:queue_name]}"}
})
sqs.stub_responses(:get_queue_attributes, ->(context) {
depth = queue_depths.fetch(context.params[:queue_url].split("/").last)
{attributes: {"ApproximateNumberOfMessages" => depth.to_s}}
})
}
end

describe "#collect" do
after {
subject.clear_queues
}

it "collects queue depth for each queue (latency is not available yet)" do
stub_sqs "default" => 10, "high" => 5

metrics = ::Shoryuken.stub(:ungrouped_queues, %w[default high]) {
subject.collect
}

_(metrics.size).must_equal 2
_(metrics[0].value).must_equal 10
_(metrics[0].queue_name).must_equal "default"
_(metrics[0].identifier).must_equal :qd
_(metrics[1].value).must_equal 5
_(metrics[1].queue_name).must_equal "high"
_(metrics[1].identifier).must_equal :qd
end

it "always collects for known queues" do
stub_sqs "default" => 0, "high" => 1

metrics = ::Shoryuken.stub(:ungrouped_queues, []) {
subject.collect
}

_(metrics).must_be :empty?

metrics = ::Shoryuken.stub(:ungrouped_queues, %w[default]) {
subject.collect
}

_(metrics.size).must_equal 1
_(metrics.map(&:queue_name)).must_equal %w[default]

metrics = ::Shoryuken.stub(:ungrouped_queues, %w[high]) {
subject.collect
}

_(metrics.size).must_equal 2
_(metrics.map(&:queue_name)).must_equal %w[default high]
end

it "logs debug information for each queue being collected" do
use_config log_level: :debug do
stub_sqs "default" => 10

::Shoryuken.stub(:ungrouped_queues, %w[default]) {
subject.collect
}

_(log_string).must_match %r{shoryuken-qd.default=10}
_(log_string).wont_match %r{shoryuken-qt}
_(log_string).wont_match %r{shoryuken-busy}
end
end

it "tracks busy jobs when the configuration is enabled" do
skip "busy jobs not available yet."
end

it "logs debug information about busy jobs being collected" do
skip "busy jobs not available yet."
end

it "filters queues matching UUID format by default, to prevent reporting for dynamically generated queues" do
queues = %W[low-#{SecureRandom.uuid} default #{SecureRandom.uuid}-high]
stub_sqs queues.map.with_index.to_h

metrics = ::Shoryuken.stub(:ungrouped_queues, queues) {
subject.collect
}

_(metrics.size).must_equal 1
_(metrics.map(&:queue_name)).must_equal %w[default]
end

it "filters queues to collect metrics from based on the configured queue filter proc, overriding the default UUID filter" do
use_adapter_config :shoryuken, queue_filter: ->(queue_name) { queue_name.start_with? "low" } do
queues = %W[low default high low-#{SecureRandom.uuid}]
stub_sqs queues.map.with_index.to_h

metrics = ::Shoryuken.stub(:ungrouped_queues, queues) {
subject.collect
}

_(metrics.size).must_equal 2
_(metrics[0].queue_name).must_equal "low"
_(metrics[1].queue_name).must_be :start_with?, "low-"
end
end

it "collects metrics only from the configured queues if the configuration is present, ignoring the queue filter" do
use_adapter_config :shoryuken, queues: %w[low ultra], queue_filter: ->(queue_name) { queue_name != "low" } do
queues = %w[low default high]
stub_sqs "low" => 5, "ultra" => 10

metrics = ::Shoryuken.stub(:ungrouped_queues, queues) {
subject.collect
}

_(metrics.map(&:queue_name)).must_equal %w[low ultra]
end
end

it "collects metrics up to the configured number of max queues, sorting by length of the queue name" do
use_adapter_config :shoryuken, max_queues: 2 do
queues = %w[low default high]
stub_sqs queues.map.with_index.to_h

metrics = ::Shoryuken.stub(:ungrouped_queues, queues) {
subject.collect
}

_(metrics.map(&:queue_name)).must_equal %w[low high]
_(log_string).must_match %r{Shoryuken metrics reporting only 2 queues max, skipping the rest \(1\)}
end
end
end
end
end
17 changes: 17 additions & 0 deletions judoscale-shoryuken/test/test_helper.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# frozen_string_literal: true

$LOAD_PATH.unshift File.expand_path("../lib", __dir__)
require "judoscale-shoryuken"

require "minitest/autorun"
require "minitest/spec"

module Judoscale::Test
end

Dir[File.expand_path("../../judoscale-ruby/test/support/*.rb", __dir__)].sort.each { |file| require file }

Minitest::Test.include(Judoscale::Test)

# Setup Shoryuken with a stub SQS Client to avoid any API hits, and facilitate testing.
::Shoryuken.sqs_client = Aws::SQS::Client.new(stub_responses: true)

0 comments on commit 4b71d0d

Please sign in to comment.