This repository has been archived by the owner on Nov 27, 2023. It is now read-only.

Task Supervisor for message consumers #179

Merged 19 commits on May 25, 2020
Changes from 14 commits
2 changes: 1 addition & 1 deletion .travis.yml
@@ -3,9 +3,9 @@ elixir:
- 1.7
- 1.8
- 1.9
- 1.10

otp_release:
- 20.0
- 21.0
mkorszun marked this conversation as resolved.

sudo: required
18 changes: 7 additions & 11 deletions README.md
@@ -75,7 +75,7 @@ This will result in:
- durable `gen_rmq_in_queue_error` queue created or redeclared. It will be bound to `gen_rmq_exchange.deadletter`
- durable topic `gen_rmq_exchange` exchange created or redeclared
- durable `gen_rmq_in_queue` queue created or redeclared. It will be bound to `gen_rmq_exchange` exchange and has a deadletter exchange set to `gen_rmq_exchange.deadletter`
- every `handle_message` callback will executed in separate process. This can be disabled by setting `concurrency: false` in `init` callback
- every `handle_message` callback will be executed in a separate supervised Task. This can be disabled by setting `concurrency: false` in `init` callback
- on failed rabbitmq connection it will wait for a bit and then reconnect

There are many options to control the consumer setup details, please check the `c:GenRMQ.Consumer.init/0` [docs][consumer_doc] for all available settings.
@@ -138,22 +138,22 @@ Are you using GenRMQ in Production? Please let us know, we are curious to learn

## Maintainers

* Mateusz ([@mkorszun](https://github.com/mkorszun))
- Mateusz ([@mkorszun](https://github.com/mkorszun))

The maintainers are responsible for the general project oversight, and empowering further trusted committers (see below).

The maintainers are the ones that create new releases of GenRMQ.

## Trusted Committers

* Joel ([@vorce](https://github.com/vorce))
* Sebastian ([@spier](https://github.com/spier))
* [@Shemeikka](https://github.com/Shemeikka)
* Alexander ([@akoutmos](https://github.com/akoutmos))
- Joel ([@vorce](https://github.com/vorce))
- Sebastian ([@spier](https://github.com/spier))
- [@Shemeikka](https://github.com/Shemeikka)
- Alexander ([@akoutmos](https://github.com/akoutmos))

Trusted Committers are members of our community who we have explicitly added to our GitHub repository. Trusted Committers have elevated rights, allowing them to send in changes directly to branches and to approve Pull Requests. For details see [TRUSTED-COMMITTERS.md][trusted_commiters].

*Note:* Maintainers and Trusted Committers are listed in [.github/CODEOWNERS][code_owners] in order to automatically assign PR reviews to them.
_Note:_ Maintainers and Trusted Committers are listed in [.github/CODEOWNERS][code_owners] in order to automatically assign PR reviews to them.

## License

@@ -170,23 +170,19 @@ Copyright (c) 2018 - 2020 Meltwater Inc. [underthehood.meltwater.com][undertheho
[gen_rmq_issues]: https://github.com/meltwater/gen_rmq/issues
[priority_queues]: https://www.rabbitmq.com/priority.html
[underthehood]: http://underthehood.meltwater.com/

[examples]: https://github.com/meltwater/gen_rmq/blob/master/documentation/examples
[example_consumer]: https://github.com/meltwater/gen_rmq/blob/master/documentation/examples/consumer.ex
[example_publisher]: https://github.com/meltwater/gen_rmq/blob/master/documentation/examples/publisher.ex
[example_processor]: https://github.com/meltwater/gen_rmq/blob/master/documentation/examples/processor.ex
[example_rabbit_case]: https://github.com/meltwater/gen_rmq/blob/master/test/gen_rmq_publisher_test.exs

[guide_consumer_basic_setup]: https://github.com/meltwater/gen_rmq/blob/master/documentation/guides/consumer/basic_setup.md
[guide_consumer_with_custom_deadletter_configuration]: https://github.com/meltwater/gen_rmq/blob/master/documentation/guides/consumer/with_custom_deadletter_configuration.md
[guide_consumer_with_custom_exchange_type]: https://github.com/meltwater/gen_rmq/blob/master/documentation/guides/consumer/with_custom_exchange_type.md
[guide_consumer_with_custom_queue_configuration]: https://github.com/meltwater/gen_rmq/blob/master/documentation/guides/consumer/with_custom_queue_configuration.md
[without_deadletter_configuration]: https://github.com/meltwater/gen_rmq/blob/master/documentation/guides/consumer/without_deadletter_configuration.md
[with_quorum_queue_type]: https://github.com/meltwater/gen_rmq/blob/master/documentation/guides/consumer/with_quorum_queue_type.md

[consumer_telemetry_events]: https://github.com/meltwater/gen_rmq/blob/master/documentation/guides/consumer/telemetry_events.md
[publisher_telemetry_events]: https://github.com/meltwater/gen_rmq/blob/master/documentation/guides/publisher/telemetry_events.md

[trusted_commiters]: https://github.com/meltwater/gen_rmq/blob/master/TRUSTED-COMMITTERS.md
[code_owners]: https://github.com/meltwater/gen_rmq/blob/master/.github/CODEOWNERS
[license]: https://github.com/meltwater/gen_rmq/blob/master/LICENSE
5 changes: 5 additions & 0 deletions documentation/guides/consumer/telemetry_events.md
@@ -42,4 +42,9 @@ GenRMQ emits [Telemetry][telemetry] events for consumers. It currently exposes t
- Measurement: `%{time: System.monotonic_time}`
- Metadata: `%{module: atom, reason: atom}`

- `[:gen_rmq, :consumer, :task, :error]` - Dispatched by a GenRMQ consumer when a supervised Task fails to process a message
- Measurement: `%{time: System.monotonic_time}`
- Metadata: `%{module: atom, reason: tuple}`

Collaborator:
👍

[telemetry]: https://github.com/beam-telemetry/telemetry
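
An application could subscribe to the new `[:gen_rmq, :consumer, :task, :error]` event roughly as follows. This is a minimal sketch, not code from this pull request: the module name `MyApp.TaskErrorHandler` and the handler id are hypothetical, and only the event name and the measurement and metadata shapes are taken from the guide above. It assumes the `:telemetry` package is available and uses its `:telemetry.attach/4` function.

```elixir
defmodule MyApp.TaskErrorHandler do
  # Hypothetical module for illustration; not part of GenRMQ.
  require Logger

  @event [:gen_rmq, :consumer, :task, :error]

  # Attach once, for example from the application's start/2 callback.
  def attach do
    :telemetry.attach("my-app-gen-rmq-task-error", @event, &__MODULE__.handle_event/4, nil)
  end

  # Telemetry calls handlers with (event_name, measurements, metadata, config).
  def handle_event(@event, %{time: time}, %{module: module, reason: reason}, _config) do
    Logger.error("[#{inspect(module)}]: task failed at #{time}. Reason: #{inspect(reason)}")
  end
end
```

The handler is passed as a remote capture (`&__MODULE__.handle_event/4`) rather than an anonymous function, which is the form the Telemetry documentation recommends for performance.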
149 changes: 111 additions & 38 deletions lib/consumer.ex
@@ -59,12 +59,16 @@ defmodule GenRMQ.Consumer do
`queue_max_priority` - defines if a declared queue should be a priority queue.
Should be set to a value from `1..255` range. If it is greater than `255`, queue
max priority will be set to `255`. Values between `1` and `10` are
[recommened](https://www.rabbitmq.com/priority.html#resource-usage).
[recommended](https://www.rabbitmq.com/priority.html#resource-usage).
spier marked this conversation as resolved.

`concurrency` - defines if `handle_message` callback is called
in seperate process using [spawn](https://hexdocs.pm/elixir/Process.html#spawn/2)
in separate process using [spawn](https://hexdocs.pm/elixir/Process.html#spawn/2)
function. By default concurrency is enabled. To disable, set it to `false`

`terminate_timeout` - defines how long the consumer will wait for in-flight Tasks to
complete before terminating the process. The value is in milliseconds and the default
is 5_000 milliseconds.

`retry_delay_function` - custom retry delay function. Called when the connection to
the broker cannot be established. Receives the connection attempt as an argument (>= 1)
and is expected to wait for some time.
@@ -107,7 +111,8 @@ defmodule GenRMQ.Consumer do
prefetch_count: "10",
uri: "amqp://guest:guest@localhost:5672",
concurrency: true,
queue_ttl: 5000,
terminate_timeout: 5_000,
queue_ttl: 5_000,
retry_delay_function: fn attempt -> :timer.sleep(1000 * attempt) end,
reconnect: true,
deadletter: true,
@@ -134,6 +139,7 @@ defmodule GenRMQ.Consumer do
prefetch_count: String.t(),
uri: String.t(),
concurrency: boolean,
terminate_timeout: integer,
queue_ttl: integer,
retry_delay_function: function,
reconnect: boolean,
@@ -257,15 +263,29 @@ defmodule GenRMQ.Consumer do
Process.flag(:trap_exit, true)
config = apply(module, :init, [])
parsed_config = parse_config(config)
terminate_timeout = Keyword.get(parsed_config, :terminate_timeout, 5_000)

state =
initial_state
|> Map.put(:config, parsed_config)
|> Map.put(:reconnect_attempt, 0)
|> Map.put(:running_tasks, %{})
|> Map.put(:terminate_timeout, terminate_timeout)

{:ok, state, {:continue, :init}}
end

send(self(), :init)
@doc false
@impl GenServer
def handle_continue(:init, state) do

Collaborator:
Lovely!

state =
state
|> get_connection()
|> open_channels()
|> setup_consumer()
|> setup_task_supervisor()

{:ok, state}
{:noreply, state}
end

@doc false
@@ -276,26 +296,42 @@ defmodule GenRMQ.Consumer do

@doc false
@impl GenServer
def handle_info(:init, state) do
state =
state
|> get_connection()
|> open_channels()
|> setup_consumer()
def handle_info(
{:DOWN, ref, :process, _pid, reason},
%{module: module, config: config, running_tasks: running_tasks} = state
) do
if Map.has_key?(running_tasks, ref) do

Collaborator:
What happens in this case to the message? Will it be rejected/nacked?

Collaborator Author:
Good call....we would probably want some sort of configuration so the user can decide what happens to the message if the task fails. Perhaps we also provide the option to send the message to the deadletter exchange. Also on that same note, we may also want some configuration around how long the Tasks take to complete. Currently it is set to the async_nolink value of 5 seconds. Thoughts?

Collaborator:
Maybe you should be able to set a callback to deal with the message? But I think the sensible default here would be to nack the message.

Collaborator:
I like the callback, smth like: handle_error(reason, state). Then in this callback user has all the power to ack, reject or requeue.

Collaborator:
btw, are we capturing here also task timeouts?

Collaborator Author (@akoutmos, May 20, 2020):
My latest commit adds the ability to configure a timeout for tasks. It was a little tricky to add that in given that Task.Supervisor does not provide that functionality for async_nolink, but I think I have a good implementation in place to handle this functionality. I misspoke earlier in regards to the 5 second timeout...that is for async_stream and async_stream_nolink.

As for a custom callback to handle the failure, are you thinking about rewriting the consumer module to a macro so that we can leverage defoverridable if the user does not want to use the default error handler? Or would handle_error be a required behaviour callback that needs to be implemented by the user?

Collaborator:
@akoutmos I would prefer to avoid rewriting consumer module to a macro at this stage (just to limit the scope of changes).

Since these changes will be released as a major version, requiring error callback to be implemented should be fine, right? @vorce what do you think?

We could also consider skipping error callback and just reject the messages. The problem here is that users might skip dead-letter configuration for their consumers.

Collaborator (@vorce, May 20, 2020):
@mkorszun @akoutmos Hm hm. Yeah I think making each consumer implement a callback is OK. At least then we encourage users to think about this case. We can have a good implementation in the example consumer that logs and nacks the message.

Collaborator Author (@akoutmos, May 20, 2020):
Sounds good. I'll add an additional callback handle_error that will be triggered on task exception or task timeout along with an example or 2. I agree that this is probably the way to go so that gen_rmq does not impose any assumptions upon the user. They can deal with the error the way that best fits their needs.

Collaborator:
Awesome, and thanks again for all the work @akoutmos

Logger.info("[#{module}]: Task failed to handle message. Reason: #{inspect(reason)}")

{:noreply, state}
emit_task_error_event(module, reason)

updated_state = %{state | running_tasks: Map.delete(running_tasks, ref)}

{:noreply, updated_state}
else
Logger.info("[#{module}]: RabbitMQ connection is down! Reason: #{inspect(reason)}")

emit_connection_down_event(module, reason)

config
|> Keyword.get(:reconnect, true)
|> handle_reconnect(state)
end
end

@doc false
@impl GenServer
def handle_info({:DOWN, _ref, :process, _pid, reason}, %{module: module, config: config} = state) do
Logger.info("[#{module}]: RabbitMQ connection is down! Reason: #{inspect(reason)}")
def handle_info({ref, _task_result}, %{running_tasks: running_tasks} = state) when is_reference(ref) do
Process.demonitor(ref, [:flush])

emit_connection_down_event(module, reason)
updated_state =
if Map.has_key?(running_tasks, ref) do
%{state | running_tasks: Map.delete(running_tasks, ref)}
else
state
end

config
|> Keyword.get(:reconnect, true)
|> handle_reconnect(state)
{:noreply, updated_state}
end

@doc false
@@ -321,29 +357,37 @@ defmodule GenRMQ.Consumer do

@doc false
@impl GenServer
def handle_info({:basic_deliver, payload, attributes}, %{module: module, config: config} = state) do
def handle_info({:basic_deliver, payload, attributes}, %{module: module, running_tasks: running_tasks} = state) do
%{delivery_tag: tag, routing_key: routing_key, redelivered: redelivered} = attributes
Logger.debug("[#{module}]: Received message. Tag: #{tag}, routing key: #{routing_key}, redelivered: #{redelivered}")

if redelivered do
Logger.debug("[#{module}]: Redelivered payload for message. Tag: #{tag}, payload: #{payload}")
end

handle_message(payload, attributes, state, Keyword.get(config, :concurrency, true))
updated_state =
case handle_message(payload, attributes, state) do
%Task{ref: ref} = task -> %{state | running_tasks: Map.put(running_tasks, ref, task)}
_ -> state
end

{:noreply, state}
{:noreply, updated_state}
end

@doc false
@impl GenServer
def terminate(:connection_closed = reason, %{module: module}) do
def terminate(:connection_closed = reason, %{module: module} = state) do
await_running_tasks(state)

# Since connection has been closed no need to clean it up
Logger.debug("[#{module}]: Terminating consumer, reason: #{inspect(reason)}")
end

@doc false
@impl GenServer
def terminate(reason, %{module: module, conn: conn, in: in_chan, out: out_chan}) do
def terminate(reason, %{module: module, conn: conn, in: in_chan, out: out_chan} = state) do
await_running_tasks(state)

Logger.debug("[#{module}]: Terminating consumer, reason: #{inspect(reason)}")
Channel.close(in_chan)
Channel.close(out_chan)
@@ -352,20 +396,30 @@ defmodule GenRMQ.Consumer do

@doc false
@impl GenServer
def terminate({{:shutdown, {:server_initiated_close, error_code, reason}}, _}, %{module: module}) do
def terminate({{:shutdown, {:server_initiated_close, error_code, reason}}, _}, %{module: module} = state) do
await_running_tasks(state)

Logger.error("[#{module}]: Terminating consumer, error_code: #{inspect(error_code)}, reason: #{inspect(reason)}")
end

@doc false
@impl GenServer
def terminate(reason, %{module: module}) do
def terminate(reason, %{module: module} = state) do
await_running_tasks(state)

Logger.error("[#{module}]: Terminating consumer, unexpected reason: #{inspect(reason)}")
end

##############################################################################
# Helpers
##############################################################################

defp await_running_tasks(%{running_tasks: running_tasks, terminate_timeout: terminate_timeout}) do
running_tasks
|> Map.values()
|> Task.yield_many(terminate_timeout)
end

defp parse_config(config) do
queue_name = Keyword.fetch!(config, :queue)

@@ -374,19 +428,9 @@ defmodule GenRMQ.Consumer do
|> Keyword.put(:connection, Keyword.get(config, :connection, config[:uri]))
end

defp handle_message(payload, attributes, %{module: module} = state, false) do
start_time = System.monotonic_time()
message = Message.create(attributes, payload, state)

emit_message_start_event(start_time, message, module)
result = apply(module, :handle_message, [message])
emit_message_stop_event(start_time, message, module)

result
end

defp handle_message(payload, attributes, %{module: module} = state, true) do
spawn(fn ->
defp handle_message(payload, attributes, %{module: module, task_supervisor: task_supervisor_pid} = state)
when is_pid(task_supervisor_pid) do
Task.Supervisor.async_nolink(task_supervisor_pid, fn ->
start_time = System.monotonic_time()
message = Message.create(attributes, payload, state)

@@ -398,6 +442,17 @@ defmodule GenRMQ.Consumer do
end)
end

defp handle_message(payload, attributes, %{module: module} = state) do
start_time = System.monotonic_time()
message = Message.create(attributes, payload, state)

emit_message_start_event(start_time, message, module)
result = apply(module, :handle_message, [message])
emit_message_stop_event(start_time, message, module)

result
end

defp handle_reconnect(false, %{module: module} = state) do
Logger.info("[#{module}]: Reconnection is disabled. Terminating consumer.")
{:stop, :connection_closed, state}
@@ -452,6 +507,16 @@ defmodule GenRMQ.Consumer do
Map.merge(state, %{in: chan, out: out_chan})
end

defp setup_task_supervisor(%{config: config} = state) do
if Keyword.get(config, :concurrency, true) do
{:ok, pid} = Task.Supervisor.start_link()

Map.put(state, :task_supervisor, pid)
else
Map.put(state, :task_supervisor, nil)
end
end

defp setup_consumer(%{in: chan, config: config, module: module} = state) do
queue_config = config[:queue]
prefetch_count = String.to_integer(config[:prefetch_count])
@@ -507,6 +572,14 @@ defmodule GenRMQ.Consumer do
:telemetry.execute([:gen_rmq, :consumer, :message, :stop], measurements, metadata)
end

defp emit_task_error_event(module, reason) do
start_time = System.monotonic_time()
measurements = %{time: start_time}
metadata = %{module: module, reason: reason}

:telemetry.execute([:gen_rmq, :consumer, :task, :error], measurements, metadata)
end

defp emit_connection_down_event(module, reason) do
start_time = System.monotonic_time()
measurements = %{time: start_time}
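
The task-tracking approach used in the `lib/consumer.ex` changes above can be tried in isolation. The following is a minimal sketch under stated assumptions, not the GenRMQ implementation: `TaskTrackerSketch`, `run/2`, and `running_count/1` are hypothetical names, and the server simply starts a `Task.Supervisor`, launches work with `Task.Supervisor.async_nolink/2`, and removes a task's monitor reference from its state when the task either replies or goes down.

```elixir
defmodule TaskTrackerSketch do
  # Hypothetical stand-alone module illustrating the pattern; not GenRMQ code.
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  # Runs `fun` in a supervised task that is monitored but not linked to the server.
  def run(server, fun), do: GenServer.call(server, {:run, fun})

  # Number of tasks the server still considers in flight.
  def running_count(server), do: GenServer.call(server, :running_count)

  @impl GenServer
  def init(:ok) do
    {:ok, supervisor} = Task.Supervisor.start_link()
    {:ok, %{supervisor: supervisor, running_tasks: %{}}}
  end

  @impl GenServer
  def handle_call({:run, fun}, _from, state) do
    %Task{ref: ref} = task = Task.Supervisor.async_nolink(state.supervisor, fun)
    {:reply, :ok, %{state | running_tasks: Map.put(state.running_tasks, ref, task)}}
  end

  def handle_call(:running_count, _from, state) do
    {:reply, map_size(state.running_tasks), state}
  end

  # Success: the task replies with {ref, result}. Drop the monitor and flush
  # the :DOWN message that would otherwise follow.
  @impl GenServer
  def handle_info({ref, _result}, state) when is_reference(ref) do
    Process.demonitor(ref, [:flush])
    {:noreply, %{state | running_tasks: Map.delete(state.running_tasks, ref)}}
  end

  # Failure: a crashed task never replies, so only the :DOWN message arrives.
  def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
    {:noreply, %{state | running_tasks: Map.delete(state.running_tasks, ref)}}
  end
end
```

Because `async_nolink/2` links the task to the supervisor rather than to the caller, a raising task does not take the server down; the server only learns about the failure through the monitor.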
21 changes: 11 additions & 10 deletions lib/publisher.ex
@@ -214,8 +214,17 @@ defmodule GenRMQ.Publisher do
Process.flag(:trap_exit, true)
config = apply(module, :init, [])
state = Map.merge(initial_state, %{config: config})
send(self(), :init)
{:ok, state}

{:ok, state, {:continue, :init}}
end

@doc false
@impl GenServer
def handle_continue(:init, %{module: module, config: config}) do
Logger.info("[#{module}]: Setting up publisher connection and configuration")
{:ok, state} = setup_publisher(%{module: module, config: config})

{:noreply, state}
end

@doc false
@@ -279,14 +288,6 @@ defmodule GenRMQ.Publisher do
{:reply, result, state}
end

@doc false
@impl GenServer
def handle_info(:init, %{module: module, config: config}) do
Logger.info("[#{module}]: Setting up publisher connection and configuration")
{:ok, state} = setup_publisher(%{module: module, config: config})
{:noreply, state}
end

@doc false
@impl GenServer
def handle_info({:DOWN, _ref, :process, _pid, reason}, %{module: module, config: config}) do
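
Both the consumer and the publisher now defer their setup by returning `{:continue, :init}` from `init/1` instead of sending themselves an `:init` message. The sketch below shows that mechanism on its own; `ContinueSketch` and `ready?/1` are hypothetical names, and the `Process.sleep/1` call is only a stand-in for slow work such as opening a broker connection. Note that `handle_continue/2` requires OTP 21 or later.

```elixir
defmodule ContinueSketch do
  # Hypothetical module illustrating deferred initialisation; not GenRMQ code.
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  def ready?(server), do: GenServer.call(server, :ready?)

  @impl GenServer
  def init(:ok) do
    # Return quickly so the caller of start_link/1 is not blocked by slow setup.
    {:ok, %{ready: false}, {:continue, :init}}
  end

  @impl GenServer
  def handle_continue(:init, state) do
    # Stand-in for slow setup work.
    Process.sleep(50)
    {:noreply, %{state | ready: true}}
  end

  @impl GenServer
  def handle_call(:ready?, _from, state), do: {:reply, state.ready, state}
end
```

Unlike `send(self(), :init)`, the continue callback is guaranteed to run before the server processes any other message, so a call arriving right after `start_link/1` always sees the initialised state.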