Skip to content
This repository has been archived by the owner on Nov 27, 2023. It is now read-only.

Commit

Permalink
Adding telemetry support to gen_rmq (#136)
Browse files Browse the repository at this point in the history
  • Loading branch information
akoutmos authored Jan 30, 2020
1 parent 823a71c commit 6ce6f20
Show file tree
Hide file tree
Showing 7 changed files with 389 additions and 31 deletions.
125 changes: 100 additions & 25 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,24 @@ GenRMQ is a set of [behaviours][behaviours] meant to be used to create RabbitMQ
Internally it is using [AMQP][amqp] elixir RabbitMQ client. The idea is to reduce boilerplate consumer / publisher
code, which usually includes:

* creating connection / channel and keeping it in a state
* creating and binding queue
* handling reconnections / consumer cancellations
- creating connection / channel and keeping it in a state
- creating and binding queue
- handling reconnections / consumer cancellations

The project currently provides the following functionality:

* `GenRMQ.Consumer` - a behaviour for implementing RabbitMQ consumers
* `GenRMQ.Publisher` - a behaviour for implementing RabbitMQ publishers
* `GenRMQ.Processor` - a behaviour for implementing RabbitMQ message processors
* `GenRMQ.RabbitCase` - test utilities for RabbitMQ ([example usage][rabbit_case_example])
- `GenRMQ.Consumer` - a behaviour for implementing RabbitMQ consumers
- `GenRMQ.Publisher` - a behaviour for implementing RabbitMQ publishers
- `GenRMQ.Processor` - a behaviour for implementing RabbitMQ message processors
- `GenRMQ.RabbitCase` - test utilities for RabbitMQ ([example usage][rabbit_case_example])

## Installation

~~~elixir
```elixir
def deps do
[{:gen_rmq, "~> 2.3.0"}]
end
~~~
```

## Migrations

Expand All @@ -39,7 +39,7 @@ More thorough examples for using `GenRMQ.Consumer` and `GenRMQ.Publisher` can be

### Consumer

~~~elixir
```elixir
defmodule Consumer do
@behaviour GenRMQ.Consumer

Expand All @@ -62,26 +62,26 @@ defmodule Consumer do
...
end
end
~~~
```

~~~elixir
```elixir
GenRMQ.Consumer.start_link(Consumer, name: Consumer)
~~~
```

This will result in:

* durable `gen_rmq_exchange.deadletter` exchange created or redeclared
* durable `gen_rmq_in_queue_error` queue created or redeclared. It will be bound to `gen_rmq_exchange.deadletter`
* durable topic `gen_rmq_exchange` exchange created or redeclared
* durable `gen_rmq_in_queue` queue created or redeclared. It will be bound to `gen_rmq_exchange` exchange and has a deadletter exchange set to `gen_rmq_exchange.deadletter`
* every `handle_message` callback will executed in separate process. This can be disabled by setting `concurrency: false` in `init` callback
* on failed rabbitmq connection it will wait for a bit and then reconnect
- durable `gen_rmq_exchange.deadletter` exchange created or redeclared
- durable `gen_rmq_in_queue_error` queue created or redeclared. It will be bound to `gen_rmq_exchange.deadletter`
- durable topic `gen_rmq_exchange` exchange created or redeclared
- durable `gen_rmq_in_queue` queue created or redeclared. It will be bound to `gen_rmq_exchange` exchange and has a deadletter exchange set to `gen_rmq_exchange.deadletter`
- every `handle_message` callback will executed in separate process. This can be disabled by setting `concurrency: false` in `init` callback
- on failed rabbitmq connection it will wait for a bit and then reconnect

There are many options to control the consumer setup details, please check the `c:GenRMQ.Consumer.init/0` [docs][consumer_doc] for all available settings.

### Publisher

~~~elixir
```elixir
defmodule Publisher do
@behaviour GenRMQ.Publisher

Expand All @@ -92,20 +92,95 @@ defmodule Publisher do
]
end
end
~~~
```

~~~elixir
```elixir
GenRMQ.Publisher.start_link(Publisher, name: Publisher)
GenRMQ.Publisher.publish(Publisher, Jason.encode!(%{msg: "msg"}))
~~~
```

## Telemetry

GenRMQ emits [Telemetry][https://github.com/beam-telemetry/telemetry] events for both consumers and publishers.
It currently exposes the following events:

- `[:gen_rmq, :publisher, :connection, :start]` - Dispatched by a GenRMQ publisher when a connection to RabbitMQ is started

- Measurement: `%{time: System.monotonic_time}`
- Metadata: `%{exchange: String.t}`

* `[:gen_rmq, :publisher, :connection, :stop]` - Dispatched by a GenRMQ publisher when a connection to RabbitMQ has been established

- Measurement: `%{time: System.monotonic_time, duration: native_time}`
- Metadata: `%{exchange: String.t}`

* `[:gen_rmq, :publisher, :connection, :down]` - Dispatched by a GenRMQ publisher when a connection to RabbitMQ has been lost

- Measurement: `%{time: System.monotonic_time}`
- Metadata: `%{module: atom, reason: atom}`

* `[:gen_rmq, :publisher, :message, :start]` - Dispatched by a GenRMQ publisher when a message is about to be published to RabbitMQ

- Measurement: `%{time: System.monotonic_time}`
- Metadata: `%{exchange: String.t, message: String.t}`

* `[:gen_rmq, :publisher, :message, :stop]` - Dispatched by a GenRMQ publisher when a message has been published to RabbitMQ

- Measurement: `%{time: System.monotonic_time, duration: native_time}`
- Metadata: `%{exchange: String.t, message: String.t}`

* `[:gen_rmq, :publisher, :message, :error]` - Dispatched by a GenRMQ publisher when a message failed to be published to RabbitMQ

- Measurement: `%{time: System.monotonic_time, duration: native_time}`
- Metadata: `%{exchange: String.t, message: String.t, kind: atom, reason: atom}`

* `[:gen_rmq, :consumer, :message, :ack]` - Dispatched by a GenRMQ consumer when a message has been acknowledged

- Measurement: `%{time: System.monotonic_time}`
- Metadata: `%{message: String.t}`

* `[:gen_rmq, :consumer, :message, :reject]` - Dispatched by a GenRMQ consumer when a message has been rejected

- Measurement: `%{time: System.monotonic_time}`
- Metadata: `%{message: String.t, requeue: boolean}`

* `[:gen_rmq, :consumer, :message, :start]` - Dispatched by a GenRMQ consumer when the processing of a message has begun

- Measurement: `%{time: System.monotonic_time}`
- Metadata: `%{message: String.t, module: atom}`

* `[:gen_rmq, :consumer, :message, :stop]` - Dispatched by a GenRMQ consumer when the processing of a message has completed

- Measurement: `%{time: System.monotonic_time, duration: native_time}`
- Metadata: `%{message: String.t, module: atom}`

* `[:gen_rmq, :consumer, :connection, :start]` - Dispatched by a GenRMQ consumer when a connection to RabbitMQ is started

- Measurement: `%{time: System.monotonic_time}`
- Metadata: `%{module: atom, attempt: integer, queue: String.t, exchange: String.t, routing_key: String.t}`

* `[:gen_rmq, :consumer, :connection, :stop]` - Dispatched by a GenRMQ consumer when a connection to RabbitMQ has been established

- Measurement: `%{time: System.monotonic_time, duration: native_time}`
- Metadata: `%{module: atom, attempt: integer, queue: String.t, exchange: String.t, routing_key: String.t}`

* `[:gen_rmq, :consumer, :connection, :error]` - Dispatched by a GenRMQ consumer when a connection to RabbitMQ could not be made

- Measurement: `%{time: System.monotonic_time}`
- Metadata: `%{module: atom, attempt: integer, queue: String.t, exchange: String.t, routing_key: String.t, error: any}`

* `[:gen_rmq, :consumer, :connection, :down]` - Dispatched by a GenRMQ consumer when a connection to RabbitMQ has been lost

- Measurement: `%{time: System.monotonic_time}`
- Metadata: `%{module: atom, reason: atom}`

## Running tests

You need [docker-compose][docker_compose] installed.

~~~bash
```bash
$ make test
~~~
```

## How to contribute

Expand Down
120 changes: 116 additions & 4 deletions lib/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,9 @@ defmodule GenRMQ.Consumer do
`message` - `GenRMQ.Message` struct
"""
@spec ack(message :: %GenRMQ.Message{}) :: :ok
def ack(%Message{state: %{in: channel}, attributes: %{delivery_tag: tag}}) do
def ack(%Message{state: %{in: channel}, attributes: %{delivery_tag: tag}} = message) do
emit_message_ack_event(message)

Basic.ack(channel, tag)
end

Expand All @@ -205,7 +207,9 @@ defmodule GenRMQ.Consumer do
`requeue` - indicates if message should be requeued
"""
@spec reject(message :: %GenRMQ.Message{}, requeue :: boolean) :: :ok
def reject(%Message{state: %{in: channel}, attributes: %{delivery_tag: tag}}, requeue \\ false) do
def reject(%Message{state: %{in: channel}, attributes: %{delivery_tag: tag}} = message, requeue \\ false) do
emit_message_reject_event(message, requeue)

Basic.reject(channel, tag, requeue: requeue)
end

Expand Down Expand Up @@ -254,6 +258,8 @@ defmodule GenRMQ.Consumer do
def handle_info({:DOWN, _ref, :process, _pid, reason}, %{module: module, config: config} = state) do
Logger.info("[#{module}]: RabbitMQ connection is down! Reason: #{inspect(reason)}")

emit_connection_down_event(module, reason)

config
|> Keyword.get(:reconnect, true)
|> handle_reconnect(state)
Expand Down Expand Up @@ -333,14 +339,26 @@ defmodule GenRMQ.Consumer do
end

defp handle_message(payload, attributes, %{module: module} = state, false) do
start_time = System.monotonic_time()
message = Message.create(attributes, payload, state)
apply(module, :handle_message, [message])

emit_message_start_event(start_time, message, module)
result = apply(module, :handle_message, [message])
emit_message_stop_event(start_time, message, module)

result
end

defp handle_message(payload, attributes, %{module: module} = state, true) do
spawn(fn ->
start_time = System.monotonic_time()
message = Message.create(attributes, payload, state)
apply(module, :handle_message, [message])

emit_message_start_event(start_time, message, module)
result = apply(module, :handle_message, [message])
emit_message_stop_event(start_time, message, module)

result
end)
end

Expand All @@ -361,8 +379,16 @@ defmodule GenRMQ.Consumer do
end

defp get_connection(%{config: config, module: module, reconnect_attempt: attempt} = state) do
start_time = System.monotonic_time()
queue = config[:queue]
exchange = config[:exchange]
routing_key = config[:routing_key]

emit_connection_start_event(start_time, module, attempt, queue, exchange, routing_key)

case Connection.open(config[:uri]) do
{:ok, conn} ->
emit_connection_stop_event(start_time, module, attempt, queue, exchange, routing_key)
Process.monitor(conn.pid)
Map.put(state, :conn, conn)

Expand All @@ -372,6 +398,8 @@ defmodule GenRMQ.Consumer do
"#{inspect(strip_key(config, :uri))}, reason #{inspect(e)}"
)

emit_connection_error_event(start_time, module, attempt, queue, exchange, routing_key, e)

retry_delay_fn = config[:retry_delay_function] || (&linear_delay/1)
next_attempt = attempt + 1
retry_delay_fn.(next_attempt)
Expand Down Expand Up @@ -441,6 +469,90 @@ defmodule GenRMQ.Consumer do
end
end

defp emit_message_ack_event(message) do
start_time = System.monotonic_time()
measurements = %{time: start_time}
metadata = %{message: message}

:telemetry.execute([:gen_rmq, :consumer, :message, :ack], measurements, metadata)
end

defp emit_message_reject_event(message, requeue) do
start_time = System.monotonic_time()
measurements = %{time: start_time}
metadata = %{message: message, requeue: requeue}

:telemetry.execute([:gen_rmq, :consumer, :message, :reject], measurements, metadata)
end

defp emit_message_start_event(start_time, message, module) do
measurements = %{time: start_time}
metadata = %{message: message, module: module}

:telemetry.execute([:gen_rmq, :consumer, :message, :start], measurements, metadata)
end

defp emit_message_stop_event(start_time, message, module) do
stop_time = System.monotonic_time()
measurements = %{time: stop_time, duration: stop_time - start_time}
metadata = %{message: message, module: module}

:telemetry.execute([:gen_rmq, :consumer, :message, :stop], measurements, metadata)
end

defp emit_connection_down_event(module, reason) do
start_time = System.monotonic_time()
measurements = %{time: start_time}
metadata = %{module: module, reason: reason}

:telemetry.execute([:gen_rmq, :consumer, :connection, :down], measurements, metadata)
end

defp emit_connection_start_event(start_time, module, attempt, queue, exchange, routing_key) do
measurements = %{time: start_time}

metadata = %{
module: module,
attempt: attempt,
queue: queue,
exchange: exchange,
routing_key: routing_key
}

:telemetry.execute([:gen_rmq, :consumer, :connection, :start], measurements, metadata)
end

defp emit_connection_stop_event(start_time, module, attempt, queue, exchange, routing_key) do
stop_time = System.monotonic_time()
measurements = %{time: stop_time, duration: stop_time - start_time}

metadata = %{
module: module,
attempt: attempt,
queue: queue,
exchange: exchange,
routing_key: routing_key
}

:telemetry.execute([:gen_rmq, :consumer, :connection, :stop], measurements, metadata)
end

defp emit_connection_error_event(start_time, module, attempt, queue, exchange, routing_key, error) do
stop_time = System.monotonic_time()
measurements = %{time: stop_time, duration: stop_time - start_time}

metadata = %{
module: module,
attempt: attempt,
queue: queue,
exchange: exchange,
routing_key: routing_key,
error: error
}

:telemetry.execute([:gen_rmq, :consumer, :connection, :error], measurements, metadata)
end

defp strip_key(keyword_list, key) do
keyword_list
|> Keyword.delete(key)
Expand Down
Loading

0 comments on commit 6ce6f20

Please sign in to comment.