Skip to content
This repository has been archived by the owner on Nov 27, 2023. It is now read-only.

Commit

Permalink
Refactor consumer settings. (#135)
Browse files Browse the repository at this point in the history
  • Loading branch information
TreyE authored and mkorszun committed Jan 24, 2020
1 parent 15aef50 commit 823a71c
Show file tree
Hide file tree
Showing 5 changed files with 196 additions and 32 deletions.
59 changes: 34 additions & 25 deletions lib/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ defmodule GenRMQ.Consumer do

require Logger
alias GenRMQ.Message
alias GenRMQ.Consumer.QueueConfiguration

##############################################################################
# GenRMQ.Consumer callbacks
Expand Down Expand Up @@ -218,9 +219,11 @@ defmodule GenRMQ.Consumer do
Process.flag(:trap_exit, true)
config = apply(module, :init, [])

parsed_config = parse_config(config)

state =
initial_state
|> Map.put(:config, config)
|> Map.put(:config, parsed_config)
|> Map.put(:reconnect_attempt, 0)

send(self(), :init)
Expand Down Expand Up @@ -312,6 +315,23 @@ defmodule GenRMQ.Consumer do
# Helpers
##############################################################################

defp parse_config(config) do
q_name = Keyword.fetch!(config, :queue)
{ttl_val, config_no_q_ttl} = Keyword.pop(config, :queue_ttl, nil)
{mp_val, config_no_q_mp} = Keyword.pop(config_no_q_ttl, :queue_max_priority, nil)

queue_settings =
QueueConfiguration.new(
q_name,
true,
ttl_val,
mp_val
)

config_no_q_mp
|> Keyword.put(:queue, queue_settings)
end

defp handle_message(payload, attributes, %{module: module} = state, false) do
message = Message.create(attributes, payload, state)
apply(module, :handle_message, [message])
Expand Down Expand Up @@ -369,22 +389,18 @@ defmodule GenRMQ.Consumer do
end

defp setup_consumer(%{in: chan, config: config, module: module} = state) do
queue = config[:queue]
queue_config = config[:queue]
queue = QueueConfiguration.name(queue_config)
exchange = config[:exchange]
routing_key = config[:routing_key]
prefetch_count = String.to_integer(config[:prefetch_count])
ttl = config[:queue_ttl]
max_priority = config[:queue_max_priority]

deadletter_args = setup_deadletter(chan, config)

arguments =
deadletter_args
|> setup_ttl(ttl)
|> setup_priority(max_priority)
arguments = QueueConfiguration.build_queue_arguments(queue_config, deadletter_args)

Basic.qos(chan, prefetch_count: prefetch_count)
Queue.declare(chan, queue, durable: true, arguments: arguments)
Queue.declare(chan, queue, durable: QueueConfiguration.durable(queue_config), arguments: arguments)
GenRMQ.Binding.bind_exchange_and_queue(chan, exchange, queue, routing_key)

consumer_tag = apply(module, :consumer_tag, [])
Expand All @@ -395,14 +411,20 @@ defmodule GenRMQ.Consumer do
defp setup_deadletter(chan, config) do
case Keyword.get(config, :deadletter, true) do
true ->
ttl = config[:queue_ttl]
queue = config[:queue]
queue_config = config[:queue]
queue = QueueConfiguration.name(queue_config)
exchange = GenRMQ.Binding.exchange_name(config[:exchange])
dl_queue = config[:deadletter_queue] || "#{queue}_error"
dl_exchange = config[:deadletter_exchange] || "#{exchange}.deadletter"
dl_routing_key = config[:deadletter_routing_key] || "#"

Queue.declare(chan, dl_queue, durable: true, arguments: setup_ttl([], ttl))
Queue.declare(
chan,
dl_queue,
durable: true,
arguments: QueueConfiguration.build_ttl_arguments(queue_config, [])
)

GenRMQ.Binding.bind_exchange_and_queue(chan, dl_exchange, dl_queue, dl_routing_key)

[{"x-dead-letter-exchange", :longstr, dl_exchange}] ++
Expand All @@ -427,19 +449,6 @@ defmodule GenRMQ.Consumer do

defp linear_delay(attempt), do: :timer.sleep(attempt * 1_000)

defp setup_ttl(arguments, nil), do: arguments
defp setup_ttl(arguments, ttl), do: [{"x-expires", :long, ttl} | arguments]

@max_priority 255

defp setup_priority(arguments, max_priority) when is_integer(max_priority) and max_priority <= @max_priority,
do: [{"x-max-priority", :long, max_priority} | arguments]

defp setup_priority(arguments, max_priority) when is_integer(max_priority) and max_priority > @max_priority,
do: [{"x-max-priority", :long, @max_priority} | arguments]

defp setup_priority(arguments, _), do: arguments

##############################################################################
##############################################################################
##############################################################################
Expand Down
96 changes: 96 additions & 0 deletions lib/gen_rmq/consumer/queue_configuration.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
defmodule GenRMQ.Consumer.QueueConfiguration do
@moduledoc """
Represents configuration of a Consumer queue.
While this module exists to make management of Consumer queue configurations
easier, right now it should be considered a private implementation detail
with respect to the consumer configuration API.
"""

defstruct name: nil,
ttl: nil,
max_priority: nil,
durable: true

@type t :: %__MODULE__{
name: String.t(),
ttl: nil | pos_integer,
max_priority: nil | pos_integer,
durable: boolean
}

@type queue_options ::
[]
| [durable: boolean]
| [durable: boolean, max_priority: pos_integer]
| [durable: boolean, ttl: pos_integer]
| [max_priority: pos_integer]
| [max_priority: pos_integer, ttl: pos_integer]
| [durable: boolean, max_priority: pos_integer, ttl: pos_integer]

@spec new(String.t(), queue_options) :: t
def new(name, args \\ []) do
queue_ttl = Keyword.get(args, :ttl, nil)
queue_mp = Keyword.get(args, :max_priority, nil)
durable = Keyword.get(args, :durable, true)

%__MODULE__{
name: name,
ttl: queue_ttl,
max_priority: set_max_priority_to_highest_value(queue_mp),
durable: durable
}
end

@spec new(String.t(), boolean, nil | pos_integer, nil | pos_integer) :: t
def new(name, durable, ttl, mp) do
%__MODULE__{
name: name,
ttl: ttl,
max_priority: set_max_priority_to_highest_value(mp),
durable: durable
}
end

@spec name(t) :: String.t()
def name(%__MODULE__{name: n}), do: n

@spec durable(t) :: boolean
def durable(%__MODULE__{durable: d}), do: d

@spec ttl(t) :: nil | pos_integer
def ttl(%__MODULE__{ttl: ttl_v}), do: ttl_v

@spec max_priority(t) :: nil | pos_integer
def max_priority(%__MODULE__{max_priority: mp}), do: mp

def build_queue_arguments(%__MODULE__{} = qc, arguments) do
args_with_priority = setup_priority(arguments, qc.max_priority)

qc
|> build_ttl_arguments(args_with_priority)
end

def build_ttl_arguments(%__MODULE__{} = qc, arguments) do
setup_ttl(arguments, qc.ttl)
end

defp setup_ttl(arguments, nil), do: arguments
defp setup_ttl(arguments, ttl), do: [{"x-expires", :long, ttl} | arguments]

defp setup_priority(arguments, max_priority) when is_integer(max_priority),
do: [{"x-max-priority", :long, max_priority} | arguments]

defp setup_priority(arguments, _), do: arguments

@max_priority 255

defp set_max_priority_to_highest_value(nil), do: nil

defp set_max_priority_to_highest_value(mp)
when is_integer(mp) and mp > @max_priority do
255
end

defp set_max_priority_to_highest_value(mp) when is_integer(mp), do: mp
end
59 changes: 59 additions & 0 deletions test/gen_rmq/consumer/queue_configuration_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
defmodule GenRMQ.Consumer.QueueConfigurationTest do
use ExUnit.Case, async: true
alias GenRMQ.Consumer.QueueConfiguration

test "may be built with just a queue name" do
qc = QueueConfiguration.new("some_queue_name")
assert "some_queue_name" == QueueConfiguration.name(qc)
end

test "durable should default to true" do
qc = QueueConfiguration.new("some_queue_name")
assert QueueConfiguration.durable(qc)
end

test "may be built with all options" do
name = "some_queue_name"
ttl = 5000
durable = false
max_priority = 200

qc =
QueueConfiguration.new(
name,
durable,
ttl,
max_priority
)

assert name == QueueConfiguration.name(qc)
assert durable == QueueConfiguration.durable(qc)
assert ttl == QueueConfiguration.ttl(qc)
assert max_priority == QueueConfiguration.max_priority(qc)
end

test "sets max_priority values that are too large to the max" do
qc = QueueConfiguration.new("some_queue_name", max_priority: 500)
assert 255 == QueueConfiguration.max_priority(qc)
end

test "builds empty arguments when neither ttl or max_priority are provided" do
qc = QueueConfiguration.new("some_queue_name")
assert [] == QueueConfiguration.build_queue_arguments(qc, [])
end

test "builds correct arguments when ttl and max_priority are provided" do
ttl = 5000
max_priority = 5

qc =
QueueConfiguration.new(
"some_queue_name",
ttl: ttl,
max_priority: max_priority
)

assert [{"x-expires", :long, ttl}, {"x-max-priority", :long, max_priority}] ==
QueueConfiguration.build_queue_arguments(qc, [])
end
end
6 changes: 3 additions & 3 deletions test/gen_rmq_consumer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ defmodule GenRMQ.ConsumerTest do

Assert.repeatedly(fn ->
assert Process.alive?(consumer_pid) == true
assert queue_count(context[:rabbit_conn], "#{state.config[:queue]}") == {:ok, 0}
assert queue_count(context[:rabbit_conn], "#{state.config[:queue]}_error") == {:error, :not_found}
assert queue_count(context[:rabbit_conn], "#{state.config[:queue].name}") == {:ok, 0}
assert queue_count(context[:rabbit_conn], "#{state.config[:queue].name}_error") == {:error, :not_found}
end)
end
end
Expand Down Expand Up @@ -281,7 +281,7 @@ defmodule GenRMQ.ConsumerTest do
publish_message(context[:rabbit_conn], context[:exchange], Jason.encode!(message), "routing_key_1")

Assert.repeatedly(fn ->
assert queue_count(context[:rabbit_conn], "#{state.config[:queue]}_error") == {:ok, 1}
assert queue_count(context[:rabbit_conn], "#{state.config[:queue].name}_error") == {:ok, 1}
end)
end

Expand Down
8 changes: 4 additions & 4 deletions test/support/consumer_shared_tests.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ defmodule ConsumerSharedTests do
publish_message(context[:rabbit_conn], context[:exchange], Jason.encode!(message))

GenRMQ.Test.Assert.repeatedly(fn ->
assert queue_count(context[:rabbit_conn], "#{state.config[:queue]}_error") == {:ok, 1}
assert queue_count(context[:rabbit_conn], "#{state.config[:queue].name}_error") == {:ok, 1}
end)
end
end
Expand All @@ -49,7 +49,7 @@ defmodule ConsumerSharedTests do
defmacro terminate_after_queue_deletion_test do
quote do
test "should terminate after queue deletion", %{consumer: consumer_pid, state: state} do
AMQP.Queue.delete(state.out, state[:config][:queue])
AMQP.Queue.delete(state.out, state[:config][:queue].name)

GenRMQ.Test.Assert.repeatedly(fn ->
assert Process.alive?(consumer_pid) == false
Expand All @@ -61,7 +61,7 @@ defmodule ConsumerSharedTests do
defmacro exit_signal_after_queue_deletion_test do
quote do
test "should send exit signal after queue deletion", %{consumer: consumer_pid, state: state} do
AMQP.Queue.delete(state.out, state[:config][:queue])
AMQP.Queue.delete(state.out, state[:config][:queue].name)

assert_receive({:EXIT, ^consumer_pid, :cancelled})
end
Expand All @@ -71,7 +71,7 @@ defmodule ConsumerSharedTests do
defmacro close_connection_and_channels_after_deletion_test do
quote do
test "should close connection and channels after queue deletion", %{state: state} do
AMQP.Queue.delete(state.out, state[:config][:queue])
AMQP.Queue.delete(state.out, state[:config][:queue].name)

GenRMQ.Test.Assert.repeatedly(fn ->
assert Process.alive?(state.conn.pid) == false
Expand Down

0 comments on commit 823a71c

Please sign in to comment.