From fc1a6421fcf16a6b3c0e5a7ab02703749a18c5ee Mon Sep 17 00:00:00 2001 From: Tatsuya Ono Date: Thu, 9 Mar 2017 14:14:02 +0000 Subject: [PATCH 1/8] get_connection to return tuple --- lib/task_bunny/connection.ex | 67 ++++++++++++++++++++--------- lib/task_bunny/errors.ex | 33 ++++++++++++++ test/task_bunny/connection_test.exs | 38 +++++++++++----- 3 files changed, 107 insertions(+), 31 deletions(-) create mode 100644 lib/task_bunny/errors.ex diff --git a/lib/task_bunny/connection.ex b/lib/task_bunny/connection.ex index 96d35ae..f48b3f0 100644 --- a/lib/task_bunny/connection.ex +++ b/lib/task_bunny/connection.ex @@ -8,7 +8,7 @@ defmodule TaskBunny.Connection do use GenServer require Logger - alias TaskBunny.Config + alias TaskBunny.{Config, Connection.ConnectError} @reconnect_interval 5_000 @@ -32,24 +32,36 @@ defmodule TaskBunny.Connection do @doc """ Gets a RabbitMQ connection for the given host. - Returns nil when the connection is not available. + + Returns {:ok, conn} when connection is available. + Returns {:error, error_info} when connection is not ready. """ - @spec get_connection(atom) :: struct | nil + @spec get_connection(atom) :: {:ok, AMQP.Connection.t} | {:error, atom} def get_connection(host \\ :default) do case Process.whereis(pname(host)) do - nil -> nil - pid -> GenServer.call(pid, :get_connection) + nil -> + case Config.host_config(host) do + nil -> {:error, :invalid_host} + _ -> {:error, :no_connection_process} + end + pid -> + case GenServer.call(pid, :get_connection) do + nil -> {:error, :not_connected} + conn -> {:ok, conn} + end end end @doc """ - Similar to `get_connection/1` but raises an error when connection is not ready. + Similar to get_connection/1 but raises an exception when connection is not ready. + + Returns connection if it's available. """ - @spec get_connection!(atom) :: struct + @spec get_connection!(atom) :: AMQP.Connection.t def get_connection!(host \\ :default) do case get_connection(host) do - nil -> raise "Failed to connect #{host}" - conn -> conn + {:ok, conn} -> conn + {:error, error_type} -> raise ConnectError, type: error_type, host: host end end @@ -58,18 +70,33 @@ defmodule TaskBunny.Connection do Once connection has been established, it will send a message with {:connected, connection} to the given process. Returns :ok when the server exists. - Returns :error when the server doesn't exist. + Returns {:error, info} when the server doesn't exist. """ - @spec monitor_connection(atom, pid) :: :ok | :error - def monitor_connection(host \\ :default, listener_pid) do + @spec subscribe_connection(atom, pid) :: :ok | {:error, atom} + def subscribe_connection(host \\ :default, listener_pid) do case Process.whereis(pname(host)) do - nil -> :error + nil -> + case Config.host_config(host) do + nil -> {:error, :invalid_host} + _ -> {:error, :no_connection_process} + end pid -> - GenServer.cast(pid, {:monitor_connection, listener_pid}) + GenServer.cast(pid, {:subscribe_connection, listener_pid}) :ok end end + @doc """ + Similar to subscribe_connection/2 but raises an exception when process is not ready. + """ + @spec subscribe_connection!(atom, pid) :: :ok + def subscribe_connection!(host \\ :default, listener_pid) do + case subscribe_connection(host, listener_pid) do + :ok -> :ok + {:error, error_type} -> raise ConnectError, type: error_type, host: host + end + end + @doc """ Initialises GenServer. Send a request to establish a connection. """ @@ -85,9 +112,9 @@ defmodule TaskBunny.Connection do end @spec handle_cast(tuple, state) :: {:noreply, state} - def handle_cast({:monitor_connection, listener}, {host, connection, listeners}) do + def handle_cast({:subscribe_connection, listener}, {host, connection, listeners}) do if connection do - notify_connect(connection, [listener]) + publish_connection(connection, [listener]) {:noreply, {host, connection, listeners}} else {:noreply, {host, connection, [listener | listeners]}} @@ -104,7 +131,7 @@ defmodule TaskBunny.Connection do {:ok, connection} -> Logger.info "TaskBunny.Connection: connected to #{host}" Process.monitor(connection.pid) - notify_connect(connection, listeners) + publish_connection(connection, listeners) {:noreply, {host, connection, []}} error -> @@ -121,9 +148,9 @@ defmodule TaskBunny.Connection do {:stop, {:connection_lost, reason}, {host, nil, []}} end - @spec notify_connect(struct, list(pid)) :: :ok - defp notify_connect(connection, listeners) do - Logger.debug "TaskBunny.Connection: notifying to #{inspect listeners}" + @spec publish_connection(struct, list(pid)) :: :ok + defp publish_connection(connection, listeners) do + Logger.debug "TaskBunny.Connection: publishing to #{inspect listeners}" Enum.each listeners, fn (pid) -> if Process.alive?(pid), do: send(pid, {:connected, connection}) end diff --git a/lib/task_bunny/errors.ex b/lib/task_bunny/errors.ex new file mode 100644 index 0000000..00725f6 --- /dev/null +++ b/lib/task_bunny/errors.ex @@ -0,0 +1,33 @@ +defmodule TaskBunny.Connection.ConnectError do + @moduledoc """ + Raised when failed to retain a connection + """ + defexception [:type, :message] + + @spec exception(keyword) :: map + def exception(_opts = [type: type, host: host]) do + title = "Failed to get a connection to host '#{host}':" + detail = case type do + :invalid_host -> + "The host is not defined in config" + :no_connection_process -> + """ + No process running for the host connection. + + - Make sure supervisor process is up running. + - You might try to get connection before the process is ready. + """ + :not_connected -> + """ + The connection is not available. + + - Check if RabbitMQ host is up running. + - Make sure you can connect to RabbitMQ from the application host. + - You might try to get connection before process is ready. + """ + end + + message = "#{title}\n#{detail}" + %__MODULE__{message: message, type: type} + end +end diff --git a/test/task_bunny/connection_test.exs b/test/task_bunny/connection_test.exs index 3e741cb..2ac18d0 100644 --- a/test/task_bunny/connection_test.exs +++ b/test/task_bunny/connection_test.exs @@ -11,26 +11,33 @@ defmodule TaskBunny.ConnectionTest do describe "get_connection" do test "returns AMQP connection" do - conn = Connection.get_connection(:default) + {:ok, conn} = Connection.get_connection(:default) assert %AMQP.Connection{} = conn end - test "returns nil when connection is not available" do - conn = Connection.get_connection(:invalid_host) - assert conn == nil + test "returns error when connection is not available" do + assert {:error, _} = Connection.get_connection(:foobar) end end - describe "monitor_connection" do + describe "get_connection!" do + test "raises ConnectError when connection is not available" do + assert_raise Connection.ConnectError, fn -> + Connection.get_connection!(:foobar) + end + end + end + + describe "subscribe_connection" do test "sends connection to caller process" do - ret = Connection.monitor_connection(:default, self()) + ret = Connection.subscribe_connection(:default, self()) assert ret == :ok assert_receive {:connected, %AMQP.Connection{}} end test "returns :error for invalid host" do - ret = Connection.monitor_connection(:invalid_host, self()) - assert ret == :error + ret = Connection.subscribe_connection(:foobar, self()) + assert ret == {:error, :invalid_host} end test "when the server has not established a connection" do @@ -38,7 +45,7 @@ defmodule TaskBunny.ConnectionTest do :meck.expect Config, :connect_options, fn (:foo) -> "amqp://localhost:1111" end {:ok, pid} = Connection.start_link(:foo) - ret = Connection.monitor_connection(:foo, self()) + ret = Connection.subscribe_connection(:foo, self()) # Trying to connect assert ret == :ok @@ -54,15 +61,24 @@ defmodule TaskBunny.ConnectionTest do end end + describe "subscribe_connection!" do + test "raises ConnectError for invalid host" do + assert_raise Connection.ConnectError, fn -> + Connection.subscribe_connection!(:foobar, self()) + end + end + end + describe "when connection is lost" do - test "exits the process" do # ...so that the supervisor can restart it + test "exits the process" do + # ...so that the supervisor can restart it :meck.new Config :meck.expect Config, :connect_options, fn (:foo) -> [] end {:ok, pid} = Connection.start_link(:foo) Process.unlink(pid) - conn = Connection.get_connection(:foo) + {:ok, conn} = Connection.get_connection(:foo) AMQP.Connection.close(conn) :timer.sleep(10) From 44bc0e723a05b33ec33a8319c1006bb5956a2285 Mon Sep 17 00:00:00 2001 From: Tatsuya Ono Date: Thu, 9 Mar 2017 14:14:16 +0000 Subject: [PATCH 2/8] Fix implications for connection changes and mark inconsistent API return values. --- lib/task_bunny/config.ex | 8 ++++++++ lib/task_bunny/consumer.ex | 1 + lib/task_bunny/job.ex | 5 +++++ lib/task_bunny/message.ex | 5 +++++ lib/task_bunny/publisher.ex | 5 ++++- lib/task_bunny/queue.ex | 2 +- lib/task_bunny/status.ex | 6 +++--- lib/task_bunny/worker.ex | 2 +- test/support/job_test_helper.ex | 2 +- test/support/queue_test_helper.ex | 2 +- test/task_bunny/supervisor_test.exs | 2 +- test/task_bunny/worker_supervisor_test.exs | 4 ++-- test/task_bunny/worker_test.exs | 6 +++--- 13 files changed, 36 insertions(+), 14 deletions(-) diff --git a/lib/task_bunny/config.ex b/lib/task_bunny/config.ex index 85bb0d4..870187e 100644 --- a/lib/task_bunny/config.ex +++ b/lib/task_bunny/config.ex @@ -14,6 +14,14 @@ defmodule TaskBunny.Config do |> Enum.map(fn ({host, _options}) -> host end) end + @doc """ + Returns host configuration. Returns nil when host is not configured. + """ + @spec host_config(atom) :: keyword | nil + def host_config(host) do + hosts_config()[host] + end + @doc """ Returns connect options for the host. It raises an error if the host is not found. """ diff --git a/lib/task_bunny/consumer.ex b/lib/task_bunny/consumer.ex index 2f021b3..d5bb7d0 100644 --- a/lib/task_bunny/consumer.ex +++ b/lib/task_bunny/consumer.ex @@ -8,6 +8,7 @@ defmodule TaskBunny.Consumer do Opens a channel for the given connection and start consuming messages for the queue. """ @spec consume(struct, String.t, integer) :: {struct, String.t} | nil + # TODO: returns tuple def consume(connection, queue, concurrency) do case AMQP.Channel.open(connection) do {:ok, channel} -> diff --git a/lib/task_bunny/job.ex b/lib/task_bunny/job.ex index a164a7c..5ea4428 100644 --- a/lib/task_bunny/job.ex +++ b/lib/task_bunny/job.ex @@ -21,6 +21,9 @@ defmodule TaskBunny.Job do do_enqueue(host, queue, message) end + # TODO: enqueue! + # custom errors + @spec do_enqueue(atom, String.t|nil, String.t) :: :ok | {:error, any} defp do_enqueue(host, nil, message) do {:error, "Can't find a queue for #{__MODULE__}"} @@ -59,4 +62,6 @@ defmodule TaskBunny.Job do defoverridable [timeout: 0, max_retry: 0, retry_interval: 1] end end + + # TODO: documentation end diff --git a/lib/task_bunny/message.ex b/lib/task_bunny/message.ex index 1b2d0b8..289c436 100644 --- a/lib/task_bunny/message.ex +++ b/lib/task_bunny/message.ex @@ -7,6 +7,7 @@ defmodule TaskBunny.Message do Encode message body in JSON with job and arugment. """ @spec encode(atom, any) :: String.t + # TODO: return tuple def encode(job, payload) do %{ "job" => encode_job(job), @@ -16,6 +17,8 @@ defmodule TaskBunny.Message do |> Poison.encode!(pretty: true) end + # TODO: encode! + @doc """ Decode message body in JSON to map """ @@ -36,6 +39,8 @@ defmodule TaskBunny.Message do error -> {:error, {:decode_exception, error}} end + # TODO: decode! + @spec encode_job(atom) :: String.t defp encode_job(job) do job diff --git a/lib/task_bunny/publisher.ex b/lib/task_bunny/publisher.ex index aa13aa7..5794374 100644 --- a/lib/task_bunny/publisher.ex +++ b/lib/task_bunny/publisher.ex @@ -15,7 +15,7 @@ defmodule TaskBunny.Publisher do """ @spec publish(atom, String.t, String.t, keyword) :: :ok | {:error, any} def publish(host, queue, message, options \\ []) do - conn = TaskBunny.Connection.get_connection(host) + {:ok, conn} = TaskBunny.Connection.get_connection(host) exchange = "" routing_key = queue options = Keyword.merge([persistent: true], options) @@ -23,12 +23,15 @@ defmodule TaskBunny.Publisher do do_publish(conn, exchange, routing_key, message, options) end + # TODO: publish! + @spec do_publish(AMQP.Connection.t, String.t, String.t, String.t, keyword) :: :ok | {:error, any} defp do_publish(nil, _, _, _, _), do: {:error, "Failed to connect to AMQP host"} defp do_publish(conn, exchange, routing_key, message, options) do Logger.debug "TaskBunny.Publisher: publish:\r\n #{exchange} - #{routing_key}: #{inspect message}. options = #{inspect options}" + # TODO: returns detail error {:ok, channel} = AMQP.Channel.open(conn) :ok = AMQP.Basic.publish(channel, exchange, routing_key, message, options) :ok = AMQP.Channel.close(channel) diff --git a/lib/task_bunny/queue.ex b/lib/task_bunny/queue.ex index 6fa2a44..36c05a1 100644 --- a/lib/task_bunny/queue.ex +++ b/lib/task_bunny/queue.ex @@ -41,7 +41,7 @@ defmodule TaskBunny.Queue do @spec delete_with_subqueues(%AMQP.Connection{} | atom, String.t) :: :ok def delete_with_subqueues(host, work_queue) when is_atom(host) do - conn = TaskBunny.Connection.get_connection(host) + conn = TaskBunny.Connection.get_connection!(host) delete_with_subqueues(conn, work_queue) end diff --git a/lib/task_bunny/status.ex b/lib/task_bunny/status.ex index 3ce4933..32ceb9f 100644 --- a/lib/task_bunny/status.ex +++ b/lib/task_bunny/status.ex @@ -84,9 +84,9 @@ defmodule TaskBunny.Status do @spec get_connection_status() :: boolean defp get_connection_status do - case Connection.get_connection do - nil -> false - _ -> true + case Connection.get_connection() do + {:ok, _} -> true + _ -> false end end diff --git a/lib/task_bunny/worker.ex b/lib/task_bunny/worker.ex index 9ceeb96..2739097 100644 --- a/lib/task_bunny/worker.ex +++ b/lib/task_bunny/worker.ex @@ -64,7 +64,7 @@ defmodule TaskBunny.Worker do def init(state = %Worker{}) do Logger.info log_msg("initializing", state) - case Connection.monitor_connection(state.host, self()) do + case Connection.subscribe_connection(state.host, self()) do :ok -> Process.flag(:trap_exit, true) diff --git a/test/support/job_test_helper.ex b/test/support/job_test_helper.ex index d996f4f..e72bd17 100644 --- a/test/support/job_test_helper.ex +++ b/test/support/job_test_helper.ex @@ -60,7 +60,7 @@ defmodule TaskBunny.JobTestHelper do def wait_for_connection(host) do Enum.find_value 1..100, fn (_) -> - case TaskBunny.Connection.monitor_connection(host, self()) do + case TaskBunny.Connection.subscribe_connection(host, self()) do :ok -> true _ -> :timer.sleep(10) diff --git a/test/support/queue_test_helper.ex b/test/support/queue_test_helper.ex index b11dd67..c658621 100644 --- a/test/support/queue_test_helper.ex +++ b/test/support/queue_test_helper.ex @@ -17,7 +17,7 @@ defmodule TaskBunny.QueueTestHelper do # Queue Helpers def open_channel(host \\ :default) do - conn = TaskBunny.Connection.get_connection(host) + conn = TaskBunny.Connection.get_connection!(host) {:ok, _channel} = AMQP.Channel.open(conn) end diff --git a/test/task_bunny/supervisor_test.exs b/test/task_bunny/supervisor_test.exs index a893c09..9f9f787 100644 --- a/test/task_bunny/supervisor_test.exs +++ b/test/task_bunny/supervisor_test.exs @@ -70,7 +70,7 @@ defmodule TaskBunny.SupervisorTest do work_pid = Process.whereis(work_name) # Close the connection - conn = Connection.get_connection(@host) + conn = Connection.get_connection!(@host) AMQP.Connection.close(conn) wait_for_process_died(conn_pid) JobTestHelper.wait_for_connection(@host) diff --git a/test/task_bunny/worker_supervisor_test.exs b/test/task_bunny/worker_supervisor_test.exs index b86cc98..0224067 100644 --- a/test/task_bunny/worker_supervisor_test.exs +++ b/test/task_bunny/worker_supervisor_test.exs @@ -74,7 +74,7 @@ defmodule TaskBunny.WorkerSupervisorTest do assert JobTestHelper.performed_count() == 0 %{message_count: count} = Queue.state( - Connection.get_connection(), @queue + Connection.get_connection!(), @queue ) assert count == 1 @@ -118,7 +118,7 @@ defmodule TaskBunny.WorkerSupervisorTest do :timer.sleep(1_100) %{message_count: count} = Queue.state( - Connection.get_connection(), @queue + Connection.get_connection!(), @queue ) # Make sure ack is sent and message was removed. diff --git a/test/task_bunny/worker_test.exs b/test/task_bunny/worker_test.exs index b1f2a63..d7896b4 100644 --- a/test/task_bunny/worker_test.exs +++ b/test/task_bunny/worker_test.exs @@ -47,7 +47,7 @@ defmodule TaskBunny.WorkerTest do TestJob.enqueue(payload, queue: @queue) JobTestHelper.wait_for_perform() - conn = Connection.get_connection() + conn = Connection.get_connection!() %{message_count: main_count} = Queue.state(conn, main) %{message_count: retry_count} = Queue.state(conn, retry) %{message_count: rejected_count} = Queue.state(conn, rejected) @@ -87,7 +87,7 @@ defmodule TaskBunny.WorkerTest do TestJob.enqueue(payload, queue: @queue) JobTestHelper.wait_for_perform() - conn = Connection.get_connection() + conn = Connection.get_connection!() %{message_count: main_count} = Queue.state(conn, main) %{message_count: retry_count} = Queue.state(conn, retry) %{message_count: rejected_count} = Queue.state(conn, rejected) @@ -118,7 +118,7 @@ defmodule TaskBunny.WorkerTest do # 1 normal + 10 retries = 11 assert JobTestHelper.performed_count == 11 - conn = Connection.get_connection() + conn = Connection.get_connection!() %{message_count: main_count} = Queue.state(conn, main) %{message_count: retry_count} = Queue.state(conn, retry) %{message_count: rejected_count} = Queue.state(conn, rejected) From 1bb484f0e016049607403eb9af82a694bc02fb1c Mon Sep 17 00:00:00 2001 From: Tatsuya Ono Date: Thu, 9 Mar 2017 17:59:26 +0000 Subject: [PATCH 3/8] Consistent API return values --- lib/task_bunny/config.ex | 4 ++ lib/task_bunny/consumer.ex | 23 +++--- lib/task_bunny/errors.ex | 76 +++++++++++++++++++- lib/task_bunny/job.ex | 104 ++++++++++++++++++---------- lib/task_bunny/message.ex | 36 ++++++++-- lib/task_bunny/publisher.ex | 40 ++++++----- lib/task_bunny/worker.ex | 4 +- test/task_bunny/job_runner_test.exs | 2 +- test/task_bunny/message_test.exs | 6 +- 9 files changed, 218 insertions(+), 77 deletions(-) diff --git a/lib/task_bunny/config.ex b/lib/task_bunny/config.ex index 870187e..36b7d1a 100644 --- a/lib/task_bunny/config.ex +++ b/lib/task_bunny/config.ex @@ -2,6 +2,7 @@ defmodule TaskBunny.Config do @moduledoc """ Modules that help you access to TaskBunny config values """ + alias TaskBunny.ConfigError @default_concurrency 2 @@ -50,6 +51,9 @@ defmodule TaskBunny.Config do queue_config[:queues] |> Enum.map(fn (queue) -> + unless queue[:name] do + raise ConfigError, "name is missing in queue definition. #{inspect queue}" + end Keyword.merge(queue, [name: namespace <> queue[:name]]) end) end diff --git a/lib/task_bunny/consumer.ex b/lib/task_bunny/consumer.ex index d5bb7d0..4c3e4f3 100644 --- a/lib/task_bunny/consumer.ex +++ b/lib/task_bunny/consumer.ex @@ -7,19 +7,20 @@ defmodule TaskBunny.Consumer do @doc """ Opens a channel for the given connection and start consuming messages for the queue. """ - @spec consume(struct, String.t, integer) :: {struct, String.t} | nil - # TODO: returns tuple + @spec consume(AMQP.Connection.t, String.t, integer) :: {:ok, AMQP.Channel.t, String.t} | {:error, any} def consume(connection, queue, concurrency) do - case AMQP.Channel.open(connection) do - {:ok, channel} -> - :ok = AMQP.Basic.qos(channel, prefetch_count: concurrency) - {:ok, consumer_tag} = AMQP.Basic.consume(channel, queue) - - {channel, consumer_tag} + with {:ok, channel} <- AMQP.Channel.open(connection), + :ok <- AMQP.Basic.qos(channel, prefetch_count: concurrency), + {:ok, consumer_tag} <- AMQP.Basic.consume(channel, queue) do + {:ok, channel, consumer_tag} + else error -> - Logger.warn "TaskBunny.Consumer: failed to open channel for #{queue}. Detail: #{inspect error}" + Logger.warn """ + TaskBunny.Consumer: start consumer for #{queue}. + Detail: #{inspect error}" + """ - nil + {:error, error} end end @@ -34,7 +35,7 @@ defmodule TaskBunny.Consumer do @doc """ Acknowledges to the message. """ - @spec ack(%AMQP.Channel{}, map, boolean) :: :ok + @spec ack(AMQP.Channel.t, map, boolean) :: :ok def ack(channel, meta, succeeded) def ack(channel, %{delivery_tag: tag}, true), do: AMQP.Basic.ack(channel, tag) diff --git a/lib/task_bunny/errors.ex b/lib/task_bunny/errors.ex index 00725f6..7078462 100644 --- a/lib/task_bunny/errors.ex +++ b/lib/task_bunny/errors.ex @@ -1,3 +1,17 @@ +defmodule TaskBunny.ConfigError do + @moduledoc """ + Raised when an error was found on TaskBunny config + """ + defexception [:message] + + @spec exception(String.t) :: map + def exception(message) do + title = "Failed to load TaskBunny config" + message = "#{title}\n#{message}" + %__MODULE__{message: message} + end +end + defmodule TaskBunny.Connection.ConnectError do @moduledoc """ Raised when failed to retain a connection @@ -6,7 +20,7 @@ defmodule TaskBunny.Connection.ConnectError do @spec exception(keyword) :: map def exception(_opts = [type: type, host: host]) do - title = "Failed to get a connection to host '#{host}':" + title = "Failed to get a connection to host '#{host}'." detail = case type do :invalid_host -> "The host is not defined in config" @@ -25,9 +39,69 @@ defmodule TaskBunny.Connection.ConnectError do - Make sure you can connect to RabbitMQ from the application host. - You might try to get connection before process is ready. """ + fallback -> + "#{fallback}" end message = "#{title}\n#{detail}" %__MODULE__{message: message, type: type} end end + +defmodule TaskBunny.Job.QueueNotFoundError do + @moduledoc """ + Raised when failed to find a queue for the job. + """ + defexception [:job, :message] + + @spec exception(atom) :: map + def exception(job) do + title = "Failed to find a queue for the job." + detail = "job=#{job}" + + message = "#{title}\n#{detail}" + %__MODULE__{message: message, job: job} + end +end + +defmodule TaskBunny.Message.DecodeError do + @moduledoc """ + Raised when failed to decode the message. + """ + defexception [:message] + + @spec exception(keyword) :: map + def exception(opts) do + title = "Failed to decode the message." + detail = case opts[:type] do + :job_not_loaded -> + "Job is not valid Elixir module" + :poison_decode_error -> + "Failed to decode the message in JSON. error=#{inspect opts[:error]}" + :decode_error -> + "Failed to decode the message. error=#{inspect opts[:error]}" + fallback -> + "#{fallback}" + end + + message = "#{title}\n#{detail}\nmessage body=#{opts[:body]}" + %__MODULE__{message: message} + end +end + +defmodule TaskBunny.Publisher.PublishError do + @moduledoc """ + Raised when failed to publish the message. + """ + defexception [:message, :inner_error] + + @spec exception(any) :: map + def exception(inner_error) do + title = "Failed to publish the message." + detail = "error=#{inspect inner_error}" + + message = "#{title}\n#{detail}" + %__MODULE__{message: message, inner_error: inner_error} + end + +end diff --git a/lib/task_bunny/job.ex b/lib/task_bunny/job.ex index 5ea4428..e723d38 100644 --- a/lib/task_bunny/job.ex +++ b/lib/task_bunny/job.ex @@ -1,67 +1,97 @@ defmodule TaskBunny.Job do - @moduledoc false + @moduledoc """ + TODO: Write me + """ @callback perform(any) :: :ok | {:error, term} + require Logger alias TaskBunny.{Config, Queue, Job, Message, Publisher} + alias TaskBunny.{ + Publisher.PublishError, Connection.ConnectError, Job.QueueNotFoundError + } defmacro __using__(_options \\ []) do quote do @behaviour Job - require Logger + @doc false @spec enqueue(any, keyword) :: :ok | {:error, any} def enqueue(payload, options \\ []) do - queue_data = Config.queue_for_job(__MODULE__) - - queue = options[:queue] || queue_data[:name] - host = options[:host] || queue_data[:host] || :default - message = Message.encode(__MODULE__, payload) - - do_enqueue(host, queue, message) + TaskBunny.Job.enqueue(__MODULE__, payload, options) end - # TODO: enqueue! - # custom errors - - @spec do_enqueue(atom, String.t|nil, String.t) :: :ok | {:error, any} - defp do_enqueue(host, nil, message) do - {:error, "Can't find a queue for #{__MODULE__}"} - end - - defp do_enqueue(host, queue, message) do - declare_queue(host, queue) - Publisher.publish(host, queue, message) - end - - @spec declare_queue(atom, String.t) :: :ok - defp declare_queue(host, queue) do - Queue.declare_with_subqueues(host, queue) - :ok - catch - :exit, e -> - # Handles the error but we carry on... - # It's highly likely caused by the options on queue declare don't match. - # e.g. retry interbval a.k.a message ttl in retry queue - # We carry on with error log. - Logger.error "failed to declare queue for #{queue}. If you have changed the queue configuration, you have to delete the queue and create it again. Error: #{inspect e}" - - {:error, {:exit, e}} + @doc false + @spec enqueue!(any, keyword) :: :ok | {:error, any} + def enqueue!(payload, options \\ []) do + TaskBunny.Job.enqueue!(__MODULE__, payload, options) end - @doc false # Returns timeout (default 2 minutes). # Overwrite the method to change the timeout. + @doc false + @spec timeout() :: integer def timeout, do: 120_000 # Retries 10 times in every 5 minutes in default. # You have to re-create the queue after you change retry_interval. + @doc false + @spec timeout() :: integer def max_retry, do: 10 + + @doc false + @spec retry_interval(integer) :: integer def retry_interval(_failed_count), do: 300_000 defoverridable [timeout: 0, max_retry: 0, retry_interval: 1] end end - # TODO: documentation + @doc """ + Enqueues a job with payload. + """ + @spec enqueue(atom, any, keyword) :: :ok | {:error, any} + def enqueue(job, payload, options \\ []) do + enqueue!(job, payload, options) + + rescue + e in [ConnectError, PublishError, QueueNotFoundError] -> {:error, e} + end + + @doc """ + Similar to enqueue/3 but raises an exception on error. + """ + @spec enqueue!(atom, any, keyword) :: :ok + def enqueue!(job, payload, options \\ []) do + queue_data = Config.queue_for_job(job) || [] + + host = options[:host] || queue_data[:host] || :default + {:ok, message} = Message.encode(job, payload) + + case options[:queue] || queue_data[:name] do + nil -> raise QueueNotFoundError, job + queue -> do_enqueue(host, queue, message) + end + end + + @spec do_enqueue(atom, String.t, String.t) :: :ok | {:error, any} + defp do_enqueue(host, queue, message) do + declare_queue(host, queue) + Publisher.publish!(host, queue, message) + end + + @spec declare_queue(atom, String.t) :: :ok + defp declare_queue(host, queue) do + Queue.declare_with_subqueues(host, queue) + :ok + catch + :exit, e -> + # Handles the error but we carry on... + # It's highly likely caused by the options on queue declare don't match. + # e.g. retry interbval a.k.a message ttl in retry queue + # We carry on with error log. + Logger.error "TaskBunny.job: Failed to declare queue for #{queue}. If you have changed the queue configuration, you have to delete the queue and create it again. Error: #{inspect e}" + + {:error, {:exit, e}} + end end diff --git a/lib/task_bunny/message.ex b/lib/task_bunny/message.ex index 289c436..2bf2928 100644 --- a/lib/task_bunny/message.ex +++ b/lib/task_bunny/message.ex @@ -2,23 +2,35 @@ defmodule TaskBunny.Message do @moduledoc """ Functions to access messages and its meta data. """ + alias TaskBunny.Message.DecodeError @doc """ Encode message body in JSON with job and arugment. """ - @spec encode(atom, any) :: String.t - # TODO: return tuple + @spec encode(atom, any) :: {:ok, String.t} def encode(job, payload) do + data = message_data(job, payload) + Poison.encode(data, pretty: true) + end + + @doc """ + Similar to encode/2 but raises an exception on error + """ + @spec encode!(atom, any) :: String.t + def encode!(job, payload) do + data = message_data(job, payload) + Poison.encode!(data, pretty: true) + end + + @spec message_data(atom, any) :: map + defp message_data(job, payload) do %{ "job" => encode_job(job), "payload" => payload, "created_at" => DateTime.utc_now() } - |> Poison.encode!(pretty: true) end - # TODO: encode! - @doc """ Decode message body in JSON to map """ @@ -39,7 +51,19 @@ defmodule TaskBunny.Message do error -> {:error, {:decode_exception, error}} end - # TODO: decode! + @doc """ + Similar to decode/1 but raises an exception on error. + """ + @spec decode!(String.t) :: map + def decode!(message) do + case decode(message) do + {:ok, decoded} -> decoded + {:error, {error_type, error}} -> + raise DecodeError, type: error_type, body: message, error: error + {:error, error_type} -> + raise DecodeError, type: error_type, body: message + end + end @spec encode_job(atom) :: String.t defp encode_job(job) do diff --git a/lib/task_bunny/publisher.ex b/lib/task_bunny/publisher.ex index 5794374..8d2d881 100644 --- a/lib/task_bunny/publisher.ex +++ b/lib/task_bunny/publisher.ex @@ -6,6 +6,7 @@ defmodule TaskBunny.Publisher do You should use Job.enqueue to enqueue a job from your application. """ require Logger + alias TaskBunny.{Publisher.PublishError, Connection.ConnectError} @doc """ Publish a message to the queue. @@ -15,27 +16,34 @@ defmodule TaskBunny.Publisher do """ @spec publish(atom, String.t, String.t, keyword) :: :ok | {:error, any} def publish(host, queue, message, options \\ []) do - {:ok, conn} = TaskBunny.Connection.get_connection(host) - exchange = "" - routing_key = queue - options = Keyword.merge([persistent: true], options) + publish!(host, queue, message, options) - do_publish(conn, exchange, routing_key, message, options) + rescue + e in [ConnectError, PublishError] -> {:error, e} end - # TODO: publish! + @doc """ + Similar to publish/4 but raises exception on error. + """ + @spec publish!(atom, String.t, String.t, keyword) :: :ok + def publish!(host, queue, message, options \\ []) do + Logger.debug """ + TaskBunny.Publisher: publish + #{host}:#{queue}: #{inspect message}. options = #{inspect options} + """ - @spec do_publish(AMQP.Connection.t, String.t, String.t, String.t, keyword) :: :ok | {:error, any} - defp do_publish(nil, _, _, _, _), do: {:error, "Failed to connect to AMQP host"} + conn = TaskBunny.Connection.get_connection!(host) - defp do_publish(conn, exchange, routing_key, message, options) do - Logger.debug "TaskBunny.Publisher: publish:\r\n #{exchange} - #{routing_key}: #{inspect message}. options = #{inspect options}" + exchange = "" + routing_key = queue + options = Keyword.merge([persistent: true], options) - # TODO: returns detail error - {:ok, channel} = AMQP.Channel.open(conn) - :ok = AMQP.Basic.publish(channel, exchange, routing_key, message, options) - :ok = AMQP.Channel.close(channel) - rescue - e in MatchError -> {:error, e} + with {:ok, channel} <- AMQP.Channel.open(conn), + :ok <- AMQP.Basic.publish(channel, exchange, routing_key, message, options), + :ok <- AMQP.Channel.close(channel) do + :ok + else + error -> raise PublishError, error + end end end diff --git a/lib/task_bunny/worker.ex b/lib/task_bunny/worker.ex index 2739097..fe80606 100644 --- a/lib/task_bunny/worker.ex +++ b/lib/task_bunny/worker.ex @@ -125,10 +125,10 @@ defmodule TaskBunny.Worker do # Consumes the queue case Consumer.consume(connection, state.queue, state.concurrency) do - {channel, consumer_tag} -> + {:ok, channel, consumer_tag} -> Logger.info log_msg("start comsuming", state) {:noreply, %{state | channel: channel, consumer_tag: consumer_tag}} - error -> + {:error, error} -> {:stop, {:failed_to_consume, error}, state} end end diff --git a/test/task_bunny/job_runner_test.exs b/test/task_bunny/job_runner_test.exs index 55a0fd5..57834f5 100644 --- a/test/task_bunny/job_runner_test.exs +++ b/test/task_bunny/job_runner_test.exs @@ -49,7 +49,7 @@ defmodule TaskBunny.JobRunnerTest do describe "invoke" do defp message(job, payload, meta) do - body = TaskBunny.Message.encode(job, payload) + body = TaskBunny.Message.encode!(job, payload) {body, meta} end diff --git a/test/task_bunny/message_test.exs b/test/task_bunny/message_test.exs index 01816d2..4520932 100644 --- a/test/task_bunny/message_test.exs +++ b/test/task_bunny/message_test.exs @@ -10,7 +10,7 @@ defmodule TaskBunny.MessageTest do describe "encode/decode message body(payload)" do test "encode and decode payload" do - encoded = Message.encode(NameJob, %{"name" => "Joe"}) + {:ok, encoded} = Message.encode(NameJob, %{"name" => "Joe"}) {:ok, %{"job" => job, "payload" => payload}} = Message.decode(encoded) assert job.perform(payload) == {:ok, "Joe"} end @@ -26,7 +26,7 @@ defmodule TaskBunny.MessageTest do end test "decode invalid job" do - encoded = Message.encode(InvalidJob, %{"name" => "Joe"}) + encoded = Message.encode!(InvalidJob, %{"name" => "Joe"}) assert {:error, :job_not_loaded} == Message.decode(encoded) end @@ -39,7 +39,7 @@ defmodule TaskBunny.MessageTest do describe "add_error_log" do @tag timeout: 1000 test "adds error information to the message" do - message = Message.encode(NameJob, %{"name" => "Joe"}) + message = Message.encode!(NameJob, %{"name" => "Joe"}) error = {:error, "HTTP Request error"} new_message = Message.add_error_log(message, error) {:ok, %{"errors" => [added | _]}} = Message.decode(new_message) From 199acc5d70d85f608a70f69361c365319d5b7ec5 Mon Sep 17 00:00:00 2001 From: Tatsuya Ono Date: Fri, 10 Mar 2017 10:04:14 +0000 Subject: [PATCH 4/8] Tweak with style --- lib/task_bunny/consumer.ex | 3 ++- lib/task_bunny/publisher.ex | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/lib/task_bunny/consumer.ex b/lib/task_bunny/consumer.ex index 4c3e4f3..cf6b487 100644 --- a/lib/task_bunny/consumer.ex +++ b/lib/task_bunny/consumer.ex @@ -11,7 +11,8 @@ defmodule TaskBunny.Consumer do def consume(connection, queue, concurrency) do with {:ok, channel} <- AMQP.Channel.open(connection), :ok <- AMQP.Basic.qos(channel, prefetch_count: concurrency), - {:ok, consumer_tag} <- AMQP.Basic.consume(channel, queue) do + {:ok, consumer_tag} <- AMQP.Basic.consume(channel, queue) + do {:ok, channel, consumer_tag} else error -> diff --git a/lib/task_bunny/publisher.ex b/lib/task_bunny/publisher.ex index 8d2d881..c399d3a 100644 --- a/lib/task_bunny/publisher.ex +++ b/lib/task_bunny/publisher.ex @@ -40,7 +40,8 @@ defmodule TaskBunny.Publisher do with {:ok, channel} <- AMQP.Channel.open(conn), :ok <- AMQP.Basic.publish(channel, exchange, routing_key, message, options), - :ok <- AMQP.Channel.close(channel) do + :ok <- AMQP.Channel.close(channel) + do :ok else error -> raise PublishError, error From 5337bb417b48ea8dbd6273bf22358d10b856e288 Mon Sep 17 00:00:00 2001 From: Tatsuya Ono Date: Fri, 10 Mar 2017 10:19:49 +0000 Subject: [PATCH 5/8] Fix credo/dialyzer warnings --- lib/task_bunny/config.ex | 4 ++-- lib/task_bunny/errors.ex | 17 ++++++++--------- lib/task_bunny/job.ex | 2 +- lib/task_bunny/publisher.ex | 2 +- 4 files changed, 12 insertions(+), 13 deletions(-) diff --git a/lib/task_bunny/config.ex b/lib/task_bunny/config.ex index 36b7d1a..351fcfd 100644 --- a/lib/task_bunny/config.ex +++ b/lib/task_bunny/config.ex @@ -52,7 +52,7 @@ defmodule TaskBunny.Config do queue_config[:queues] |> Enum.map(fn (queue) -> unless queue[:name] do - raise ConfigError, "name is missing in queue definition. #{inspect queue}" + raise ConfigError, message: "name is missing in queue definition. #{inspect queue}" end Keyword.merge(queue, [name: namespace <> queue[:name]]) end) @@ -84,7 +84,7 @@ defmodule TaskBunny.Config do @doc """ Returns queue for the given job """ - @spec queue_for_job(atom) :: keyword + @spec queue_for_job(atom) :: keyword | nil def queue_for_job(job) do Enum.find(queues(), fn (queue) -> match_job?(job, queue[:jobs]) diff --git a/lib/task_bunny/errors.ex b/lib/task_bunny/errors.ex index 7078462..21f905d 100644 --- a/lib/task_bunny/errors.ex +++ b/lib/task_bunny/errors.ex @@ -4,8 +4,8 @@ defmodule TaskBunny.ConfigError do """ defexception [:message] - @spec exception(String.t) :: map - def exception(message) do + @lint false + def exception(message: message) do title = "Failed to load TaskBunny config" message = "#{title}\n#{message}" %__MODULE__{message: message} @@ -18,7 +18,7 @@ defmodule TaskBunny.Connection.ConnectError do """ defexception [:type, :message] - @spec exception(keyword) :: map + @lint false def exception(_opts = [type: type, host: host]) do title = "Failed to get a connection to host '#{host}'." detail = case type do @@ -54,8 +54,8 @@ defmodule TaskBunny.Job.QueueNotFoundError do """ defexception [:job, :message] - @spec exception(atom) :: map - def exception(job) do + @lint false + def exception(job: job) do title = "Failed to find a queue for the job." detail = "job=#{job}" @@ -70,7 +70,7 @@ defmodule TaskBunny.Message.DecodeError do """ defexception [:message] - @spec exception(keyword) :: map + @lint false def exception(opts) do title = "Failed to decode the message." detail = case opts[:type] do @@ -95,13 +95,12 @@ defmodule TaskBunny.Publisher.PublishError do """ defexception [:message, :inner_error] - @spec exception(any) :: map - def exception(inner_error) do + @lint false + def exception(inner_error: inner_error) do title = "Failed to publish the message." detail = "error=#{inspect inner_error}" message = "#{title}\n#{detail}" %__MODULE__{message: message, inner_error: inner_error} end - end diff --git a/lib/task_bunny/job.ex b/lib/task_bunny/job.ex index e723d38..21f5101 100644 --- a/lib/task_bunny/job.ex +++ b/lib/task_bunny/job.ex @@ -69,7 +69,7 @@ defmodule TaskBunny.Job do {:ok, message} = Message.encode(job, payload) case options[:queue] || queue_data[:name] do - nil -> raise QueueNotFoundError, job + nil -> raise QueueNotFoundError, job: job queue -> do_enqueue(host, queue, message) end end diff --git a/lib/task_bunny/publisher.ex b/lib/task_bunny/publisher.ex index c399d3a..ac404ed 100644 --- a/lib/task_bunny/publisher.ex +++ b/lib/task_bunny/publisher.ex @@ -44,7 +44,7 @@ defmodule TaskBunny.Publisher do do :ok else - error -> raise PublishError, error + error -> raise PublishError, inner_error: error end end end From c369e5a665610f674ae4808711073b6bd1e82aa6 Mon Sep 17 00:00:00 2001 From: Tatsuya Ono Date: Fri, 10 Mar 2017 10:22:26 +0000 Subject: [PATCH 6/8] Tweak a comment --- lib/task_bunny/job.ex | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/task_bunny/job.ex b/lib/task_bunny/job.ex index 21f5101..bf17532 100644 --- a/lib/task_bunny/job.ex +++ b/lib/task_bunny/job.ex @@ -88,7 +88,6 @@ defmodule TaskBunny.Job do :exit, e -> # Handles the error but we carry on... # It's highly likely caused by the options on queue declare don't match. - # e.g. retry interbval a.k.a message ttl in retry queue # We carry on with error log. Logger.error "TaskBunny.job: Failed to declare queue for #{queue}. If you have changed the queue configuration, you have to delete the queue and create it again. Error: #{inspect e}" From 46f72b0bc5a70440b912853906b2a8280e35f45c Mon Sep 17 00:00:00 2001 From: Tatsuya Ono Date: Fri, 10 Mar 2017 10:50:02 +0000 Subject: [PATCH 7/8] Add some regression tests for enqueue --- test/task_bunny/job_test.exs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/test/task_bunny/job_test.exs b/test/task_bunny/job_test.exs index 1b93cd1..cf93892 100644 --- a/test/task_bunny/job_test.exs +++ b/test/task_bunny/job_test.exs @@ -25,5 +25,21 @@ defmodule TaskBunny.JobTest do {:ok, %{"payload" => received_payload}} = Message.decode(received) assert received_payload == payload end + + test "returns an error for wrong option" do + payload = %{"foo" => "bar"} + assert {:error, _} = TestJob.enqueue( + payload, queue: @queue, host: :invalid_host + ) + end + end + + describe "enqueue!" do + test "raises an exception for a wrong host" do + payload = %{"foo" => "bar"} + assert_raise TaskBunny.Connection.ConnectError, fn -> + TestJob.enqueue!(payload, queue: @queue, host: :invalid_host) + end + end end end From ea3a3feddb1a5bcbce0664af4b3b4bf46e757234 Mon Sep 17 00:00:00 2001 From: Tatsuya Ono Date: Fri, 10 Mar 2017 10:51:47 +0000 Subject: [PATCH 8/8] Remove compile warnings --- lib/task_bunny/errors.ex | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/lib/task_bunny/errors.ex b/lib/task_bunny/errors.ex index 21f905d..e91017b 100644 --- a/lib/task_bunny/errors.ex +++ b/lib/task_bunny/errors.ex @@ -10,6 +10,7 @@ defmodule TaskBunny.ConfigError do message = "#{title}\n#{message}" %__MODULE__{message: message} end + _ = @lint end defmodule TaskBunny.Connection.ConnectError do @@ -46,6 +47,7 @@ defmodule TaskBunny.Connection.ConnectError do message = "#{title}\n#{detail}" %__MODULE__{message: message, type: type} end + _ = @lint end defmodule TaskBunny.Job.QueueNotFoundError do @@ -62,6 +64,7 @@ defmodule TaskBunny.Job.QueueNotFoundError do message = "#{title}\n#{detail}" %__MODULE__{message: message, job: job} end + _ = @lint end defmodule TaskBunny.Message.DecodeError do @@ -87,6 +90,7 @@ defmodule TaskBunny.Message.DecodeError do message = "#{title}\n#{detail}\nmessage body=#{opts[:body]}" %__MODULE__{message: message} end + _ = @lint end defmodule TaskBunny.Publisher.PublishError do @@ -103,4 +107,5 @@ defmodule TaskBunny.Publisher.PublishError do message = "#{title}\n#{detail}" %__MODULE__{message: message, inner_error: inner_error} end + _ = @lint end