From ccc03672cf5c885d26fc49dd5f72129c4474f474 Mon Sep 17 00:00:00 2001 From: Alex Koutmos Date: Sun, 26 Apr 2020 14:36:56 -0400 Subject: [PATCH 01/18] Initial attempt at using a Task Supervisor for message consumers --- lib/consumer.ex | 75 ++++++++++++++++++++++++++++++++---------------- lib/publisher.ex | 21 +++++++------- 2 files changed, 61 insertions(+), 35 deletions(-) diff --git a/lib/consumer.ex b/lib/consumer.ex index a9e1d7b..c15b0bd 100644 --- a/lib/consumer.ex +++ b/lib/consumer.ex @@ -263,29 +263,28 @@ defmodule GenRMQ.Consumer do |> Map.put(:config, parsed_config) |> Map.put(:reconnect_attempt, 0) - send(self(), :init) - - {:ok, state} - end - - @doc false - @impl GenServer - def handle_call({:recover, requeue}, _from, %{in: channel} = state) do - {:reply, Basic.recover(channel, requeue: requeue), state} + {:ok, state, {:continue, :init}} end @doc false @impl GenServer - def handle_info(:init, state) do + def handle_continue(:init, state) do state = state |> get_connection() |> open_channels() |> setup_consumer() + |> setup_task_supervisor() {:noreply, state} end + @doc false + @impl GenServer + def handle_call({:recover, requeue}, _from, %{in: channel} = state) do + {:reply, Basic.recover(channel, requeue: requeue), state} + end + @doc false @impl GenServer def handle_info({:DOWN, _ref, :process, _pid, reason}, %{module: module, config: config} = state) do @@ -298,6 +297,20 @@ defmodule GenRMQ.Consumer do |> handle_reconnect(state) end + @doc false + @impl GenServer + def handle_info({:DOWN, _ref, :process, _pid, _reason}, state) do + {:noreply, state} + end + + @doc false + @impl GenServer + def handle_info({ref, _task_result}, state) when is_reference(ref) do + Process.demonitor(ref, [:flush]) + + {:noreply, state} + end + @doc false @impl GenServer def handle_info({:basic_consume_ok, %{consumer_tag: consumer_tag}}, %{module: module} = state) do @@ -321,7 +334,7 @@ defmodule GenRMQ.Consumer do @doc false @impl GenServer - def handle_info({:basic_deliver, payload, attributes}, %{module: module, config: config} = state) do + def handle_info({:basic_deliver, payload, attributes}, %{module: module} = state) do %{delivery_tag: tag, routing_key: routing_key, redelivered: redelivered} = attributes Logger.debug("[#{module}]: Received message. Tag: #{tag}, routing key: #{routing_key}, redelivered: #{redelivered}") @@ -329,7 +342,7 @@ defmodule GenRMQ.Consumer do Logger.debug("[#{module}]: Redelivered payload for message. Tag: #{tag}, payload: #{payload}") end - handle_message(payload, attributes, state, Keyword.get(config, :concurrency, true)) + handle_message(payload, attributes, state) {:noreply, state} end @@ -374,19 +387,9 @@ defmodule GenRMQ.Consumer do |> Keyword.put(:connection, Keyword.get(config, :connection, config[:uri])) end - defp handle_message(payload, attributes, %{module: module} = state, false) do - start_time = System.monotonic_time() - message = Message.create(attributes, payload, state) - - emit_message_start_event(start_time, message, module) - result = apply(module, :handle_message, [message]) - emit_message_stop_event(start_time, message, module) - - result - end - - defp handle_message(payload, attributes, %{module: module} = state, true) do - spawn(fn -> + defp handle_message(payload, attributes, %{module: module, task_supervisor: task_supervisor_pid} = state) + when is_pid(task_supervisor_pid) do + Task.Supervisor.async_nolink(task_supervisor_pid, fn -> start_time = System.monotonic_time() message = Message.create(attributes, payload, state) @@ -398,6 +401,17 @@ defmodule GenRMQ.Consumer do end) end + defp handle_message(payload, attributes, %{module: module} = state) do + start_time = System.monotonic_time() + message = Message.create(attributes, payload, state) + + emit_message_start_event(start_time, message, module) + result = apply(module, :handle_message, [message]) + emit_message_stop_event(start_time, message, module) + + result + end + defp handle_reconnect(false, %{module: module} = state) do Logger.info("[#{module}]: Reconnection is disabled. Terminating consumer.") {:stop, :connection_closed, state} @@ -410,6 +424,7 @@ defmodule GenRMQ.Consumer do |> get_connection() |> open_channels() |> setup_consumer() + |> setup_task_supervisor() {:noreply, new_state} end @@ -452,6 +467,16 @@ defmodule GenRMQ.Consumer do Map.merge(state, %{in: chan, out: out_chan}) end + defp setup_task_supervisor(%{config: config} = state) do + if Keyword.get(config, :concurrency, true) do + {:ok, pid} = Task.Supervisor.start_link() + + Map.put(state, :task_supervisor, pid) + else + Map.put(state, :task_supervisor, nil) + end + end + defp setup_consumer(%{in: chan, config: config, module: module} = state) do queue_config = config[:queue] prefetch_count = String.to_integer(config[:prefetch_count]) diff --git a/lib/publisher.ex b/lib/publisher.ex index 22cafb9..e5ab512 100644 --- a/lib/publisher.ex +++ b/lib/publisher.ex @@ -214,8 +214,17 @@ defmodule GenRMQ.Publisher do Process.flag(:trap_exit, true) config = apply(module, :init, []) state = Map.merge(initial_state, %{config: config}) - send(self(), :init) - {:ok, state} + + {:ok, state, {:continue, :init}} + end + + @doc false + @impl GenServer + def handle_continue(:init, %{module: module, config: config}) do + Logger.info("[#{module}]: Setting up publisher connection and configuration") + {:ok, state} = setup_publisher(%{module: module, config: config}) + + {:noreply, state} end @doc false @@ -279,14 +288,6 @@ defmodule GenRMQ.Publisher do {:reply, result, state} end - @doc false - @impl GenServer - def handle_info(:init, %{module: module, config: config}) do - Logger.info("[#{module}]: Setting up publisher connection and configuration") - {:ok, state} = setup_publisher(%{module: module, config: config}) - {:noreply, state} - end - @doc false @impl GenServer def handle_info({:DOWN, _ref, :process, _pid, reason}, %{module: module, config: config}) do From 994625f582a6719ac9e047390ebcce96dbf35924 Mon Sep 17 00:00:00 2001 From: Alex Koutmos Date: Sun, 26 Apr 2020 15:30:33 -0400 Subject: [PATCH 02/18] Adjusting testing travis matrix --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 0c80678..28173f7 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,9 +3,9 @@ elixir: - 1.7 - 1.8 - 1.9 + - 1.10 otp_release: - - 20.0 - 21.0 sudo: required From c360540701b0ebf9a2e2f936ecbea7198c94e2ee Mon Sep 17 00:00:00 2001 From: Alex Koutmos Date: Tue, 28 Apr 2020 09:33:37 -0400 Subject: [PATCH 03/18] Remove unnecessary task supervisor start --- lib/consumer.ex | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/consumer.ex b/lib/consumer.ex index c15b0bd..2e2e3a6 100644 --- a/lib/consumer.ex +++ b/lib/consumer.ex @@ -424,7 +424,6 @@ defmodule GenRMQ.Consumer do |> get_connection() |> open_channels() |> setup_consumer() - |> setup_task_supervisor() {:noreply, new_state} end From db0b2aa3095f115b43931d2eaaefe8358c48e182 Mon Sep 17 00:00:00 2001 From: Alex Koutmos Date: Mon, 4 May 2020 15:29:09 -0400 Subject: [PATCH 04/18] Added telemetry event for failed task and adjusted :DOWN function to act on running tasks --- .../guides/consumer/telemetry_events.md | 5 ++ lib/consumer.ex | 59 ++++++++++++++----- test/gen_rmq_consumer_test.exs | 44 +++++++++++++- test/support/test_consumers.ex | 38 ++++++++++-- 4 files changed, 124 insertions(+), 22 deletions(-) diff --git a/documentation/guides/consumer/telemetry_events.md b/documentation/guides/consumer/telemetry_events.md index c92b432..4bd1063 100644 --- a/documentation/guides/consumer/telemetry_events.md +++ b/documentation/guides/consumer/telemetry_events.md @@ -42,4 +42,9 @@ GenRMQ emits [Telemetry][telemetry] events for consumers. It currently exposes t - Measurement: `%{time: System.monotonic_time}` - Metadata: `%{module: atom, reason: atom}` +- `[:gen_rmq, :consumer, :task, :down]` - Dispatched by a GenRMQ consumer when a supervised Task fails to process a message + + - Measurement: `%{time: System.monotonic_time}` + - Metadata: `%{module: atom, reason: tuple}` + [telemetry]: https://github.com/beam-telemetry/telemetry diff --git a/lib/consumer.ex b/lib/consumer.ex index 2e2e3a6..218caea 100644 --- a/lib/consumer.ex +++ b/lib/consumer.ex @@ -262,6 +262,7 @@ defmodule GenRMQ.Consumer do initial_state |> Map.put(:config, parsed_config) |> Map.put(:reconnect_attempt, 0) + |> Map.put(:running_tasks, MapSet.new()) {:ok, state, {:continue, :init}} end @@ -287,28 +288,42 @@ defmodule GenRMQ.Consumer do @doc false @impl GenServer - def handle_info({:DOWN, _ref, :process, _pid, reason}, %{module: module, config: config} = state) do - Logger.info("[#{module}]: RabbitMQ connection is down! Reason: #{inspect(reason)}") + def handle_info( + {:DOWN, ref, :process, _pid, reason}, + %{module: module, config: config, running_tasks: running_tasks} = state + ) do + if MapSet.member?(running_tasks, ref) do + Logger.info("[#{module}]: Task failed to handle message. Reason: #{inspect(reason)}") - emit_connection_down_event(module, reason) + emit_task_down_event(module, reason) - config - |> Keyword.get(:reconnect, true) - |> handle_reconnect(state) - end + updated_state = %{state | running_tasks: MapSet.delete(running_tasks, ref)} - @doc false - @impl GenServer - def handle_info({:DOWN, _ref, :process, _pid, _reason}, state) do - {:noreply, state} + {:noreply, updated_state} + else + Logger.info("[#{module}]: RabbitMQ connection is down! Reason: #{inspect(reason)}") + + emit_connection_down_event(module, reason) + + config + |> Keyword.get(:reconnect, true) + |> handle_reconnect(state) + end end @doc false @impl GenServer - def handle_info({ref, _task_result}, state) when is_reference(ref) do + def handle_info({ref, _task_result}, %{running_tasks: running_tasks} = state) when is_reference(ref) do Process.demonitor(ref, [:flush]) - {:noreply, state} + updated_state = + if MapSet.member?(running_tasks, ref) do + %{state | running_tasks: MapSet.delete(running_tasks, ref)} + else + state + end + + {:noreply, updated_state} end @doc false @@ -334,7 +349,7 @@ defmodule GenRMQ.Consumer do @doc false @impl GenServer - def handle_info({:basic_deliver, payload, attributes}, %{module: module} = state) do + def handle_info({:basic_deliver, payload, attributes}, %{module: module, running_tasks: running_tasks} = state) do %{delivery_tag: tag, routing_key: routing_key, redelivered: redelivered} = attributes Logger.debug("[#{module}]: Received message. Tag: #{tag}, routing key: #{routing_key}, redelivered: #{redelivered}") @@ -342,9 +357,13 @@ defmodule GenRMQ.Consumer do Logger.debug("[#{module}]: Redelivered payload for message. Tag: #{tag}, payload: #{payload}") end - handle_message(payload, attributes, state) + updated_state = + case handle_message(payload, attributes, state) do + %Task{ref: ref} -> %{state | running_tasks: MapSet.put(running_tasks, ref)} + _ -> state + end - {:noreply, state} + {:noreply, updated_state} end @doc false @@ -531,6 +550,14 @@ defmodule GenRMQ.Consumer do :telemetry.execute([:gen_rmq, :consumer, :message, :stop], measurements, metadata) end + defp emit_task_down_event(module, reason) do + start_time = System.monotonic_time() + measurements = %{time: start_time} + metadata = %{module: module, reason: reason} + + :telemetry.execute([:gen_rmq, :consumer, :task, :down], measurements, metadata) + end + defp emit_connection_down_event(module, reason) do start_time = System.monotonic_time() measurements = %{time: start_time} diff --git a/test/gen_rmq_consumer_test.exs b/test/gen_rmq_consumer_test.exs index 1e6260a..8c9acee 100644 --- a/test/gen_rmq_consumer_test.exs +++ b/test/gen_rmq_consumer_test.exs @@ -1,11 +1,12 @@ defmodule GenRMQ.ConsumerTest do use ExUnit.Case, async: false use GenRMQ.RabbitCase + import ConsumerSharedTests alias GenRMQ.Test.Assert - alias GenRMQ.Consumer + alias TestConsumer.Default alias TestConsumer.WithQueueOptions alias TestConsumer.WithCustomDeadletter @@ -18,6 +19,7 @@ defmodule GenRMQ.ConsumerTest do alias TestConsumer.WithFanoutExchange alias TestConsumer.WithMultiBindingExchange alias TestConsumer.RedeclaringExistingExchange + alias TestConsumer.ErrorInConsumer @connection "amqp://guest:guest@localhost:5672" @@ -91,6 +93,43 @@ defmodule GenRMQ.ConsumerTest do close_connection_and_channels_after_shutdown_test() end + describe "TestConsumer.ErrorInConsumer" do + setup :attach_telemetry_handlers + + setup do + Agent.start_link(fn -> MapSet.new() end, name: ErrorInConsumer) + with_test_consumer(ErrorInConsumer) + end + + test "should invoke the consumer's handle_info callback if error exists", + %{consumer: consumer_pid, state: state} = context do + message = %{"value" => 0} + + publish_message(context[:rabbit_conn], context[:exchange], Jason.encode!(message)) + + Assert.repeatedly(fn -> + assert Process.alive?(consumer_pid) == true + assert queue_count(context[:rabbit_conn], state[:config][:queue].name) == {:ok, 0} + end) + + assert_receive {:telemetry_event, [:gen_rmq, :consumer, :task, :down], %{time: _}, %{reason: _, module: _}} + end + + test "should not invoke the consumer's handle_info callback if error does not exist", + %{consumer: consumer_pid, state: state} = context do + message = %{"value" => 1} + + publish_message(context[:rabbit_conn], context[:exchange], Jason.encode!(message)) + + Assert.repeatedly(fn -> + assert Process.alive?(consumer_pid) == true + assert queue_count(context[:rabbit_conn], state[:config][:queue].name) == {:ok, 0} + end) + + refute_receive {:telemetry_event, [:gen_rmq, :consumer, :task, :down], %{time: _}, %{reason: _, module: _}} + end + end + describe "TestConsumer.WithoutConcurrency" do setup do Agent.start_link(fn -> MapSet.new() end, name: WithoutConcurrency) @@ -380,7 +419,8 @@ defmodule GenRMQ.ConsumerTest do [:gen_rmq, :consumer, :message, :start], [:gen_rmq, :consumer, :message, :stop], [:gen_rmq, :consumer, :connection, :start], - [:gen_rmq, :consumer, :connection, :stop] + [:gen_rmq, :consumer, :connection, :stop], + [:gen_rmq, :consumer, :task, :down] ], fn name, measurements, metadata, _ -> send(self, {:telemetry_event, name, measurements, metadata}) diff --git a/test/support/test_consumers.ex b/test/support/test_consumers.ex index 1c20664..428d953 100644 --- a/test/support/test_consumers.ex +++ b/test/support/test_consumers.ex @@ -117,7 +117,7 @@ defmodule TestConsumer do queue_options: [ durable: false, arguments: [ - {"x-expires", :long, 1000}, + {"x-expires", :long, 1000} ] ], exchange: "gen_rmq_in_exchange_queue_options", @@ -128,11 +128,11 @@ defmodule TestConsumer do deadletter_queue_options: [ durable: false, arguments: [ - {"x-expires", :long, 1000}, + {"x-expires", :long, 1000} ] ], deadletter_exchange: "dl_exchange_options", - deadletter_routing_key: "dl_routing_key_options", + deadletter_routing_key: "dl_routing_key_options" ] end @@ -165,7 +165,7 @@ defmodule TestConsumer do queue_ttl: 1000, deadletter_queue: "dl_queue", deadletter_exchange: "dl_exchange", - deadletter_routing_key: "dl_routing_key", + deadletter_routing_key: "dl_routing_key" ] end @@ -330,6 +330,7 @@ defmodule TestConsumer do @behaviour GenRMQ.Consumer def existing_exchange, do: "existing_direct_exchange" + def init() do [ queue: "gen_rmq_in_queue_" <> existing_exchange(), @@ -346,4 +347,33 @@ defmodule TestConsumer do def handle_message(_), do: :ok end + + defmodule ErrorInConsumer do + @moduledoc false + @behaviour GenRMQ.Consumer + + def init() do + [ + queue: "gen_rmq_in_queue", + exchange: "gen_rmq_in_exchange", + routing_key: "#", + prefetch_count: "10", + connection: "amqp://guest:guest@localhost:5672", + queue_ttl: 1000 + ] + end + + def consumer_tag() do + "TestConsumer.ErrorInConsumer" + end + + def handle_message(message) do + %{"value" => value} = Jason.decode!(message.payload) + + result = Float.to_string(1 / value) + updated_message = Map.put(message, :payload, result) + + GenRMQ.Consumer.ack(updated_message) + end + end end From d5698e55d13201dc38f42d6fdf2330f2152fd178 Mon Sep 17 00:00:00 2001 From: Alex Koutmos Date: Tue, 12 May 2020 23:39:13 -0400 Subject: [PATCH 05/18] Added ability to yield for running tasks during GenServer termination --- lib/consumer.ex | 36 +++++++++++++++++++++++------------- 1 file changed, 23 insertions(+), 13 deletions(-) diff --git a/lib/consumer.ex b/lib/consumer.ex index 218caea..3bbfb33 100644 --- a/lib/consumer.ex +++ b/lib/consumer.ex @@ -262,7 +262,7 @@ defmodule GenRMQ.Consumer do initial_state |> Map.put(:config, parsed_config) |> Map.put(:reconnect_attempt, 0) - |> Map.put(:running_tasks, MapSet.new()) + |> Map.put(:running_tasks, %{}) {:ok, state, {:continue, :init}} end @@ -292,12 +292,12 @@ defmodule GenRMQ.Consumer do {:DOWN, ref, :process, _pid, reason}, %{module: module, config: config, running_tasks: running_tasks} = state ) do - if MapSet.member?(running_tasks, ref) do + if Map.has_key?(running_tasks, ref) do Logger.info("[#{module}]: Task failed to handle message. Reason: #{inspect(reason)}") - emit_task_down_event(module, reason) + emit_task_error_event(module, reason) - updated_state = %{state | running_tasks: MapSet.delete(running_tasks, ref)} + updated_state = %{state | running_tasks: Map.delete(running_tasks, ref)} {:noreply, updated_state} else @@ -317,8 +317,8 @@ defmodule GenRMQ.Consumer do Process.demonitor(ref, [:flush]) updated_state = - if MapSet.member?(running_tasks, ref) do - %{state | running_tasks: MapSet.delete(running_tasks, ref)} + if Map.has_key?(running_tasks, ref) do + %{state | running_tasks: Map.delete(running_tasks, ref)} else state end @@ -359,7 +359,7 @@ defmodule GenRMQ.Consumer do updated_state = case handle_message(payload, attributes, state) do - %Task{ref: ref} -> %{state | running_tasks: MapSet.put(running_tasks, ref)} + %Task{ref: ref} = task -> %{state | running_tasks: Map.put(running_tasks, ref, task)} _ -> state end @@ -368,36 +368,46 @@ defmodule GenRMQ.Consumer do @doc false @impl GenServer - def terminate(:connection_closed = reason, %{module: module}) do + def terminate(:connection_closed = reason, %{module: module} = state) do # Since connection has been closed no need to clean it up Logger.debug("[#{module}]: Terminating consumer, reason: #{inspect(reason)}") + await_running_tasks(state) end @doc false @impl GenServer - def terminate(reason, %{module: module, conn: conn, in: in_chan, out: out_chan}) do + def terminate(reason, %{module: module, conn: conn, in: in_chan, out: out_chan} = state) do Logger.debug("[#{module}]: Terminating consumer, reason: #{inspect(reason)}") Channel.close(in_chan) Channel.close(out_chan) Connection.close(conn) + await_running_tasks(state) end @doc false @impl GenServer - def terminate({{:shutdown, {:server_initiated_close, error_code, reason}}, _}, %{module: module}) do + def terminate({{:shutdown, {:server_initiated_close, error_code, reason}}, _}, %{module: module} = state) do Logger.error("[#{module}]: Terminating consumer, error_code: #{inspect(error_code)}, reason: #{inspect(reason)}") + await_running_tasks(state) end @doc false @impl GenServer - def terminate(reason, %{module: module}) do + def terminate(reason, %{module: module} = state) do Logger.error("[#{module}]: Terminating consumer, unexpected reason: #{inspect(reason)}") + await_running_tasks(state) end ############################################################################## # Helpers ############################################################################## + defp await_running_tasks(%{running_tasks: running_tasks}) do + running_tasks + |> Map.values() + |> Task.yield_many() + end + defp parse_config(config) do queue_name = Keyword.fetch!(config, :queue) @@ -550,12 +560,12 @@ defmodule GenRMQ.Consumer do :telemetry.execute([:gen_rmq, :consumer, :message, :stop], measurements, metadata) end - defp emit_task_down_event(module, reason) do + defp emit_task_error_event(module, reason) do start_time = System.monotonic_time() measurements = %{time: start_time} metadata = %{module: module, reason: reason} - :telemetry.execute([:gen_rmq, :consumer, :task, :down], measurements, metadata) + :telemetry.execute([:gen_rmq, :consumer, :task, :error], measurements, metadata) end defp emit_connection_down_event(module, reason) do From fb685b8a990a03adde053c94e91e815e8a1eaa67 Mon Sep 17 00:00:00 2001 From: Alex Koutmos Date: Wed, 13 May 2020 15:03:50 -0400 Subject: [PATCH 06/18] Added tests for Task Supervisor await --- lib/consumer.ex | 12 ++++-- test/gen_rmq_consumer_test.exs | 70 ++++++++++++++++++++++++++-------- test/support/test_consumers.ex | 31 +++++++++++++++ 3 files changed, 93 insertions(+), 20 deletions(-) diff --git a/lib/consumer.ex b/lib/consumer.ex index 3bbfb33..91ef6b7 100644 --- a/lib/consumer.ex +++ b/lib/consumer.ex @@ -369,33 +369,37 @@ defmodule GenRMQ.Consumer do @doc false @impl GenServer def terminate(:connection_closed = reason, %{module: module} = state) do + await_running_tasks(state) + # Since connection has been closed no need to clean it up Logger.debug("[#{module}]: Terminating consumer, reason: #{inspect(reason)}") - await_running_tasks(state) end @doc false @impl GenServer def terminate(reason, %{module: module, conn: conn, in: in_chan, out: out_chan} = state) do + await_running_tasks(state) + Logger.debug("[#{module}]: Terminating consumer, reason: #{inspect(reason)}") Channel.close(in_chan) Channel.close(out_chan) Connection.close(conn) - await_running_tasks(state) end @doc false @impl GenServer def terminate({{:shutdown, {:server_initiated_close, error_code, reason}}, _}, %{module: module} = state) do - Logger.error("[#{module}]: Terminating consumer, error_code: #{inspect(error_code)}, reason: #{inspect(reason)}") await_running_tasks(state) + + Logger.error("[#{module}]: Terminating consumer, error_code: #{inspect(error_code)}, reason: #{inspect(reason)}") end @doc false @impl GenServer def terminate(reason, %{module: module} = state) do - Logger.error("[#{module}]: Terminating consumer, unexpected reason: #{inspect(reason)}") await_running_tasks(state) + + Logger.error("[#{module}]: Terminating consumer, unexpected reason: #{inspect(reason)}") end ############################################################################## diff --git a/test/gen_rmq_consumer_test.exs b/test/gen_rmq_consumer_test.exs index 8c9acee..719cfbb 100644 --- a/test/gen_rmq_consumer_test.exs +++ b/test/gen_rmq_consumer_test.exs @@ -7,19 +7,22 @@ defmodule GenRMQ.ConsumerTest do alias GenRMQ.Test.Assert alias GenRMQ.Consumer - alias TestConsumer.Default - alias TestConsumer.WithQueueOptions - alias TestConsumer.WithCustomDeadletter - alias TestConsumer.WithoutConcurrency - alias TestConsumer.WithoutDeadletter - alias TestConsumer.WithoutReconnection - alias TestConsumer.WithPriority - alias TestConsumer.WithTopicExchange - alias TestConsumer.WithDirectExchange - alias TestConsumer.WithFanoutExchange - alias TestConsumer.WithMultiBindingExchange - alias TestConsumer.RedeclaringExistingExchange - alias TestConsumer.ErrorInConsumer + alias TestConsumer.{ + Default, + ErrorInConsumer, + RedeclaringExistingExchange, + SlowConsumer, + WithCustomDeadletter, + WithDirectExchange, + WithFanoutExchange, + WithMultiBindingExchange, + WithPriority, + WithQueueOptions, + WithTopicExchange, + WithoutConcurrency, + WithoutDeadletter, + WithoutReconnection + } @connection "amqp://guest:guest@localhost:5672" @@ -112,7 +115,7 @@ defmodule GenRMQ.ConsumerTest do assert queue_count(context[:rabbit_conn], state[:config][:queue].name) == {:ok, 0} end) - assert_receive {:telemetry_event, [:gen_rmq, :consumer, :task, :down], %{time: _}, %{reason: _, module: _}} + assert_receive {:telemetry_event, [:gen_rmq, :consumer, :task, :error], %{time: _}, %{reason: _, module: _}} end test "should not invoke the consumer's handle_info callback if error does not exist", @@ -126,7 +129,42 @@ defmodule GenRMQ.ConsumerTest do assert queue_count(context[:rabbit_conn], state[:config][:queue].name) == {:ok, 0} end) - refute_receive {:telemetry_event, [:gen_rmq, :consumer, :task, :down], %{time: _}, %{reason: _, module: _}} + refute_receive {:telemetry_event, [:gen_rmq, :consumer, :task, :error], %{time: _}, %{reason: _, module: _}} + end + end + + describe "TestConsumer.SlowConsumer" do + setup :attach_telemetry_handlers + + setup do + Agent.start_link(fn -> MapSet.new() end, name: SlowConsumer) + with_test_consumer(SlowConsumer) + end + + test "should wait for the in progress tasks to complete processing before terminating consumer", + %{consumer: consumer_pid, state: state} = context do + message = %{"value" => 1} + + publish_message(context[:rabbit_conn], context[:exchange], Jason.encode!(message)) + + Assert.repeatedly(fn -> + assert Process.alive?(consumer_pid) == true + assert queue_count(context[:rabbit_conn], state[:config][:queue].name) == {:ok, 0} + end) + + GenServer.stop(consumer_pid) + + refute_receive {:telemetry_event, [:gen_rmq, :consumer, :task, :error], %{time: _}, %{reason: _, module: _}} + + assert_receive( + {:telemetry_event, [:gen_rmq, :consumer, :message, :start], %{time: _}, %{message: _, module: _}}, + 1_000 + ) + + assert_receive( + {:telemetry_event, [:gen_rmq, :consumer, :message, :stop], %{time: _, duration: _}, %{message: _, module: _}}, + 1_000 + ) end end @@ -420,7 +458,7 @@ defmodule GenRMQ.ConsumerTest do [:gen_rmq, :consumer, :message, :stop], [:gen_rmq, :consumer, :connection, :start], [:gen_rmq, :consumer, :connection, :stop], - [:gen_rmq, :consumer, :task, :down] + [:gen_rmq, :consumer, :task, :error] ], fn name, measurements, metadata, _ -> send(self, {:telemetry_event, name, measurements, metadata}) diff --git a/test/support/test_consumers.ex b/test/support/test_consumers.ex index 428d953..b1c4790 100644 --- a/test/support/test_consumers.ex +++ b/test/support/test_consumers.ex @@ -376,4 +376,35 @@ defmodule TestConsumer do GenRMQ.Consumer.ack(updated_message) end end + + defmodule SlowConsumer do + @moduledoc false + @behaviour GenRMQ.Consumer + + def init() do + [ + queue: "gen_rmq_in_queue", + exchange: "gen_rmq_in_exchange", + routing_key: "#", + prefetch_count: "10", + connection: "amqp://guest:guest@localhost:5672", + queue_ttl: 1000 + ] + end + + def consumer_tag() do + "TestConsumer.SlowConsumer" + end + + def handle_message(message) do + %{"value" => value} = Jason.decode!(message.payload) + + Process.sleep(500) + + result = Float.to_string(1 / value) + updated_message = Map.put(message, :payload, result) + + GenRMQ.Consumer.ack(updated_message) + end + end end From 2f3313e0259fb0c49f0a6a23ece50cce110c46c7 Mon Sep 17 00:00:00 2001 From: Alex Koutmos Date: Wed, 13 May 2020 15:08:28 -0400 Subject: [PATCH 07/18] Fixing credo error --- test/gen_rmq_publisher_test.exs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/test/gen_rmq_publisher_test.exs b/test/gen_rmq_publisher_test.exs index 911f34c..ac4b5e1 100644 --- a/test/gen_rmq_publisher_test.exs +++ b/test/gen_rmq_publisher_test.exs @@ -5,9 +5,11 @@ defmodule GenRMQ.PublisherTest do alias GenRMQ.Publisher alias GenRMQ.Test.Assert - alias TestPublisher.Default - alias TestPublisher.WithConfirmations - alias TestPublisher.RedeclaringExistingExchange + alias TestPublisher.{ + Default, + RedeclaringExistingExchange, + WithConfirmations + } @connection "amqp://guest:guest@localhost:5672" @exchange "gen_rmq_out_exchange" From 0996fb5d2e432a554287b10fc6c18cb9ad556a85 Mon Sep 17 00:00:00 2001 From: Alex Koutmos Date: Wed, 13 May 2020 15:39:21 -0400 Subject: [PATCH 08/18] Fixing lingering process mailbox test error --- test/gen_rmq_consumer_test.exs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/test/gen_rmq_consumer_test.exs b/test/gen_rmq_consumer_test.exs index 719cfbb..56ed0d3 100644 --- a/test/gen_rmq_consumer_test.exs +++ b/test/gen_rmq_consumer_test.exs @@ -106,6 +106,7 @@ defmodule GenRMQ.ConsumerTest do test "should invoke the consumer's handle_info callback if error exists", %{consumer: consumer_pid, state: state} = context do + clear_mailbox() message = %{"value" => 0} publish_message(context[:rabbit_conn], context[:exchange], Jason.encode!(message)) @@ -120,6 +121,7 @@ defmodule GenRMQ.ConsumerTest do test "should not invoke the consumer's handle_info callback if error does not exist", %{consumer: consumer_pid, state: state} = context do + clear_mailbox() message = %{"value" => 1} publish_message(context[:rabbit_conn], context[:exchange], Jason.encode!(message)) @@ -477,4 +479,12 @@ defmodule GenRMQ.ConsumerTest do on_exit(fn -> Process.exit(consumer_pid, :normal) end) {:ok, %{consumer: consumer_pid, exchange: exchange, state: state}} end + + defp clear_mailbox do + receive do + _lingering_message -> clear_mailbox() + after + 100 -> :ok + end + end end From 4eb7ef1ea09bf55a82257e88adfb409a0d181fb6 Mon Sep 17 00:00:00 2001 From: Alex Koutmos Date: Wed, 13 May 2020 16:01:06 -0400 Subject: [PATCH 09/18] Fixing documentation --- documentation/guides/consumer/telemetry_events.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/documentation/guides/consumer/telemetry_events.md b/documentation/guides/consumer/telemetry_events.md index 4bd1063..1c6bdb3 100644 --- a/documentation/guides/consumer/telemetry_events.md +++ b/documentation/guides/consumer/telemetry_events.md @@ -42,7 +42,7 @@ GenRMQ emits [Telemetry][telemetry] events for consumers. It currently exposes t - Measurement: `%{time: System.monotonic_time}` - Metadata: `%{module: atom, reason: atom}` -- `[:gen_rmq, :consumer, :task, :down]` - Dispatched by a GenRMQ consumer when a supervised Task fails to process a message +- `[:gen_rmq, :consumer, :task, :error]` - Dispatched by a GenRMQ consumer when a supervised Task fails to process a message - Measurement: `%{time: System.monotonic_time}` - Metadata: `%{module: atom, reason: tuple}` From 9ce91841fad55ed66629b736f5263306f559f4a2 Mon Sep 17 00:00:00 2001 From: Alex Koutmos Date: Thu, 14 May 2020 20:59:59 -0400 Subject: [PATCH 10/18] Added ability to configure termination timeout --- lib/consumer.ex | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/lib/consumer.ex b/lib/consumer.ex index 91ef6b7..6f071b6 100644 --- a/lib/consumer.ex +++ b/lib/consumer.ex @@ -59,12 +59,16 @@ defmodule GenRMQ.Consumer do `queue_max_priority` - defines if a declared queue should be a priority queue. Should be set to a value from `1..255` range. If it is greater than `255`, queue max priority will be set to `255`. Values between `1` and `10` are - [recommened](https://www.rabbitmq.com/priority.html#resource-usage). + [recommended](https://www.rabbitmq.com/priority.html#resource-usage). `concurrency` - defines if `handle_message` callback is called - in seperate process using [spawn](https://hexdocs.pm/elixir/Process.html#spawn/2) + in separate process using [spawn](https://hexdocs.pm/elixir/Process.html#spawn/2) function. By default concurrency is enabled. To disable, set it to `false` + `terminate_timeout` - defines how long the consumer will wait for in-flight Tasks to + complete before terminating the process. The value is in milliseconds and the default + is 5_000 milliseconds. + `retry_delay_function` - custom retry delay function. Called when the connection to the broker cannot be established. Receives the connection attempt as an argument (>= 1) and is expected to wait for some time. @@ -107,7 +111,7 @@ defmodule GenRMQ.Consumer do prefetch_count: "10", uri: "amqp://guest:guest@localhost:5672", concurrency: true, - queue_ttl: 5000, + queue_ttl: 5_000, retry_delay_function: fn attempt -> :timer.sleep(1000 * attempt) end, reconnect: true, deadletter: true, @@ -134,6 +138,7 @@ defmodule GenRMQ.Consumer do prefetch_count: String.t(), uri: String.t(), concurrency: boolean, + terminate_timeout: integer, queue_ttl: integer, retry_delay_function: function, reconnect: boolean, @@ -257,12 +262,14 @@ defmodule GenRMQ.Consumer do Process.flag(:trap_exit, true) config = apply(module, :init, []) parsed_config = parse_config(config) + terminate_timeout = Keyword.get(parsed_config, :terminate_timeout, 5_000) state = initial_state |> Map.put(:config, parsed_config) |> Map.put(:reconnect_attempt, 0) |> Map.put(:running_tasks, %{}) + |> Map.put(:terminate_timeout, terminate_timeout) {:ok, state, {:continue, :init}} end @@ -406,10 +413,10 @@ defmodule GenRMQ.Consumer do # Helpers ############################################################################## - defp await_running_tasks(%{running_tasks: running_tasks}) do + defp await_running_tasks(%{running_tasks: running_tasks, terminate_timeout: terminate_timeout}) do running_tasks |> Map.values() - |> Task.yield_many() + |> Task.yield_many(terminate_timeout) end defp parse_config(config) do From 7d016ca8295aefc3acb4155d059d227976c5b828 Mon Sep 17 00:00:00 2001 From: Alex Koutmos Date: Thu, 14 May 2020 21:07:29 -0400 Subject: [PATCH 11/18] Minor documentation tweaks --- README.md | 18 +++++++----------- lib/consumer.ex | 1 + 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 8e41993..792dbef 100644 --- a/README.md +++ b/README.md @@ -75,7 +75,7 @@ This will result in: - durable `gen_rmq_in_queue_error` queue created or redeclared. It will be bound to `gen_rmq_exchange.deadletter` - durable topic `gen_rmq_exchange` exchange created or redeclared - durable `gen_rmq_in_queue` queue created or redeclared. It will be bound to `gen_rmq_exchange` exchange and has a deadletter exchange set to `gen_rmq_exchange.deadletter` -- every `handle_message` callback will executed in separate process. This can be disabled by setting `concurrency: false` in `init` callback +- every `handle_message` callback will executed in separate supervised Task. This can be disabled by setting `concurrency: false` in `init` callback - on failed rabbitmq connection it will wait for a bit and then reconnect There are many options to control the consumer setup details, please check the `c:GenRMQ.Consumer.init/0` [docs][consumer_doc] for all available settings. @@ -138,7 +138,7 @@ Are you using GenRMQ in Production? Please let us know, we are curious to learn ## Maintainers -* Mateusz ([@mkorszun](https://github.com/mkorszun)) +- Mateusz ([@mkorszun](https://github.com/mkorszun)) The maintainers are responsible for the general project oversight, and empowering further trusted committers (see below). @@ -146,14 +146,14 @@ The maintainers are the ones that create new releases of GenRMQ. ## Trusted Committers -* Joel ([@vorce](https://github.com/vorce)) -* Sebastian ([@spier](https://github.com/spier)) -* [@Shemeikka](https://github.com/Shemeikka) -* Alexander ([@akoutmos](https://github.com/akoutmos)) +- Joel ([@vorce](https://github.com/vorce)) +- Sebastian ([@spier](https://github.com/spier)) +- [@Shemeikka](https://github.com/Shemeikka) +- Alexander ([@akoutmos](https://github.com/akoutmos)) Trusted Committers are members of our community who we have explicitly added to our GitHub repository. Trusted Committers have elevated rights, allowing them to send in changes directly to branches and to approve Pull Requests. For details see [TRUSTED-COMMITTERS.md][trusted_commiters]. -*Note:* Maintainers and Trusted Committers are listed in [.github/CODEOWNERS][code_owners] in order to automatically assign PR reviews to them. +_Note:_ Maintainers and Trusted Committers are listed in [.github/CODEOWNERS][code_owners] in order to automatically assign PR reviews to them. ## License @@ -170,23 +170,19 @@ Copyright (c) 2018 - 2020 Meltwater Inc. [underthehood.meltwater.com][undertheho [gen_rmq_issues]: https://github.com/meltwater/gen_rmq/issues [priority_queues]: https://www.rabbitmq.com/priority.html [underthehood]: http://underthehood.meltwater.com/ - [examples]: https://github.com/meltwater/gen_rmq/blob/master/documentation/examples [example_consumer]: https://github.com/meltwater/gen_rmq/blob/master/documentation/examples/consumer.ex [example_publisher]: https://github.com/meltwater/gen_rmq/blob/master/documentation/examples/publisher.ex [example_processor]: https://github.com/meltwater/gen_rmq/blob/master/documentation/examples/processor.ex [example_rabbit_case]: https://github.com/meltwater/gen_rmq/blob/master/test/gen_rmq_publisher_test.exs - [guide_consumer_basic_setup]: https://github.com/meltwater/gen_rmq/blob/master/documentation/guides/consumer/basic_setup.md [guide_consumer_with_custom_deadletter_configuration]: https://github.com/meltwater/gen_rmq/blob/master/documentation/guides/consumer/with_custom_deadletter_configuration.md [guide_consumer_with_custom_exchange_type]: https://github.com/meltwater/gen_rmq/blob/master/documentation/guides/consumer/with_custom_exchange_type.md [guide_consumer_with_custom_queue_configuration]: https://github.com/meltwater/gen_rmq/blob/master/documentation/guides/consumer/with_custom_queue_configuration.md [without_deadletter_configuration]: https://github.com/meltwater/gen_rmq/blob/master/documentation/guides/consumer/without_deadletter_configuration.md [with_quorum_queue_type]: https://github.com/meltwater/gen_rmq/blob/master/documentation/guides/consumer/with_quorum_queue_type.md - [consumer_telemetry_events]: https://github.com/meltwater/gen_rmq/blob/master/documentation/guides/consumer/telemetry_events.md [publisher_telemetry_events]: https://github.com/meltwater/gen_rmq/blob/master/documentation/guides/publisher/telemetry_events.md - [trusted_commiters]: https://github.com/meltwater/gen_rmq/blob/master/TRUSTED-COMMITTERS.md [code_owners]: https://github.com/meltwater/gen_rmq/blob/master/.github/CODEOWNERS [license]: https://github.com/meltwater/gen_rmq/blob/master/LICENSE diff --git a/lib/consumer.ex b/lib/consumer.ex index 6f071b6..e98a2a5 100644 --- a/lib/consumer.ex +++ b/lib/consumer.ex @@ -111,6 +111,7 @@ defmodule GenRMQ.Consumer do prefetch_count: "10", uri: "amqp://guest:guest@localhost:5672", concurrency: true, + terminate_timeout: 5_000, queue_ttl: 5_000, retry_delay_function: fn attempt -> :timer.sleep(1000 * attempt) end, reconnect: true, From 6a28687c361902df95e7f17f5de7c3ab8f5b2e97 Mon Sep 17 00:00:00 2001 From: Alex Koutmos Date: Fri, 15 May 2020 14:04:40 -0400 Subject: [PATCH 12/18] Make error tests clearer --- test/gen_rmq_consumer_test.exs | 4 ++++ test/support/test_consumers.ex | 2 ++ 2 files changed, 6 insertions(+) diff --git a/test/gen_rmq_consumer_test.exs b/test/gen_rmq_consumer_test.exs index d872655..4d57655 100644 --- a/test/gen_rmq_consumer_test.exs +++ b/test/gen_rmq_consumer_test.exs @@ -108,6 +108,8 @@ defmodule GenRMQ.ConsumerTest do test "should invoke the consumer's handle_info callback if error exists", %{consumer: consumer_pid, state: state} = context do clear_mailbox() + + # Pass in a value that will raise message = %{"value" => 0} publish_message(context[:rabbit_conn], context[:exchange], Jason.encode!(message)) @@ -123,6 +125,8 @@ defmodule GenRMQ.ConsumerTest do test "should not invoke the consumer's handle_info callback if error does not exist", %{consumer: consumer_pid, state: state} = context do clear_mailbox() + + # Pass in a value that will not raise message = %{"value" => 1} publish_message(context[:rabbit_conn], context[:exchange], Jason.encode!(message)) diff --git a/test/support/test_consumers.ex b/test/support/test_consumers.ex index 7ba9885..72e1f7d 100644 --- a/test/support/test_consumers.ex +++ b/test/support/test_consumers.ex @@ -401,6 +401,8 @@ defmodule TestConsumer do def handle_message(message) do %{"value" => value} = Jason.decode!(message.payload) + if value == 0, do: raise("Can't divide by zero!") + result = Float.to_string(1 / value) updated_message = Map.put(message, :payload, result) From 59b3e91a01fc7db97dfb2f9b38d97c0ac511bdff Mon Sep 17 00:00:00 2001 From: Alex Koutmos Date: Sun, 17 May 2020 12:01:53 -0400 Subject: [PATCH 13/18] Fixed README docs --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 792dbef..2ccdf4d 100644 --- a/README.md +++ b/README.md @@ -75,7 +75,7 @@ This will result in: - durable `gen_rmq_in_queue_error` queue created or redeclared. It will be bound to `gen_rmq_exchange.deadletter` - durable topic `gen_rmq_exchange` exchange created or redeclared - durable `gen_rmq_in_queue` queue created or redeclared. It will be bound to `gen_rmq_exchange` exchange and has a deadletter exchange set to `gen_rmq_exchange.deadletter` -- every `handle_message` callback will executed in separate supervised Task. This can be disabled by setting `concurrency: false` in `init` callback +- every `handle_message` callback will be executed in a separate supervised Task. This can be disabled by setting `concurrency: false` in `init` callback - on failed rabbitmq connection it will wait for a bit and then reconnect There are many options to control the consumer setup details, please check the `c:GenRMQ.Consumer.init/0` [docs][consumer_doc] for all available settings. From 0b8298c5d589d38eaceceda5552ca212c5c3d4ac Mon Sep 17 00:00:00 2001 From: Alex Koutmos Date: Tue, 19 May 2020 23:19:02 -0400 Subject: [PATCH 14/18] Added ability to timeout slow message handler tasks --- lib/consumer.ex | 96 ++++++++++++++++++++++++---------- test/gen_rmq_consumer_test.exs | 25 ++++++++- test/support/test_consumers.ex | 7 +-- 3 files changed, 97 insertions(+), 31 deletions(-) diff --git a/lib/consumer.ex b/lib/consumer.ex index e98a2a5..026e17a 100644 --- a/lib/consumer.ex +++ b/lib/consumer.ex @@ -69,6 +69,10 @@ defmodule GenRMQ.Consumer do complete before terminating the process. The value is in milliseconds and the default is 5_000 milliseconds. + `handle_message_timeout` - defines how long the `handle_message` callback will execute + within a supervised task. The value is in milliseconds and the default is 5_000 + milliseconds. + `retry_delay_function` - custom retry delay function. Called when the connection to the broker cannot be established. Receives the connection attempt as an argument (>= 1) and is expected to wait for some time. @@ -112,6 +116,7 @@ defmodule GenRMQ.Consumer do uri: "amqp://guest:guest@localhost:5672", concurrency: true, terminate_timeout: 5_000, + handle_message_timeout: 5_000, queue_ttl: 5_000, retry_delay_function: fn attempt -> :timer.sleep(1000 * attempt) end, reconnect: true, @@ -140,6 +145,7 @@ defmodule GenRMQ.Consumer do uri: String.t(), concurrency: boolean, terminate_timeout: integer, + handle_message_timeout: integer, queue_ttl: integer, retry_delay_function: function, reconnect: boolean, @@ -264,6 +270,7 @@ defmodule GenRMQ.Consumer do config = apply(module, :init, []) parsed_config = parse_config(config) terminate_timeout = Keyword.get(parsed_config, :terminate_timeout, 5_000) + handle_message_timeout = Keyword.get(parsed_config, :handle_message_timeout, 5_000) state = initial_state @@ -271,6 +278,7 @@ defmodule GenRMQ.Consumer do |> Map.put(:reconnect_attempt, 0) |> Map.put(:running_tasks, %{}) |> Map.put(:terminate_timeout, terminate_timeout) + |> Map.put(:handle_message_timeout, handle_message_timeout) {:ok, state, {:continue, :init}} end @@ -300,40 +308,57 @@ defmodule GenRMQ.Consumer do {:DOWN, ref, :process, _pid, reason}, %{module: module, config: config, running_tasks: running_tasks} = state ) do - if Map.has_key?(running_tasks, ref) do - Logger.info("[#{module}]: Task failed to handle message. Reason: #{inspect(reason)}") + case Map.get(running_tasks, ref) do + {%Task{}, timeout_reference} -> + Logger.info("[#{module}]: Task failed to handle message. Reason: #{inspect(reason)}") - emit_task_error_event(module, reason) + Process.cancel_timer(timeout_reference) + updated_state = %{state | running_tasks: Map.delete(running_tasks, ref)} + emit_task_error_event(module, reason) - updated_state = %{state | running_tasks: Map.delete(running_tasks, ref)} + {:noreply, updated_state} - {:noreply, updated_state} - else - Logger.info("[#{module}]: RabbitMQ connection is down! Reason: #{inspect(reason)}") + _ -> + Logger.info("[#{module}]: RabbitMQ connection is down! Reason: #{inspect(reason)}") - emit_connection_down_event(module, reason) + emit_connection_down_event(module, reason) - config - |> Keyword.get(:reconnect, true) - |> handle_reconnect(state) + config + |> Keyword.get(:reconnect, true) + |> handle_reconnect(state) end end @doc false @impl GenServer def handle_info({ref, _task_result}, %{running_tasks: running_tasks} = state) when is_reference(ref) do + # Task completed sucessfully, update the running task map and state Process.demonitor(ref, [:flush]) updated_state = - if Map.has_key?(running_tasks, ref) do - %{state | running_tasks: Map.delete(running_tasks, ref)} - else - state + case Map.get(running_tasks, ref) do + {%Task{}, timeout_reference} -> + Process.cancel_timer(timeout_reference) + %{state | running_tasks: Map.delete(running_tasks, ref)} + + _ -> + state end {:noreply, updated_state} end + @doc false + @impl GenServer + def handle_info({:kill, task_reference}, %{running_tasks: running_tasks} = state) when is_reference(task_reference) do + # The task has timed out, kill the Task process which will trigger a :DOWN event that + # is handled by a previous `handle_info/2` callback + {%Task{pid: pid}, _timeout_reference} = Map.get(running_tasks, task_reference) + Process.exit(pid, :kill) + + {:noreply, state} + end + @doc false @impl GenServer def handle_info({:basic_consume_ok, %{consumer_tag: consumer_tag}}, %{module: module} = state) do @@ -357,7 +382,10 @@ defmodule GenRMQ.Consumer do @doc false @impl GenServer - def handle_info({:basic_deliver, payload, attributes}, %{module: module, running_tasks: running_tasks} = state) do + def handle_info( + {:basic_deliver, payload, attributes}, + %{module: module, running_tasks: running_tasks, handle_message_timeout: handle_message_timeout} = state + ) do %{delivery_tag: tag, routing_key: routing_key, redelivered: redelivered} = attributes Logger.debug("[#{module}]: Received message. Tag: #{tag}, routing key: #{routing_key}, redelivered: #{redelivered}") @@ -367,8 +395,12 @@ defmodule GenRMQ.Consumer do updated_state = case handle_message(payload, attributes, state) do - %Task{ref: ref} = task -> %{state | running_tasks: Map.put(running_tasks, ref, task)} - _ -> state + %Task{ref: task_reference} = task -> + timeout_reference = Process.send_after(self(), {:kill, task_reference}, handle_message_timeout) + %{state | running_tasks: Map.put(running_tasks, task_reference, {task, timeout_reference})} + + _ -> + state end {:noreply, updated_state} @@ -415,8 +447,14 @@ defmodule GenRMQ.Consumer do ############################################################################## defp await_running_tasks(%{running_tasks: running_tasks, terminate_timeout: terminate_timeout}) do + # Await for all in-flight tasks for the configured amount of time and cancel + # their individual timeout timers running_tasks |> Map.values() + |> Enum.map(fn {%Task{} = task, timeout_reference} -> + Process.cancel_timer(timeout_reference) + task + end) |> Task.yield_many(terminate_timeout) end @@ -430,16 +468,20 @@ defmodule GenRMQ.Consumer do defp handle_message(payload, attributes, %{module: module, task_supervisor: task_supervisor_pid} = state) when is_pid(task_supervisor_pid) do - Task.Supervisor.async_nolink(task_supervisor_pid, fn -> - start_time = System.monotonic_time() - message = Message.create(attributes, payload, state) + Task.Supervisor.async_nolink( + task_supervisor_pid, + fn -> + start_time = System.monotonic_time() + message = Message.create(attributes, payload, state) - emit_message_start_event(start_time, message, module) - result = apply(module, :handle_message, [message]) - emit_message_stop_event(start_time, message, module) + emit_message_start_event(start_time, message, module) + result = apply(module, :handle_message, [message]) + emit_message_stop_event(start_time, message, module) - result - end) + result + end, + shutdown: :brutal_kill + ) end defp handle_message(payload, attributes, %{module: module} = state) do @@ -509,7 +551,7 @@ defmodule GenRMQ.Consumer do defp setup_task_supervisor(%{config: config} = state) do if Keyword.get(config, :concurrency, true) do - {:ok, pid} = Task.Supervisor.start_link() + {:ok, pid} = Task.Supervisor.start_link(max_restarts: 0) Map.put(state, :task_supervisor, pid) else diff --git a/test/gen_rmq_consumer_test.exs b/test/gen_rmq_consumer_test.exs index 4d57655..f955a6a 100644 --- a/test/gen_rmq_consumer_test.exs +++ b/test/gen_rmq_consumer_test.exs @@ -150,7 +150,8 @@ defmodule GenRMQ.ConsumerTest do test "should wait for the in progress tasks to complete processing before terminating consumer", %{consumer: consumer_pid, state: state} = context do - message = %{"value" => 1} + # Time to sleep consumer + message = %{"value" => 500} publish_message(context[:rabbit_conn], context[:exchange], Jason.encode!(message)) @@ -173,6 +174,28 @@ defmodule GenRMQ.ConsumerTest do 1_000 ) end + + test "should error out the task if it takes too long", + %{consumer: consumer_pid, state: state} = context do + # Time to sleep consumer + message = %{"value" => 1_250} + + publish_message(context[:rabbit_conn], context[:exchange], Jason.encode!(message)) + + Assert.repeatedly(fn -> + assert Process.alive?(consumer_pid) == true + assert queue_count(context[:rabbit_conn], state[:config][:queue].name) == {:ok, 0} + end) + + assert_receive {:telemetry_event, [:gen_rmq, :consumer, :message, :start], %{time: _}, %{message: _, module: _}} + + refute_receive( + {:telemetry_event, [:gen_rmq, :consumer, :message, :stop], %{time: _, duration: _}, %{message: _, module: _}}, + 2_000 + ) + + assert_receive {:telemetry_event, [:gen_rmq, :consumer, :task, :error], %{time: _}, %{reason: :killed, module: _}} + end end describe "TestConsumer.WithoutConcurrency" do diff --git a/test/support/test_consumers.ex b/test/support/test_consumers.ex index 72e1f7d..6793897 100644 --- a/test/support/test_consumers.ex +++ b/test/support/test_consumers.ex @@ -421,7 +421,8 @@ defmodule TestConsumer do routing_key: "#", prefetch_count: "10", connection: "amqp://guest:guest@localhost:5672", - queue_ttl: 1000 + queue_ttl: 1000, + handle_message_timeout: 1_000 ] end @@ -432,11 +433,11 @@ defmodule TestConsumer do def handle_message(message) do %{"value" => value} = Jason.decode!(message.payload) - Process.sleep(500) - result = Float.to_string(1 / value) updated_message = Map.put(message, :payload, result) + Process.sleep(value) + GenRMQ.Consumer.ack(updated_message) end end From 073078c6784b345ff9233a17a5a175e8d240221a Mon Sep 17 00:00:00 2001 From: Alex Koutmos Date: Fri, 22 May 2020 22:40:59 -0400 Subject: [PATCH 15/18] Added handle_error callback along with tests --- lib/consumer.ex | 66 ++++++++++++++++++++-------- lib/message_task.ex | 25 +++++++++++ test/gen_rmq_consumer_test.exs | 19 +++++--- test/support/test_consumers.ex | 79 ++++++++++++++++++++++++++++++++++ 4 files changed, 165 insertions(+), 24 deletions(-) create mode 100644 lib/message_task.ex diff --git a/lib/consumer.ex b/lib/consumer.ex index 026e17a..1e8f7fd 100644 --- a/lib/consumer.ex +++ b/lib/consumer.ex @@ -8,13 +8,14 @@ defmodule GenRMQ.Consumer do * create deadletter queue and exchange * handle reconnections * call `handle_message` callback on every message delivery + * call `handle_error` callback whenever `handle_message` fails to process or times out """ use GenServer use AMQP require Logger - alias GenRMQ.Message + alias GenRMQ.{Message, MessageTask} alias GenRMQ.Consumer.QueueConfiguration ############################################################################## @@ -186,6 +187,28 @@ defmodule GenRMQ.Consumer do """ @callback handle_message(message :: %GenRMQ.Message{}) :: :ok + @doc """ + Invoked when an error or timeout is encountered while executing `handle_message` callback + + `message` - `GenRMQ.Message` struct + `reason` - atom denoting the type of error + + ## Examples: + To reject the message message that caused the Task to fail you can do something like so: + ``` + def handle_error(message, reason) do + # Do something with message and reject it + Logger.warn("Failed to process message: #\{inspect(message)}") + + GenRMQ.Consumer.reject(message) + end + + The `reason` argument will either be `:timeout` or `:error` if the message processing task + timed out or encountered and error respectively. + ``` + """ + @callback handle_error(message :: %GenRMQ.Message{}, reason :: atom()) :: :ok + ############################################################################## # GenRMQ.Consumer API ############################################################################## @@ -309,12 +332,15 @@ defmodule GenRMQ.Consumer do %{module: module, config: config, running_tasks: running_tasks} = state ) do case Map.get(running_tasks, ref) do - {%Task{}, timeout_reference} -> - Logger.info("[#{module}]: Task failed to handle message. Reason: #{inspect(reason)}") + %MessageTask{exit_status: exit_status, message: message, timeout_reference: timeout_reference} -> + exit_status = exit_status || :error + Logger.info("[#{module}]: Task failed to handle message. Reason: #{inspect(exit_status)}") + # Cancel timeout timer, emit telemetry event, and invoke user's `handle_error` callback Process.cancel_timer(timeout_reference) updated_state = %{state | running_tasks: Map.delete(running_tasks, ref)} - emit_task_error_event(module, reason) + emit_task_error_event(module, exit_status) + apply(module, :handle_error, [message, exit_status]) {:noreply, updated_state} @@ -332,13 +358,13 @@ defmodule GenRMQ.Consumer do @doc false @impl GenServer def handle_info({ref, _task_result}, %{running_tasks: running_tasks} = state) when is_reference(ref) do - # Task completed sucessfully, update the running task map and state + # Task completed successfully, update the running task map and state Process.demonitor(ref, [:flush]) updated_state = case Map.get(running_tasks, ref) do - {%Task{}, timeout_reference} -> - Process.cancel_timer(timeout_reference) + %MessageTask{} = message_task -> + Process.cancel_timer(message_task.timeout_reference) %{state | running_tasks: Map.delete(running_tasks, ref)} _ -> @@ -353,10 +379,13 @@ defmodule GenRMQ.Consumer do def handle_info({:kill, task_reference}, %{running_tasks: running_tasks} = state) when is_reference(task_reference) do # The task has timed out, kill the Task process which will trigger a :DOWN event that # is handled by a previous `handle_info/2` callback - {%Task{pid: pid}, _timeout_reference} = Map.get(running_tasks, task_reference) + message_task = Map.get(running_tasks, task_reference) + %MessageTask{task: %Task{pid: pid}} = message_task + updated_state = put_in(state, [:running_tasks, task_reference], %{message_task | exit_status: :timeout}) + Process.exit(pid, :kill) - {:noreply, state} + {:noreply, updated_state} end @doc false @@ -393,11 +422,14 @@ defmodule GenRMQ.Consumer do Logger.debug("[#{module}]: Redelivered payload for message. Tag: #{tag}, payload: #{payload}") end + message = Message.create(attributes, payload, state) + updated_state = - case handle_message(payload, attributes, state) do + case handle_message(message, state) do %Task{ref: task_reference} = task -> timeout_reference = Process.send_after(self(), {:kill, task_reference}, handle_message_timeout) - %{state | running_tasks: Map.put(running_tasks, task_reference, {task, timeout_reference})} + message_task = MessageTask.create(task, timeout_reference, message) + %{state | running_tasks: Map.put(running_tasks, task_reference, message_task)} _ -> state @@ -451,9 +483,9 @@ defmodule GenRMQ.Consumer do # their individual timeout timers running_tasks |> Map.values() - |> Enum.map(fn {%Task{} = task, timeout_reference} -> - Process.cancel_timer(timeout_reference) - task + |> Enum.map(fn %MessageTask{} = message_task -> + Process.cancel_timer(message_task.timeout_reference) + message_task.task end) |> Task.yield_many(terminate_timeout) end @@ -466,13 +498,12 @@ defmodule GenRMQ.Consumer do |> Keyword.put(:connection, Keyword.get(config, :connection, config[:uri])) end - defp handle_message(payload, attributes, %{module: module, task_supervisor: task_supervisor_pid} = state) + defp handle_message(message, %{module: module, task_supervisor: task_supervisor_pid}) when is_pid(task_supervisor_pid) do Task.Supervisor.async_nolink( task_supervisor_pid, fn -> start_time = System.monotonic_time() - message = Message.create(attributes, payload, state) emit_message_start_event(start_time, message, module) result = apply(module, :handle_message, [message]) @@ -484,9 +515,8 @@ defmodule GenRMQ.Consumer do ) end - defp handle_message(payload, attributes, %{module: module} = state) do + defp handle_message(message, %{module: module}) do start_time = System.monotonic_time() - message = Message.create(attributes, payload, state) emit_message_start_event(start_time, message, module) result = apply(module, :handle_message, [message]) diff --git a/lib/message_task.ex b/lib/message_task.ex new file mode 100644 index 0000000..4347917 --- /dev/null +++ b/lib/message_task.ex @@ -0,0 +1,25 @@ +defmodule GenRMQ.MessageTask do + @moduledoc """ + Struct wrapping details of a Task that is executing the configured + `handle_message` callback + + Defines: + * `:task` - the Task struct executing the user's `handle_message` callback + * `:timeout_reference` - the reference to the timeout timer + * `:message` - the GenRMQ.Message struct that is being processed + * `:exit_status` - the exist status of the Task + """ + + @enforce_keys [:task, :timeout_reference, :message, :exit_status] + defstruct [:task, :timeout_reference, :message, :exit_status] + + @doc false + def create(task, timeout_reference, message) do + %__MODULE__{ + task: task, + timeout_reference: timeout_reference, + message: message, + exit_status: nil + } + end +end diff --git a/test/gen_rmq_consumer_test.exs b/test/gen_rmq_consumer_test.exs index f955a6a..4b2c2d6 100644 --- a/test/gen_rmq_consumer_test.exs +++ b/test/gen_rmq_consumer_test.exs @@ -101,11 +101,11 @@ defmodule GenRMQ.ConsumerTest do setup :attach_telemetry_handlers setup do - Agent.start_link(fn -> MapSet.new() end, name: ErrorInConsumer) + ErrorInConsumer.start_link(self()) with_test_consumer(ErrorInConsumer) end - test "should invoke the consumer's handle_info callback if error exists", + test "should invoke the user's handle_error callback if an error occurs", %{consumer: consumer_pid, state: state} = context do clear_mailbox() @@ -119,10 +119,11 @@ defmodule GenRMQ.ConsumerTest do assert queue_count(context[:rabbit_conn], state[:config][:queue].name) == {:ok, 0} end) - assert_receive {:telemetry_event, [:gen_rmq, :consumer, :task, :error], %{time: _}, %{reason: _, module: _}} + assert_receive {:telemetry_event, [:gen_rmq, :consumer, :task, :error], %{time: _}, %{reason: :error, module: _}} + assert_receive {:task_error, :error} end - test "should not invoke the consumer's handle_info callback if error does not exist", + test "should not invoke the user's handle_error callback if an error does not occur", %{consumer: consumer_pid, state: state} = context do clear_mailbox() @@ -137,6 +138,7 @@ defmodule GenRMQ.ConsumerTest do end) refute_receive {:telemetry_event, [:gen_rmq, :consumer, :task, :error], %{time: _}, %{reason: _, module: _}} + refute_receive {:task_error, :error} end end @@ -144,7 +146,7 @@ defmodule GenRMQ.ConsumerTest do setup :attach_telemetry_handlers setup do - Agent.start_link(fn -> MapSet.new() end, name: SlowConsumer) + SlowConsumer.start_link(self()) with_test_consumer(SlowConsumer) end @@ -173,6 +175,8 @@ defmodule GenRMQ.ConsumerTest do {:telemetry_event, [:gen_rmq, :consumer, :message, :stop], %{time: _, duration: _}, %{message: _, module: _}}, 1_000 ) + + refute_receive {:task_error, :error} end test "should error out the task if it takes too long", @@ -194,7 +198,10 @@ defmodule GenRMQ.ConsumerTest do 2_000 ) - assert_receive {:telemetry_event, [:gen_rmq, :consumer, :task, :error], %{time: _}, %{reason: :killed, module: _}} + assert_receive {:telemetry_event, [:gen_rmq, :consumer, :task, :error], %{time: _}, + %{reason: :timeout, module: _}} + + assert_receive {:task_error, :timeout} end end diff --git a/test/support/test_consumers.ex b/test/support/test_consumers.ex index 6793897..0a95a4c 100644 --- a/test/support/test_consumers.ex +++ b/test/support/test_consumers.ex @@ -27,6 +27,10 @@ defmodule TestConsumer do Agent.update(__MODULE__, &MapSet.put(&1, payload)) GenRMQ.Consumer.ack(message) end + + def handle_error(message, _reason) do + GenRMQ.Consumer.reject(message) + end end defmodule WithoutConcurrency do @@ -56,6 +60,10 @@ defmodule TestConsumer do Agent.update(__MODULE__, &MapSet.put(&1, {payload, consuming_process})) GenRMQ.Consumer.ack(message) end + + def handle_error(message, _reason) do + GenRMQ.Consumer.reject(message) + end end defmodule WithoutReconnection do @@ -80,6 +88,10 @@ defmodule TestConsumer do def handle_message(_message) do end + + def handle_error(message, _reason) do + GenRMQ.Consumer.reject(message) + end end defmodule WithoutDeadletter do @@ -105,6 +117,10 @@ defmodule TestConsumer do def handle_message(message) do GenRMQ.Consumer.reject(message) end + + def handle_error(message, _reason) do + GenRMQ.Consumer.reject(message) + end end defmodule WithQueueOptions do @@ -149,6 +165,10 @@ defmodule TestConsumer do Agent.update(__MODULE__, &MapSet.put(&1, payload)) GenRMQ.Consumer.ack(message) end + + def handle_error(message, _reason) do + GenRMQ.Consumer.reject(message) + end end defmodule WithCustomDeadletter do @@ -176,6 +196,10 @@ defmodule TestConsumer do def handle_message(message) do GenRMQ.Consumer.reject(message) end + + def handle_error(message, _reason) do + GenRMQ.Consumer.reject(message) + end end defmodule WithPriority do @@ -203,6 +227,10 @@ defmodule TestConsumer do Agent.update(__MODULE__, &MapSet.put(&1, payload)) GenRMQ.Consumer.ack(message) end + + def handle_error(message, _reason) do + GenRMQ.Consumer.reject(message) + end end defmodule WithTopicExchange do @@ -233,6 +261,10 @@ defmodule TestConsumer do Agent.update(__MODULE__, &MapSet.put(&1, payload)) GenRMQ.Consumer.ack(message) end + + def handle_error(message, _reason) do + GenRMQ.Consumer.reject(message) + end end defmodule WithDirectExchange do @@ -263,6 +295,10 @@ defmodule TestConsumer do Agent.update(__MODULE__, &MapSet.put(&1, payload)) GenRMQ.Consumer.ack(message) end + + def handle_error(message, _reason) do + GenRMQ.Consumer.reject(message) + end end defmodule WithFanoutExchange do @@ -293,6 +329,10 @@ defmodule TestConsumer do Agent.update(__MODULE__, &MapSet.put(&1, payload)) GenRMQ.Consumer.ack(message) end + + def handle_error(message, _reason) do + GenRMQ.Consumer.reject(message) + end end defmodule WithMultiBindingExchange do @@ -323,6 +363,10 @@ defmodule TestConsumer do Agent.update(__MODULE__, &MapSet.put(&1, payload)) GenRMQ.Consumer.ack(message) end + + def handle_error(message, _reason) do + GenRMQ.Consumer.reject(message) + end end defmodule RedeclaringExistingExchange do @@ -346,6 +390,10 @@ defmodule TestConsumer do end def handle_message(_), do: :ok + + def handle_error(message, _reason) do + GenRMQ.Consumer.reject(message) + end end defmodule WithQueueOptionsWithoutArguments do @@ -377,12 +425,22 @@ defmodule TestConsumer do Agent.update(__MODULE__, &MapSet.put(&1, payload)) GenRMQ.Consumer.ack(message) end + + def handle_error(message, _reason) do + GenRMQ.Consumer.reject(message) + end end defmodule ErrorInConsumer do + use Agent + @moduledoc false @behaviour GenRMQ.Consumer + def start_link(test_pid) do + Agent.start_link(fn -> test_pid end, name: __MODULE__) + end + def init() do [ queue: "gen_rmq_in_queue", @@ -408,11 +466,24 @@ defmodule TestConsumer do GenRMQ.Consumer.ack(updated_message) end + + def handle_error(message, reason) do + __MODULE__ + |> Agent.get(& &1) + |> send({:task_error, reason}) + + GenRMQ.Consumer.reject(message) + end end defmodule SlowConsumer do + use Agent + @moduledoc false @behaviour GenRMQ.Consumer + def start_link(test_pid) do + Agent.start_link(fn -> test_pid end, name: __MODULE__) + end def init() do [ @@ -440,5 +511,13 @@ defmodule TestConsumer do GenRMQ.Consumer.ack(updated_message) end + + def handle_error(message, reason) do + __MODULE__ + |> Agent.get(& &1) + |> send({:task_error, reason}) + + GenRMQ.Consumer.reject(message) + end end end From bc2397aa3ceff51347af11ee9b71a2d049588171 Mon Sep 17 00:00:00 2001 From: Alex Koutmos Date: Sun, 24 May 2020 21:51:00 -0400 Subject: [PATCH 16/18] Fixed flaky tests and added exception handling for non-concurrent message handling --- .../guides/consumer/telemetry_events.md | 10 ++--- lib/consumer.ex | 42 ++++++++++--------- lib/message_task.ex | 12 +++--- test/gen_rmq_consumer_test.exs | 40 +++++++++++++----- test/support/test_consumers.ex | 40 +++++++++--------- 5 files changed, 84 insertions(+), 60 deletions(-) diff --git a/documentation/guides/consumer/telemetry_events.md b/documentation/guides/consumer/telemetry_events.md index 1c6bdb3..ea2d66a 100644 --- a/documentation/guides/consumer/telemetry_events.md +++ b/documentation/guides/consumer/telemetry_events.md @@ -22,6 +22,11 @@ GenRMQ emits [Telemetry][telemetry] events for consumers. It currently exposes t - Measurement: `%{time: System.monotonic_time, duration: native_time}` - Metadata: `%{message: String.t, module: atom}` +- `[:gen_rmq, :consumer, :message, :error]` - Dispatched by a GenRMQ consumer when a message fails to be processed + + - Measurement: `%{time: System.monotonic_time}` + - Metadata: `%{module: atom, reason: tuple}` + - `[:gen_rmq, :consumer, :connection, :start]` - Dispatched by a GenRMQ consumer when a connection to RabbitMQ is started - Measurement: `%{time: System.monotonic_time}` @@ -42,9 +47,4 @@ GenRMQ emits [Telemetry][telemetry] events for consumers. It currently exposes t - Measurement: `%{time: System.monotonic_time}` - Metadata: `%{module: atom, reason: atom}` -- `[:gen_rmq, :consumer, :task, :error]` - Dispatched by a GenRMQ consumer when a supervised Task fails to process a message - - - Measurement: `%{time: System.monotonic_time}` - - Metadata: `%{module: atom, reason: tuple}` - [telemetry]: https://github.com/beam-telemetry/telemetry diff --git a/lib/consumer.ex b/lib/consumer.ex index 1e8f7fd..498562b 100644 --- a/lib/consumer.ex +++ b/lib/consumer.ex @@ -194,7 +194,7 @@ defmodule GenRMQ.Consumer do `reason` - atom denoting the type of error ## Examples: - To reject the message message that caused the Task to fail you can do something like so: + To reject the message that caused the Task to fail you can do something like so: ``` def handle_error(message, reason) do # Do something with message and reject it @@ -332,15 +332,14 @@ defmodule GenRMQ.Consumer do %{module: module, config: config, running_tasks: running_tasks} = state ) do case Map.get(running_tasks, ref) do - %MessageTask{exit_status: exit_status, message: message, timeout_reference: timeout_reference} -> - exit_status = exit_status || :error - Logger.info("[#{module}]: Task failed to handle message. Reason: #{inspect(exit_status)}") + %MessageTask{message: message, timeout_reference: timeout_reference, start_time: start_time} -> + Logger.info("[#{module}]: Task failed to handle message. Reason: #{inspect(reason)}") # Cancel timeout timer, emit telemetry event, and invoke user's `handle_error` callback Process.cancel_timer(timeout_reference) updated_state = %{state | running_tasks: Map.delete(running_tasks, ref)} - emit_task_error_event(module, exit_status) - apply(module, :handle_error, [message, exit_status]) + emit_message_error_event(module, reason, message, start_time) + apply(module, :handle_error, [message, reason]) {:noreply, updated_state} @@ -379,13 +378,10 @@ defmodule GenRMQ.Consumer do def handle_info({:kill, task_reference}, %{running_tasks: running_tasks} = state) when is_reference(task_reference) do # The task has timed out, kill the Task process which will trigger a :DOWN event that # is handled by a previous `handle_info/2` callback - message_task = Map.get(running_tasks, task_reference) - %MessageTask{task: %Task{pid: pid}} = message_task - updated_state = put_in(state, [:running_tasks, task_reference], %{message_task | exit_status: :timeout}) - + %MessageTask{task: %Task{pid: pid}} = Map.get(running_tasks, task_reference) Process.exit(pid, :kill) - {:noreply, updated_state} + {:noreply, state} end @doc false @@ -517,12 +513,18 @@ defmodule GenRMQ.Consumer do defp handle_message(message, %{module: module}) do start_time = System.monotonic_time() - emit_message_start_event(start_time, message, module) - result = apply(module, :handle_message, [message]) - emit_message_stop_event(start_time, message, module) - result + try do + result = apply(module, :handle_message, [message]) + emit_message_stop_event(start_time, message, module) + + result + rescue + reason -> + emit_message_error_event(message, reason, message, start_time) + :error + end end defp handle_reconnect(false, %{module: module} = state) do @@ -644,12 +646,12 @@ defmodule GenRMQ.Consumer do :telemetry.execute([:gen_rmq, :consumer, :message, :stop], measurements, metadata) end - defp emit_task_error_event(module, reason) do - start_time = System.monotonic_time() - measurements = %{time: start_time} - metadata = %{module: module, reason: reason} + defp emit_message_error_event(module, reason, message, start_time) do + stop_time = System.monotonic_time() + measurements = %{time: stop_time, duration: stop_time - start_time} + metadata = %{module: module, reason: reason, message: message} - :telemetry.execute([:gen_rmq, :consumer, :task, :error], measurements, metadata) + :telemetry.execute([:gen_rmq, :consumer, :message, :error], measurements, metadata) end defp emit_connection_down_event(module, reason) do diff --git a/lib/message_task.ex b/lib/message_task.ex index 4347917..d166a1a 100644 --- a/lib/message_task.ex +++ b/lib/message_task.ex @@ -7,19 +7,21 @@ defmodule GenRMQ.MessageTask do * `:task` - the Task struct executing the user's `handle_message` callback * `:timeout_reference` - the reference to the timeout timer * `:message` - the GenRMQ.Message struct that is being processed - * `:exit_status` - the exist status of the Task + * `:start_time` - the monotonic time that the task was started """ - @enforce_keys [:task, :timeout_reference, :message, :exit_status] - defstruct [:task, :timeout_reference, :message, :exit_status] + alias __MODULE__ + + @enforce_keys [:task, :timeout_reference, :message, :start_time] + defstruct [:task, :timeout_reference, :message, :start_time] @doc false def create(task, timeout_reference, message) do - %__MODULE__{ + %MessageTask{ task: task, timeout_reference: timeout_reference, message: message, - exit_status: nil + start_time: System.monotonic_time() } end end diff --git a/test/gen_rmq_consumer_test.exs b/test/gen_rmq_consumer_test.exs index 4b2c2d6..ef9e942 100644 --- a/test/gen_rmq_consumer_test.exs +++ b/test/gen_rmq_consumer_test.exs @@ -119,8 +119,19 @@ defmodule GenRMQ.ConsumerTest do assert queue_count(context[:rabbit_conn], state[:config][:queue].name) == {:ok, 0} end) - assert_receive {:telemetry_event, [:gen_rmq, :consumer, :task, :error], %{time: _}, %{reason: :error, module: _}} - assert_receive {:task_error, :error} + assert_receive {:telemetry_event, [:gen_rmq, :consumer, :message, :error], %{time: _, duration: _}, + %{reason: {%RuntimeError{message: "Can't divide by zero!"}, _}, module: _, message: _}} + + assert_receive {:task_error, {%RuntimeError{message: "Can't divide by zero!"}, _}} + + # Check that the message made it to the deadletter queue + dl_queue = state.config[:queue][:dead_letter][:name] + + Assert.repeatedly(fn -> + assert Process.alive?(consumer_pid) == true + assert queue_count(context[:rabbit_conn], dl_queue) == {:ok, 1} + {:ok, _, _} = get_message_from_queue(context[:rabbit_conn], dl_queue) + end) end test "should not invoke the user's handle_error callback if an error does not occur", @@ -137,7 +148,7 @@ defmodule GenRMQ.ConsumerTest do assert queue_count(context[:rabbit_conn], state[:config][:queue].name) == {:ok, 0} end) - refute_receive {:telemetry_event, [:gen_rmq, :consumer, :task, :error], %{time: _}, %{reason: _, module: _}} + refute_receive {:telemetry_event, [:gen_rmq, :consumer, :message, :error], %{time: _}, %{reason: _, module: _}} refute_receive {:task_error, :error} end end @@ -164,7 +175,7 @@ defmodule GenRMQ.ConsumerTest do GenServer.stop(consumer_pid) - refute_receive {:telemetry_event, [:gen_rmq, :consumer, :task, :error], %{time: _}, %{reason: _, module: _}} + refute_receive {:telemetry_event, [:gen_rmq, :consumer, :message, :error], %{time: _}, %{reason: _, module: _}} assert_receive( {:telemetry_event, [:gen_rmq, :consumer, :message, :start], %{time: _}, %{message: _, module: _}}, @@ -182,7 +193,7 @@ defmodule GenRMQ.ConsumerTest do test "should error out the task if it takes too long", %{consumer: consumer_pid, state: state} = context do # Time to sleep consumer - message = %{"value" => 1_250} + message = %{"value" => 500} publish_message(context[:rabbit_conn], context[:exchange], Jason.encode!(message)) @@ -195,13 +206,22 @@ defmodule GenRMQ.ConsumerTest do refute_receive( {:telemetry_event, [:gen_rmq, :consumer, :message, :stop], %{time: _, duration: _}, %{message: _, module: _}}, - 2_000 + 500 ) - assert_receive {:telemetry_event, [:gen_rmq, :consumer, :task, :error], %{time: _}, - %{reason: :timeout, module: _}} + assert_receive {:telemetry_event, [:gen_rmq, :consumer, :message, :error], %{time: _, duration: _}, + %{reason: :killed, module: _, message: _}} + + assert_receive {:task_error, :killed} + + # Check that the message made it to the deadletter queue + dl_queue = state.config[:queue][:dead_letter][:name] - assert_receive {:task_error, :timeout} + Assert.repeatedly(fn -> + assert Process.alive?(consumer_pid) == true + assert queue_count(context[:rabbit_conn], dl_queue) == {:ok, 1} + {:ok, _, _} = get_message_from_queue(context[:rabbit_conn], dl_queue) + end) end end @@ -514,7 +534,7 @@ defmodule GenRMQ.ConsumerTest do [:gen_rmq, :consumer, :message, :stop], [:gen_rmq, :consumer, :connection, :start], [:gen_rmq, :consumer, :connection, :stop], - [:gen_rmq, :consumer, :task, :error] + [:gen_rmq, :consumer, :message, :error] ], fn name, measurements, metadata, _ -> send(self, {:telemetry_event, name, measurements, metadata}) diff --git a/test/support/test_consumers.ex b/test/support/test_consumers.ex index 0a95a4c..752be7a 100644 --- a/test/support/test_consumers.ex +++ b/test/support/test_consumers.ex @@ -10,7 +10,7 @@ defmodule TestConsumer do routing_key: "#", prefetch_count: "10", connection: "amqp://guest:guest@localhost:5672", - queue_ttl: 1000 + queue_ttl: 1_000 ] end @@ -45,7 +45,7 @@ defmodule TestConsumer do prefetch_count: "10", connection: "amqp://guest:guest@localhost:5672", concurrency: false, - queue_ttl: 1000 + queue_ttl: 1_000 ] end @@ -78,7 +78,7 @@ defmodule TestConsumer do prefetch_count: "10", connection: "amqp://guest:guest@localhost:5672", reconnect: false, - queue_ttl: 1000 + queue_ttl: 1_000 ] end @@ -105,7 +105,7 @@ defmodule TestConsumer do routing_key: "#", prefetch_count: "10", connection: "amqp://guest:guest@localhost:5672", - queue_ttl: 1000, + queue_ttl: 1_000, deadletter: false ] end @@ -133,7 +133,7 @@ defmodule TestConsumer do queue_options: [ durable: false, arguments: [ - {"x-expires", :long, 1000} + {"x-expires", :long, 1_000} ] ], exchange: "gen_rmq_in_exchange_queue_options", @@ -144,7 +144,7 @@ defmodule TestConsumer do deadletter_queue_options: [ durable: false, arguments: [ - {"x-expires", :long, 1000} + {"x-expires", :long, 1_000} ] ], deadletter_exchange: "dl_exchange_options", @@ -182,7 +182,7 @@ defmodule TestConsumer do routing_key: "#", prefetch_count: "10", connection: "amqp://guest:guest@localhost:5672", - queue_ttl: 1000, + queue_ttl: 1_000, deadletter_queue: "dl_queue", deadletter_exchange: "dl_exchange", deadletter_routing_key: "dl_routing_key" @@ -213,7 +213,7 @@ defmodule TestConsumer do routing_key: "#", prefetch_count: "10", connection: "amqp://guest:guest@localhost:5672", - queue_ttl: 1000, + queue_ttl: 1_000, queue_max_priority: 100 ] end @@ -244,7 +244,7 @@ defmodule TestConsumer do routing_key: "#", prefetch_count: "10", connection: "amqp://guest:guest@localhost:5672", - queue_ttl: 1000 + queue_ttl: 1_000 ] end @@ -278,7 +278,7 @@ defmodule TestConsumer do routing_key: "#", prefetch_count: "10", connection: "amqp://guest:guest@localhost:5672", - queue_ttl: 1000 + queue_ttl: 1_000 ] end @@ -312,7 +312,7 @@ defmodule TestConsumer do routing_key: "#", prefetch_count: "10", connection: "amqp://guest:guest@localhost:5672", - queue_ttl: 1000 + queue_ttl: 1_000 ] end @@ -346,7 +346,7 @@ defmodule TestConsumer do routing_key: ["routing_key_1", "routing_key_2"], prefetch_count: "10", connection: "amqp://guest:guest@localhost:5672", - queue_ttl: 1000 + queue_ttl: 1_000 ] end @@ -381,7 +381,7 @@ defmodule TestConsumer do exchange: existing_exchange(), prefetch_count: "10", uri: "amqp://guest:guest@localhost:5672", - queue_ttl: 1000 + queue_ttl: 1_000 ] end @@ -443,12 +443,12 @@ defmodule TestConsumer do def init() do [ - queue: "gen_rmq_in_queue", - exchange: "gen_rmq_in_exchange", + queue: "gen_rmq_error_in_consume_queue", + exchange: "gen_rmq_error_in_consume_exchange", routing_key: "#", prefetch_count: "10", connection: "amqp://guest:guest@localhost:5672", - queue_ttl: 1000 + queue_ttl: 1_000 ] end @@ -487,13 +487,13 @@ defmodule TestConsumer do def init() do [ - queue: "gen_rmq_in_queue", - exchange: "gen_rmq_in_exchange", + queue: "slow_consumer_queue", + exchange: "slow_consumer_exchange", routing_key: "#", prefetch_count: "10", connection: "amqp://guest:guest@localhost:5672", - queue_ttl: 1000, - handle_message_timeout: 1_000 + queue_ttl: 1_000, + handle_message_timeout: 250 ] end From e14bfdef2b4cb55a6830f1e315578e82ab14c455 Mon Sep 17 00:00:00 2001 From: Alex Koutmos Date: Sun, 24 May 2020 23:42:35 -0400 Subject: [PATCH 17/18] Added test for handle_error during syncronoush handle_message --- lib/consumer.ex | 3 +- test/gen_rmq_consumer_test.exs | 55 +++++++++++++++++++++++++++++++--- test/support/test_consumers.ex | 46 ++++++++++++++++++++++++++++ 3 files changed, 99 insertions(+), 5 deletions(-) diff --git a/lib/consumer.ex b/lib/consumer.ex index 498562b..6ccbf20 100644 --- a/lib/consumer.ex +++ b/lib/consumer.ex @@ -522,7 +522,8 @@ defmodule GenRMQ.Consumer do result rescue reason -> - emit_message_error_event(message, reason, message, start_time) + emit_message_error_event(module, reason, message, start_time) + apply(module, :handle_error, [message, reason]) :error end end diff --git a/test/gen_rmq_consumer_test.exs b/test/gen_rmq_consumer_test.exs index ef9e942..3456a0b 100644 --- a/test/gen_rmq_consumer_test.exs +++ b/test/gen_rmq_consumer_test.exs @@ -10,6 +10,7 @@ defmodule GenRMQ.ConsumerTest do alias TestConsumer.{ Default, ErrorInConsumer, + ErrorWithoutConcurrency, RedeclaringExistingExchange, SlowConsumer, WithCustomDeadletter, @@ -120,7 +121,11 @@ defmodule GenRMQ.ConsumerTest do end) assert_receive {:telemetry_event, [:gen_rmq, :consumer, :message, :error], %{time: _, duration: _}, - %{reason: {%RuntimeError{message: "Can't divide by zero!"}, _}, module: _, message: _}} + %{ + reason: {%RuntimeError{message: "Can't divide by zero!"}, _}, + module: ErrorInConsumer, + message: _ + }} assert_receive {:task_error, {%RuntimeError{message: "Can't divide by zero!"}, _}} @@ -210,7 +215,7 @@ defmodule GenRMQ.ConsumerTest do ) assert_receive {:telemetry_event, [:gen_rmq, :consumer, :message, :error], %{time: _, duration: _}, - %{reason: :killed, module: _, message: _}} + %{reason: :killed, module: SlowConsumer, message: _}} assert_receive {:task_error, :killed} @@ -242,6 +247,48 @@ defmodule GenRMQ.ConsumerTest do end end + describe "TestConsumer.ErrorWithoutConcurrency" do + setup :attach_telemetry_handlers + + setup do + ErrorWithoutConcurrency.start_link(self()) + with_test_consumer(ErrorWithoutConcurrency) + end + + test "should receive invoke the handle_error callback if an error is encountered with no concurrency", + %{consumer: consumer_pid, state: state} = context do + clear_mailbox() + + # Pass in a value that will raise + message = %{"value" => 0} + + publish_message(context[:rabbit_conn], context[:exchange], Jason.encode!(message)) + + Assert.repeatedly(fn -> + assert Process.alive?(consumer_pid) == true + assert queue_count(context[:rabbit_conn], state[:config][:queue].name) == {:ok, 0} + end) + + assert_receive {:telemetry_event, [:gen_rmq, :consumer, :message, :error], %{time: _, duration: _}, + %{ + reason: %RuntimeError{message: "Can't divide by zero!"}, + module: ErrorWithoutConcurrency, + message: _ + }} + + assert_receive {:synchronous_error, %RuntimeError{message: "Can't divide by zero!"}} + + # Check that the message made it to the deadletter queue + dl_queue = state.config[:queue][:dead_letter][:name] + + Assert.repeatedly(fn -> + assert Process.alive?(consumer_pid) == true + assert queue_count(context[:rabbit_conn], dl_queue) == {:ok, 1} + {:ok, _, _} = get_message_from_queue(context[:rabbit_conn], dl_queue) + end) + end + end + describe "TestConsumer.WithoutReconnection" do setup do with_test_consumer(WithoutReconnection) @@ -532,9 +579,9 @@ defmodule GenRMQ.ConsumerTest do [ [:gen_rmq, :consumer, :message, :start], [:gen_rmq, :consumer, :message, :stop], + [:gen_rmq, :consumer, :message, :error], [:gen_rmq, :consumer, :connection, :start], - [:gen_rmq, :consumer, :connection, :stop], - [:gen_rmq, :consumer, :message, :error] + [:gen_rmq, :consumer, :connection, :stop] ], fn name, measurements, metadata, _ -> send(self, {:telemetry_event, name, measurements, metadata}) diff --git a/test/support/test_consumers.ex b/test/support/test_consumers.ex index 752be7a..f2a6755 100644 --- a/test/support/test_consumers.ex +++ b/test/support/test_consumers.ex @@ -66,6 +66,52 @@ defmodule TestConsumer do end end + defmodule ErrorWithoutConcurrency do + use Agent + + @moduledoc false + @behaviour GenRMQ.Consumer + + def start_link(test_pid) do + Agent.start_link(fn -> test_pid end, name: __MODULE__) + end + + def init() do + [ + queue: "error_no_concurrency_queue", + exchange: "error_no_concurrency_exchange", + routing_key: "#", + prefetch_count: "10", + connection: "amqp://guest:guest@localhost:5672", + concurrency: false, + queue_ttl: 1_000 + ] + end + + def consumer_tag() do + "TestConsumer.ErrorWithoutConcurrency" + end + + def handle_message(message) do + %{"value" => value} = Jason.decode!(message.payload) + + if value == 0, do: raise("Can't divide by zero!") + + result = Float.to_string(1 / value) + updated_message = Map.put(message, :payload, result) + + GenRMQ.Consumer.ack(updated_message) + end + + def handle_error(message, reason) do + __MODULE__ + |> Agent.get(& &1) + |> send({:synchronous_error, reason}) + + GenRMQ.Consumer.reject(message) + end + end + defmodule WithoutReconnection do @moduledoc false @behaviour GenRMQ.Consumer From 72d45ee8903f9fc70993a08718eed4a30fdd7d05 Mon Sep 17 00:00:00 2001 From: Alex Koutmos Date: Mon, 25 May 2020 13:52:51 -0400 Subject: [PATCH 18/18] Enhanced docs and tweaked synchronous handle_message error case --- .../guides/consumer/telemetry_events.md | 4 +-- lib/consumer.ex | 26 +++++++++++++++---- test/gen_rmq_consumer_test.exs | 8 +++--- 3 files changed, 27 insertions(+), 11 deletions(-) diff --git a/documentation/guides/consumer/telemetry_events.md b/documentation/guides/consumer/telemetry_events.md index ea2d66a..619eb56 100644 --- a/documentation/guides/consumer/telemetry_events.md +++ b/documentation/guides/consumer/telemetry_events.md @@ -24,8 +24,8 @@ GenRMQ emits [Telemetry][telemetry] events for consumers. It currently exposes t - `[:gen_rmq, :consumer, :message, :error]` - Dispatched by a GenRMQ consumer when a message fails to be processed - - Measurement: `%{time: System.monotonic_time}` - - Metadata: `%{module: atom, reason: tuple}` + - Measurement: `%{time: System.monotonic_time, duration: native_time}` + - Metadata: `%{module: atom, reason: tuple, message: GenRMQ.Message.t}` - `[:gen_rmq, :consumer, :connection, :start]` - Dispatched by a GenRMQ consumer when a connection to RabbitMQ is started diff --git a/lib/consumer.ex b/lib/consumer.ex index 6ccbf20..320cf64 100644 --- a/lib/consumer.ex +++ b/lib/consumer.ex @@ -191,7 +191,7 @@ defmodule GenRMQ.Consumer do Invoked when an error or timeout is encountered while executing `handle_message` callback `message` - `GenRMQ.Message` struct - `reason` - atom denoting the type of error + `reason` - the information regarding the error ## Examples: To reject the message that caused the Task to fail you can do something like so: @@ -202,9 +202,24 @@ defmodule GenRMQ.Consumer do GenRMQ.Consumer.reject(message) end + ``` + + The `reason` argument will either be the atom `:killed` if the Task timed out and needed + to be stopped. Or it will be a 2 elementr tuple where the first element is the error stuct + and the second element is the stacktrace: - The `reason` argument will either be `:timeout` or `:error` if the message processing task - timed out or encountered and error respectively. + ``` + { + %RuntimeError{message: "Can't divide by zero!"}, + [ + {TestConsumer.ErrorWithoutConcurrency, :handle_message, 1, [file: 'test/support/test_consumers.ex', line: 98]}, + {GenRMQ.Consumer, :handle_message, 2, [file: 'lib/consumer.ex', line: 519]}, + {GenRMQ.Consumer, :handle_info, 2, [file: 'lib/consumer.ex', line: 424]}, + {:gen_server, :try_dispatch, 4, [file: 'gen_server.erl', line: 637]}, + {:gen_server, :handle_msg, 6, [file: 'gen_server.erl', line: 711]}, + {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 249]} + ] + } ``` """ @callback handle_error(message :: %GenRMQ.Message{}, reason :: atom()) :: :ok @@ -522,8 +537,9 @@ defmodule GenRMQ.Consumer do result rescue reason -> - emit_message_error_event(module, reason, message, start_time) - apply(module, :handle_error, [message, reason]) + full_error = {reason, __STACKTRACE__} + emit_message_error_event(module, full_error, message, start_time) + apply(module, :handle_error, [message, full_error]) :error end end diff --git a/test/gen_rmq_consumer_test.exs b/test/gen_rmq_consumer_test.exs index 3456a0b..3321f5b 100644 --- a/test/gen_rmq_consumer_test.exs +++ b/test/gen_rmq_consumer_test.exs @@ -122,12 +122,12 @@ defmodule GenRMQ.ConsumerTest do assert_receive {:telemetry_event, [:gen_rmq, :consumer, :message, :error], %{time: _, duration: _}, %{ - reason: {%RuntimeError{message: "Can't divide by zero!"}, _}, + reason: {%RuntimeError{message: "Can't divide by zero!"}, _stacktrace}, module: ErrorInConsumer, message: _ }} - assert_receive {:task_error, {%RuntimeError{message: "Can't divide by zero!"}, _}} + assert_receive {:task_error, {%RuntimeError{message: "Can't divide by zero!"}, _stacktrace}} # Check that the message made it to the deadletter queue dl_queue = state.config[:queue][:dead_letter][:name] @@ -271,12 +271,12 @@ defmodule GenRMQ.ConsumerTest do assert_receive {:telemetry_event, [:gen_rmq, :consumer, :message, :error], %{time: _, duration: _}, %{ - reason: %RuntimeError{message: "Can't divide by zero!"}, + reason: {%RuntimeError{message: "Can't divide by zero!"}, _stacktrace}, module: ErrorWithoutConcurrency, message: _ }} - assert_receive {:synchronous_error, %RuntimeError{message: "Can't divide by zero!"}} + assert_receive {:synchronous_error, {%RuntimeError{message: "Can't divide by zero!"}, _stacktrace}} # Check that the message made it to the deadletter queue dl_queue = state.config[:queue][:dead_letter][:name]