Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consistent API #25

Merged
merged 8 commits into from
Mar 10, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion lib/task_bunny/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ defmodule TaskBunny.Config do
@moduledoc """
Modules that help you access to TaskBunny config values
"""
alias TaskBunny.ConfigError

@default_concurrency 2

Expand All @@ -14,6 +15,14 @@ defmodule TaskBunny.Config do
|> Enum.map(fn ({host, _options}) -> host end)
end

@doc """
Returns host configuration. Returns nil when host is not configured.
"""
@spec host_config(atom) :: keyword | nil
def host_config(host) do
hosts_config()[host]
end

@doc """
Returns connect options for the host. It raises an error if the host is not found.
"""
Expand Down Expand Up @@ -42,6 +51,9 @@ defmodule TaskBunny.Config do

queue_config[:queues]
|> Enum.map(fn (queue) ->
unless queue[:name] do
raise ConfigError, message: "name is missing in queue definition. #{inspect queue}"
end
Keyword.merge(queue, [name: namespace <> queue[:name]])
end)
end
Expand Down Expand Up @@ -72,7 +84,7 @@ defmodule TaskBunny.Config do
@doc """
Returns queue for the given job
"""
@spec queue_for_job(atom) :: keyword
@spec queue_for_job(atom) :: keyword | nil
def queue_for_job(job) do
Enum.find(queues(), fn (queue) ->
match_job?(job, queue[:jobs])
Expand Down
67 changes: 47 additions & 20 deletions lib/task_bunny/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ defmodule TaskBunny.Connection do

use GenServer
require Logger
alias TaskBunny.Config
alias TaskBunny.{Config, Connection.ConnectError}

@reconnect_interval 5_000

Expand All @@ -32,24 +32,36 @@ defmodule TaskBunny.Connection do

@doc """
Gets a RabbitMQ connection for the given host.
Returns nil when the connection is not available.

Returns {:ok, conn} when connection is available.
Returns {:error, error_info} when connection is not ready.
"""
@spec get_connection(atom) :: struct | nil
@spec get_connection(atom) :: {:ok, AMQP.Connection.t} | {:error, atom}
def get_connection(host \\ :default) do
case Process.whereis(pname(host)) do
nil -> nil
pid -> GenServer.call(pid, :get_connection)
nil ->
case Config.host_config(host) do
nil -> {:error, :invalid_host}
_ -> {:error, :no_connection_process}
end
pid ->
case GenServer.call(pid, :get_connection) do
nil -> {:error, :not_connected}
conn -> {:ok, conn}
end
end
end

@doc """
Similar to `get_connection/1` but raises an error when connection is not ready.
Similar to get_connection/1 but raises an exception when connection is not ready.

Returns connection if it's available.
"""
@spec get_connection!(atom) :: struct
@spec get_connection!(atom) :: AMQP.Connection.t
def get_connection!(host \\ :default) do
case get_connection(host) do
nil -> raise "Failed to connect #{host}"
conn -> conn
{:ok, conn} -> conn
{:error, error_type} -> raise ConnectError, type: error_type, host: host
end
end

Expand All @@ -58,18 +70,33 @@ defmodule TaskBunny.Connection do
Once connection has been established, it will send a message with {:connected, connection} to the given process.

Returns :ok when the server exists.
Returns :error when the server doesn't exist.
Returns {:error, info} when the server doesn't exist.
"""
@spec monitor_connection(atom, pid) :: :ok | :error
def monitor_connection(host \\ :default, listener_pid) do
@spec subscribe_connection(atom, pid) :: :ok | {:error, atom}
def subscribe_connection(host \\ :default, listener_pid) do
case Process.whereis(pname(host)) do
nil -> :error
nil ->
case Config.host_config(host) do
nil -> {:error, :invalid_host}
_ -> {:error, :no_connection_process}
end
pid ->
GenServer.cast(pid, {:monitor_connection, listener_pid})
GenServer.cast(pid, {:subscribe_connection, listener_pid})
:ok
end
end

@doc """
Similar to subscribe_connection/2 but raises an exception when process is not ready.
"""
@spec subscribe_connection!(atom, pid) :: :ok
def subscribe_connection!(host \\ :default, listener_pid) do
case subscribe_connection(host, listener_pid) do
:ok -> :ok
{:error, error_type} -> raise ConnectError, type: error_type, host: host
end
end

@doc """
Initialises GenServer. Send a request to establish a connection.
"""
Expand All @@ -85,9 +112,9 @@ defmodule TaskBunny.Connection do
end

@spec handle_cast(tuple, state) :: {:noreply, state}
def handle_cast({:monitor_connection, listener}, {host, connection, listeners}) do
def handle_cast({:subscribe_connection, listener}, {host, connection, listeners}) do
if connection do
notify_connect(connection, [listener])
publish_connection(connection, [listener])
{:noreply, {host, connection, listeners}}
else
{:noreply, {host, connection, [listener | listeners]}}
Expand All @@ -104,7 +131,7 @@ defmodule TaskBunny.Connection do
{:ok, connection} ->
Logger.info "TaskBunny.Connection: connected to #{host}"
Process.monitor(connection.pid)
notify_connect(connection, listeners)
publish_connection(connection, listeners)

{:noreply, {host, connection, []}}
error ->
Expand All @@ -121,9 +148,9 @@ defmodule TaskBunny.Connection do
{:stop, {:connection_lost, reason}, {host, nil, []}}
end

@spec notify_connect(struct, list(pid)) :: :ok
defp notify_connect(connection, listeners) do
Logger.debug "TaskBunny.Connection: notifying to #{inspect listeners}"
@spec publish_connection(struct, list(pid)) :: :ok
defp publish_connection(connection, listeners) do
Logger.debug "TaskBunny.Connection: publishing to #{inspect listeners}"
Enum.each listeners, fn (pid) ->
if Process.alive?(pid), do: send(pid, {:connected, connection})
end
Expand Down
23 changes: 13 additions & 10 deletions lib/task_bunny/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,21 @@ defmodule TaskBunny.Consumer do
@doc """
Opens a channel for the given connection and start consuming messages for the queue.
"""
@spec consume(struct, String.t, integer) :: {struct, String.t} | nil
@spec consume(AMQP.Connection.t, String.t, integer) :: {:ok, AMQP.Channel.t, String.t} | {:error, any}
def consume(connection, queue, concurrency) do
case AMQP.Channel.open(connection) do
{:ok, channel} ->
:ok = AMQP.Basic.qos(channel, prefetch_count: concurrency)
{:ok, consumer_tag} = AMQP.Basic.consume(channel, queue)

{channel, consumer_tag}
with {:ok, channel} <- AMQP.Channel.open(connection),
:ok <- AMQP.Basic.qos(channel, prefetch_count: concurrency),
{:ok, consumer_tag} <- AMQP.Basic.consume(channel, queue)
do
{:ok, channel, consumer_tag}
else
error ->
Logger.warn "TaskBunny.Consumer: failed to open channel for #{queue}. Detail: #{inspect error}"
Logger.warn """
TaskBunny.Consumer: start consumer for #{queue}.
Detail: #{inspect error}"
"""

nil
{:error, error}
end
end

Expand All @@ -33,7 +36,7 @@ defmodule TaskBunny.Consumer do
@doc """
Acknowledges to the message.
"""
@spec ack(%AMQP.Channel{}, map, boolean) :: :ok
@spec ack(AMQP.Channel.t, map, boolean) :: :ok
def ack(channel, meta, succeeded)

def ack(channel, %{delivery_tag: tag}, true), do: AMQP.Basic.ack(channel, tag)
Expand Down
111 changes: 111 additions & 0 deletions lib/task_bunny/errors.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
defmodule TaskBunny.ConfigError do
@moduledoc """
Raised when an error was found on TaskBunny config
"""
defexception [:message]

@lint false
def exception(message: message) do
title = "Failed to load TaskBunny config"
message = "#{title}\n#{message}"
%__MODULE__{message: message}
end
_ = @lint
end

defmodule TaskBunny.Connection.ConnectError do
@moduledoc """
Raised when failed to retain a connection
"""
defexception [:type, :message]

@lint false
def exception(_opts = [type: type, host: host]) do
title = "Failed to get a connection to host '#{host}'."
detail = case type do
:invalid_host ->
"The host is not defined in config"
:no_connection_process ->
"""
No process running for the host connection.

- Make sure supervisor process is up running.
- You might try to get connection before the process is ready.
"""
:not_connected ->
"""
The connection is not available.

- Check if RabbitMQ host is up running.
- Make sure you can connect to RabbitMQ from the application host.
- You might try to get connection before process is ready.
"""
fallback ->
"#{fallback}"
end

message = "#{title}\n#{detail}"
%__MODULE__{message: message, type: type}
end
_ = @lint
end

defmodule TaskBunny.Job.QueueNotFoundError do
@moduledoc """
Raised when failed to find a queue for the job.
"""
defexception [:job, :message]

@lint false
def exception(job: job) do
title = "Failed to find a queue for the job."
detail = "job=#{job}"

message = "#{title}\n#{detail}"
%__MODULE__{message: message, job: job}
end
_ = @lint
end

defmodule TaskBunny.Message.DecodeError do
@moduledoc """
Raised when failed to decode the message.
"""
defexception [:message]

@lint false
def exception(opts) do
title = "Failed to decode the message."
detail = case opts[:type] do
:job_not_loaded ->
"Job is not valid Elixir module"
:poison_decode_error ->
"Failed to decode the message in JSON. error=#{inspect opts[:error]}"
:decode_error ->
"Failed to decode the message. error=#{inspect opts[:error]}"
fallback ->
"#{fallback}"
end

message = "#{title}\n#{detail}\nmessage body=#{opts[:body]}"
%__MODULE__{message: message}
end
_ = @lint
end

defmodule TaskBunny.Publisher.PublishError do
@moduledoc """
Raised when failed to publish the message.
"""
defexception [:message, :inner_error]

@lint false
def exception(inner_error: inner_error) do
title = "Failed to publish the message."
detail = "error=#{inspect inner_error}"

message = "#{title}\n#{detail}"
%__MODULE__{message: message, inner_error: inner_error}
end
_ = @lint
end
Loading