From 823a71cf0659505c2c24fffb8ac1004fd390afe9 Mon Sep 17 00:00:00 2001 From: Trey Date: Fri, 24 Jan 2020 04:06:06 -0500 Subject: [PATCH] Refactor consumer settings. (#135) --- lib/consumer.ex | 59 +++++++----- lib/gen_rmq/consumer/queue_configuration.ex | 96 +++++++++++++++++++ .../consumer/queue_configuration_test.exs | 59 ++++++++++++ test/gen_rmq_consumer_test.exs | 6 +- test/support/consumer_shared_tests.ex | 8 +- 5 files changed, 196 insertions(+), 32 deletions(-) create mode 100644 lib/gen_rmq/consumer/queue_configuration.ex create mode 100644 test/gen_rmq/consumer/queue_configuration_test.exs diff --git a/lib/consumer.ex b/lib/consumer.ex index 38ed306..389f2f9 100644 --- a/lib/consumer.ex +++ b/lib/consumer.ex @@ -15,6 +15,7 @@ defmodule GenRMQ.Consumer do require Logger alias GenRMQ.Message + alias GenRMQ.Consumer.QueueConfiguration ############################################################################## # GenRMQ.Consumer callbacks @@ -218,9 +219,11 @@ defmodule GenRMQ.Consumer do Process.flag(:trap_exit, true) config = apply(module, :init, []) + parsed_config = parse_config(config) + state = initial_state - |> Map.put(:config, config) + |> Map.put(:config, parsed_config) |> Map.put(:reconnect_attempt, 0) send(self(), :init) @@ -312,6 +315,23 @@ defmodule GenRMQ.Consumer do # Helpers ############################################################################## + defp parse_config(config) do + q_name = Keyword.fetch!(config, :queue) + {ttl_val, config_no_q_ttl} = Keyword.pop(config, :queue_ttl, nil) + {mp_val, config_no_q_mp} = Keyword.pop(config_no_q_ttl, :queue_max_priority, nil) + + queue_settings = + QueueConfiguration.new( + q_name, + true, + ttl_val, + mp_val + ) + + config_no_q_mp + |> Keyword.put(:queue, queue_settings) + end + defp handle_message(payload, attributes, %{module: module} = state, false) do message = Message.create(attributes, payload, state) apply(module, :handle_message, [message]) @@ -369,22 +389,18 @@ defmodule GenRMQ.Consumer do end defp setup_consumer(%{in: chan, config: config, module: module} = state) do - queue = config[:queue] + queue_config = config[:queue] + queue = QueueConfiguration.name(queue_config) exchange = config[:exchange] routing_key = config[:routing_key] prefetch_count = String.to_integer(config[:prefetch_count]) - ttl = config[:queue_ttl] - max_priority = config[:queue_max_priority] deadletter_args = setup_deadletter(chan, config) - arguments = - deadletter_args - |> setup_ttl(ttl) - |> setup_priority(max_priority) + arguments = QueueConfiguration.build_queue_arguments(queue_config, deadletter_args) Basic.qos(chan, prefetch_count: prefetch_count) - Queue.declare(chan, queue, durable: true, arguments: arguments) + Queue.declare(chan, queue, durable: QueueConfiguration.durable(queue_config), arguments: arguments) GenRMQ.Binding.bind_exchange_and_queue(chan, exchange, queue, routing_key) consumer_tag = apply(module, :consumer_tag, []) @@ -395,14 +411,20 @@ defmodule GenRMQ.Consumer do defp setup_deadletter(chan, config) do case Keyword.get(config, :deadletter, true) do true -> - ttl = config[:queue_ttl] - queue = config[:queue] + queue_config = config[:queue] + queue = QueueConfiguration.name(queue_config) exchange = GenRMQ.Binding.exchange_name(config[:exchange]) dl_queue = config[:deadletter_queue] || "#{queue}_error" dl_exchange = config[:deadletter_exchange] || "#{exchange}.deadletter" dl_routing_key = config[:deadletter_routing_key] || "#" - Queue.declare(chan, dl_queue, durable: true, arguments: setup_ttl([], ttl)) + Queue.declare( + chan, + dl_queue, + durable: true, + arguments: QueueConfiguration.build_ttl_arguments(queue_config, []) + ) + GenRMQ.Binding.bind_exchange_and_queue(chan, dl_exchange, dl_queue, dl_routing_key) [{"x-dead-letter-exchange", :longstr, dl_exchange}] ++ @@ -427,19 +449,6 @@ defmodule GenRMQ.Consumer do defp linear_delay(attempt), do: :timer.sleep(attempt * 1_000) - defp setup_ttl(arguments, nil), do: arguments - defp setup_ttl(arguments, ttl), do: [{"x-expires", :long, ttl} | arguments] - - @max_priority 255 - - defp setup_priority(arguments, max_priority) when is_integer(max_priority) and max_priority <= @max_priority, - do: [{"x-max-priority", :long, max_priority} | arguments] - - defp setup_priority(arguments, max_priority) when is_integer(max_priority) and max_priority > @max_priority, - do: [{"x-max-priority", :long, @max_priority} | arguments] - - defp setup_priority(arguments, _), do: arguments - ############################################################################## ############################################################################## ############################################################################## diff --git a/lib/gen_rmq/consumer/queue_configuration.ex b/lib/gen_rmq/consumer/queue_configuration.ex new file mode 100644 index 0000000..f45f6a6 --- /dev/null +++ b/lib/gen_rmq/consumer/queue_configuration.ex @@ -0,0 +1,96 @@ +defmodule GenRMQ.Consumer.QueueConfiguration do + @moduledoc """ + Represents configuration of a Consumer queue. + + While this module exists to make management of Consumer queue configurations + easier, right now it should be considered a private implementation detail + with respect to the consumer configuration API. + """ + + defstruct name: nil, + ttl: nil, + max_priority: nil, + durable: true + + @type t :: %__MODULE__{ + name: String.t(), + ttl: nil | pos_integer, + max_priority: nil | pos_integer, + durable: boolean + } + + @type queue_options :: + [] + | [durable: boolean] + | [durable: boolean, max_priority: pos_integer] + | [durable: boolean, ttl: pos_integer] + | [max_priority: pos_integer] + | [max_priority: pos_integer, ttl: pos_integer] + | [durable: boolean, max_priority: pos_integer, ttl: pos_integer] + + @spec new(String.t(), queue_options) :: t + def new(name, args \\ []) do + queue_ttl = Keyword.get(args, :ttl, nil) + queue_mp = Keyword.get(args, :max_priority, nil) + durable = Keyword.get(args, :durable, true) + + %__MODULE__{ + name: name, + ttl: queue_ttl, + max_priority: set_max_priority_to_highest_value(queue_mp), + durable: durable + } + end + + @spec new(String.t(), boolean, nil | pos_integer, nil | pos_integer) :: t + def new(name, durable, ttl, mp) do + %__MODULE__{ + name: name, + ttl: ttl, + max_priority: set_max_priority_to_highest_value(mp), + durable: durable + } + end + + @spec name(t) :: String.t() + def name(%__MODULE__{name: n}), do: n + + @spec durable(t) :: boolean + def durable(%__MODULE__{durable: d}), do: d + + @spec ttl(t) :: nil | pos_integer + def ttl(%__MODULE__{ttl: ttl_v}), do: ttl_v + + @spec max_priority(t) :: nil | pos_integer + def max_priority(%__MODULE__{max_priority: mp}), do: mp + + def build_queue_arguments(%__MODULE__{} = qc, arguments) do + args_with_priority = setup_priority(arguments, qc.max_priority) + + qc + |> build_ttl_arguments(args_with_priority) + end + + def build_ttl_arguments(%__MODULE__{} = qc, arguments) do + setup_ttl(arguments, qc.ttl) + end + + defp setup_ttl(arguments, nil), do: arguments + defp setup_ttl(arguments, ttl), do: [{"x-expires", :long, ttl} | arguments] + + defp setup_priority(arguments, max_priority) when is_integer(max_priority), + do: [{"x-max-priority", :long, max_priority} | arguments] + + defp setup_priority(arguments, _), do: arguments + + @max_priority 255 + + defp set_max_priority_to_highest_value(nil), do: nil + + defp set_max_priority_to_highest_value(mp) + when is_integer(mp) and mp > @max_priority do + 255 + end + + defp set_max_priority_to_highest_value(mp) when is_integer(mp), do: mp +end diff --git a/test/gen_rmq/consumer/queue_configuration_test.exs b/test/gen_rmq/consumer/queue_configuration_test.exs new file mode 100644 index 0000000..bfacbf2 --- /dev/null +++ b/test/gen_rmq/consumer/queue_configuration_test.exs @@ -0,0 +1,59 @@ +defmodule GenRMQ.Consumer.QueueConfigurationTest do + use ExUnit.Case, async: true + alias GenRMQ.Consumer.QueueConfiguration + + test "may be built with just a queue name" do + qc = QueueConfiguration.new("some_queue_name") + assert "some_queue_name" == QueueConfiguration.name(qc) + end + + test "durable should default to true" do + qc = QueueConfiguration.new("some_queue_name") + assert QueueConfiguration.durable(qc) + end + + test "may be built with all options" do + name = "some_queue_name" + ttl = 5000 + durable = false + max_priority = 200 + + qc = + QueueConfiguration.new( + name, + durable, + ttl, + max_priority + ) + + assert name == QueueConfiguration.name(qc) + assert durable == QueueConfiguration.durable(qc) + assert ttl == QueueConfiguration.ttl(qc) + assert max_priority == QueueConfiguration.max_priority(qc) + end + + test "sets max_priority values that are too large to the max" do + qc = QueueConfiguration.new("some_queue_name", max_priority: 500) + assert 255 == QueueConfiguration.max_priority(qc) + end + + test "builds empty arguments when neither ttl or max_priority are provided" do + qc = QueueConfiguration.new("some_queue_name") + assert [] == QueueConfiguration.build_queue_arguments(qc, []) + end + + test "builds correct arguments when ttl and max_priority are provided" do + ttl = 5000 + max_priority = 5 + + qc = + QueueConfiguration.new( + "some_queue_name", + ttl: ttl, + max_priority: max_priority + ) + + assert [{"x-expires", :long, ttl}, {"x-max-priority", :long, max_priority}] == + QueueConfiguration.build_queue_arguments(qc, []) + end +end diff --git a/test/gen_rmq_consumer_test.exs b/test/gen_rmq_consumer_test.exs index 1632dcf..8cc7887 100644 --- a/test/gen_rmq_consumer_test.exs +++ b/test/gen_rmq_consumer_test.exs @@ -129,8 +129,8 @@ defmodule GenRMQ.ConsumerTest do Assert.repeatedly(fn -> assert Process.alive?(consumer_pid) == true - assert queue_count(context[:rabbit_conn], "#{state.config[:queue]}") == {:ok, 0} - assert queue_count(context[:rabbit_conn], "#{state.config[:queue]}_error") == {:error, :not_found} + assert queue_count(context[:rabbit_conn], "#{state.config[:queue].name}") == {:ok, 0} + assert queue_count(context[:rabbit_conn], "#{state.config[:queue].name}_error") == {:error, :not_found} end) end end @@ -281,7 +281,7 @@ defmodule GenRMQ.ConsumerTest do publish_message(context[:rabbit_conn], context[:exchange], Jason.encode!(message), "routing_key_1") Assert.repeatedly(fn -> - assert queue_count(context[:rabbit_conn], "#{state.config[:queue]}_error") == {:ok, 1} + assert queue_count(context[:rabbit_conn], "#{state.config[:queue].name}_error") == {:ok, 1} end) end diff --git a/test/support/consumer_shared_tests.ex b/test/support/consumer_shared_tests.ex index bcce7bf..43aa4df 100644 --- a/test/support/consumer_shared_tests.ex +++ b/test/support/consumer_shared_tests.ex @@ -25,7 +25,7 @@ defmodule ConsumerSharedTests do publish_message(context[:rabbit_conn], context[:exchange], Jason.encode!(message)) GenRMQ.Test.Assert.repeatedly(fn -> - assert queue_count(context[:rabbit_conn], "#{state.config[:queue]}_error") == {:ok, 1} + assert queue_count(context[:rabbit_conn], "#{state.config[:queue].name}_error") == {:ok, 1} end) end end @@ -49,7 +49,7 @@ defmodule ConsumerSharedTests do defmacro terminate_after_queue_deletion_test do quote do test "should terminate after queue deletion", %{consumer: consumer_pid, state: state} do - AMQP.Queue.delete(state.out, state[:config][:queue]) + AMQP.Queue.delete(state.out, state[:config][:queue].name) GenRMQ.Test.Assert.repeatedly(fn -> assert Process.alive?(consumer_pid) == false @@ -61,7 +61,7 @@ defmodule ConsumerSharedTests do defmacro exit_signal_after_queue_deletion_test do quote do test "should send exit signal after queue deletion", %{consumer: consumer_pid, state: state} do - AMQP.Queue.delete(state.out, state[:config][:queue]) + AMQP.Queue.delete(state.out, state[:config][:queue].name) assert_receive({:EXIT, ^consumer_pid, :cancelled}) end @@ -71,7 +71,7 @@ defmodule ConsumerSharedTests do defmacro close_connection_and_channels_after_deletion_test do quote do test "should close connection and channels after queue deletion", %{state: state} do - AMQP.Queue.delete(state.out, state[:config][:queue]) + AMQP.Queue.delete(state.out, state[:config][:queue].name) GenRMQ.Test.Assert.repeatedly(fn -> assert Process.alive?(state.conn.pid) == false