Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Publisher interface #18

Merged
merged 4 commits into from
Feb 22, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions lib/mix/tasks/task_bunny.queue.reset.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,8 @@ defmodule Mix.Tasks.TaskBunny.Queue.Reset do
defp reset_queue(job_info) do
Mix.shell.info "Resetting queues for #{inspect job_info}"

conn = Connection.get_connection()
job = job_info[:job]
job.delete_queue(conn)
job.declare_queue(conn)
job.delete_queue()
job.declare_queue()
end
end
11 changes: 11 additions & 0 deletions lib/task_bunny/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,17 @@ defmodule TaskBunny.Connection do
end
end

@doc """
Similar to `get_connection/1` but raises an error when connection is not ready.
"""
@spec get_connection!(atom) :: struct
def get_connection!(host \\ :default) do
case get_connection(host) do
nil -> raise "Failed to connect #{host}"
conn -> conn
end
end

@doc """
Asks server to send the connection back asynchronously.
Once connection has been established, it will send a message with {:connected, connection} to the given process.
Expand Down
25 changes: 15 additions & 10 deletions lib/task_bunny/job.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule TaskBunny.Job do

@callback perform(any) :: :ok | {:error, term}

alias TaskBunny.{Queue, Job, SyncPublisher}
alias TaskBunny.{Queue, Job, Message, Publisher}

defmacro __using__(job_options \\ []) do
quote do
Expand Down Expand Up @@ -45,9 +45,14 @@ defmodule TaskBunny.Job do
"#{namespace}.#{name}"
end

@spec enqueue(any) :: :ok | :failed
def enqueue(host \\ :default, payload) do
SyncPublisher.push(host, __MODULE__, payload)
@spec enqueue(any, keyword) :: :ok | {:error, any}
def enqueue(payload, options \\ []) do
host = options[:host] || :default
queue = queue_name()
message = Message.encode(__MODULE__, payload)

declare_queue(host)
Publisher.publish(host, queue, message)
end

@spec all_queues :: list(String.t)
Expand All @@ -59,10 +64,10 @@ defmodule TaskBunny.Job do
]
end

@spec declare_queue(%AMQP.Connection{}) :: :ok
def declare_queue(connection) do
@spec declare_queue(atom) :: :ok
def declare_queue(host \\ :default) do
Queue.declare_with_retry(
connection, queue_name(), retry_interval: retry_interval()
host, queue_name(), retry_interval: retry_interval()
)
:ok
catch
Expand All @@ -76,9 +81,9 @@ defmodule TaskBunny.Job do
{:error, {:exit, e}}
end

@spec delete_queue(%AMQP.Connection{}) :: :ok
def delete_queue(connection) do
Queue.delete_with_retry(connection, queue_name())
@spec delete_queue(atom) :: :ok
def delete_queue(host \\ :default) do
Queue.delete_with_retry(host, queue_name())
end

@doc false
Expand Down
38 changes: 38 additions & 0 deletions lib/task_bunny/publisher.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
defmodule TaskBunny.Publisher do
@moduledoc """
Conviniences for publishing messages to a queue.

It provides lower level functions.
You should use Job.enqueue to enqueue a job from your application.
"""
require Logger

@doc """
Publish a message to the queue.

Returns `:ok` when the message has been successfully sent to the server.
Otherwise returns `{:error, detail}`
"""
@spec publish(atom, String.t, String.t, keyword) :: :ok | {:error, any}
def publish(host, queue, message, options \\ []) do
conn = TaskBunny.Connection.get_connection(host)
exchange = ""
routing_key = queue
options = Keyword.merge([persistent: true], options)

do_publish(conn, exchange, routing_key, message, options)
end

@spec do_publish(AMQP.Connection.t, String.t, String.t, String.t, keyword) :: :ok | {:error, any}
defp do_publish(nil, _, _, _, _), do: {:error, "Failed to connect to AMQP host"}

defp do_publish(conn, exchange, routing_key, message, options) do
Logger.debug "TaskBunny.Publisher: publish:\r\n #{exchange} - #{routing_key}: #{inspect message}"

{:ok, channel} = AMQP.Channel.open(conn)
:ok = AMQP.Basic.publish(channel, exchange, routing_key, message, options)
:ok = AMQP.Channel.close(channel)
rescue
e in MatchError -> {:error, e}
end
end
14 changes: 12 additions & 2 deletions lib/task_bunny/queue.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
defmodule TaskBunny.Queue do
@moduledoc false

@spec declare_with_retry(%AMQP.Connection{}, String.t, list) :: {map, map, map}
@spec declare_with_retry(%AMQP.Connection{} | atom, String.t, list) :: {map, map, map}
def declare_with_retry(host, queue_name, options) when is_atom(host) do
conn = TaskBunny.Connection.get_connection(host)
declare_with_retry(conn, queue_name, options)
end

def declare_with_retry(connection, queue_name, options) do
{:ok, channel} = AMQP.Channel.open(connection)

Expand Down Expand Up @@ -39,7 +44,12 @@ defmodule TaskBunny.Queue do
{work, retry, rejected}
end

@spec delete_with_retry(%AMQP.Connection{}, String.t) :: :ok
@spec delete_with_retry(%AMQP.Connection{} | atom, String.t) :: :ok
def declare_with_retry(host, queue_name) when is_atom(host) do
conn = TaskBunny.Connection.get_connection(host)
delete_with_retry(conn, queue_name)
end

def delete_with_retry(connection, queue_name) do
{:ok, channel} = AMQP.Channel.open(connection)

Expand Down
82 changes: 0 additions & 82 deletions lib/task_bunny/sync_publisher.ex

This file was deleted.

8 changes: 5 additions & 3 deletions lib/task_bunny/worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ defmodule TaskBunny.Worker do
use GenServer
require Logger
alias TaskBunny.{Connection, Consumer, JobRunner, Queue,
SyncPublisher, Worker, Message}
Publisher, Worker, Message}

@type t ::%__MODULE__{
job: atom,
Expand Down Expand Up @@ -82,6 +82,8 @@ defmodule TaskBunny.Worker do
"""
@spec terminate(any, TaskBunny.Worker.t) :: :normal
def terminate(_reason, state) do
Logger.info "TaskBunny.Worker termintating: PID: #{inspect self()}"

if state.channel do
AMQP.Channel.close(state.channel)
end
Expand All @@ -100,7 +102,7 @@ defmodule TaskBunny.Worker do

def handle_info({:connected, connection}, state = %Worker{}) do
# Declares queue
state.job.declare_queue(connection)
state.job.declare_queue(state.host)

# Consumes the queue
case Consumer.consume(connection, state.job.queue_name(), state.concurrency) do
Expand Down Expand Up @@ -211,7 +213,7 @@ defmodule TaskBunny.Worker do
@spec reject_message(TaskBunny.Worker.t, any, any) :: {:noreply, TaskBunny.Worker.t}
defp reject_message(state, body, meta) do
rejected_queue = Queue.rejected_queue_name(state.job.queue_name())
SyncPublisher.push(state.host, rejected_queue, body)
Publisher.publish(state.host, rejected_queue, body)

Consumer.ack(state.channel, meta, true)

Expand Down
11 changes: 0 additions & 11 deletions test/support/queue_helper.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,6 @@ defmodule TaskBunny.TestSupport.QueueHelper do
{:ok, _channel} = AMQP.Channel.open(conn)
end

def push_when_server_back(queue, payload, host \\ :default) do
case TaskBunny.SyncPublisher.push(host, queue, payload) do
:ok ->
:ok
:failed ->
Process.sleep(100)

push_when_server_back(queue, payload, host)
end
end

def declare(queue, host \\ :default) do
{:ok, channel} = open_channel(host)
{:ok, _state} = AMQP.Queue.declare(channel, queue, durable: true)
Expand Down
23 changes: 23 additions & 0 deletions test/task_bunny/publisher_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
defmodule TaskBunny.PublisherTest do
use ExUnit.Case, async: false
import TaskBunny.TestSupport.QueueHelper
alias TaskBunny.{Publisher,TestSupport.QueueHelper}

@queue_name "task_bunny.test_queue"

setup do
clean([@queue_name])

:ok
end

describe "publish" do
test "publishes a message to a queue" do
QueueHelper.declare(@queue_name)
Publisher.publish(:default, @queue_name, "Hello Queue")

{message, _} = QueueHelper.pop(@queue_name)
assert message == "Hello Queue"
end
end
end
15 changes: 6 additions & 9 deletions test/task_bunny/status/worker_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ defmodule TaskBunny.Status.WorkerTest do
use ExUnit.Case, async: false

import TaskBunny.TestSupport.QueueHelper
alias TaskBunny.{SyncPublisher, Config}
alias TaskBunny.Config
alias TaskBunny.TestSupport.JobTestHelper
alias TaskBunny.TestSupport.JobTestHelper.TestJob

Expand Down Expand Up @@ -73,8 +73,8 @@ defmodule TaskBunny.Status.WorkerTest do

Process.sleep(10)

SyncPublisher.push :foo, TestJob, payload
SyncPublisher.push :foo, TestJob, payload
TestJob.enqueue(payload, host: :foo)
TestJob.enqueue(payload, host: :foo)

JobTestHelper.wait_for_perform(2)

Expand All @@ -90,8 +90,7 @@ defmodule TaskBunny.Status.WorkerTest do
test "jobs succeeded" do
payload = %{"hello" => "world1"}

SyncPublisher.push TestJob, payload

TestJob.enqueue(payload)
JobTestHelper.wait_for_perform()

%{workers: workers} = TaskBunny.Status.overview(:foo_supervisor)
Expand All @@ -104,8 +103,7 @@ defmodule TaskBunny.Status.WorkerTest do
test "jobs failed" do
payload = %{"fail" => "fail"}

SyncPublisher.push TestJob, payload

TestJob.enqueue(payload)
JobTestHelper.wait_for_perform()

%{workers: workers} = TaskBunny.Status.overview(:foo_supervisor)
Expand All @@ -118,8 +116,7 @@ defmodule TaskBunny.Status.WorkerTest do
test "jobs rejected" do
payload = %{"fail" => "fail"}

SyncPublisher.push TaskBunny.Status.WorkerTest.RejectJob, payload

RejectJob.enqueue(payload)
JobTestHelper.wait_for_perform()

%{workers: workers} = TaskBunny.Status.overview(:foo_supervisor)
Expand Down
6 changes: 3 additions & 3 deletions test/task_bunny/supervisor_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ defmodule TaskBunny.SupervisorTest do
import TaskBunny.TestSupport.QueueHelper
alias TaskBunny.TestSupport.JobTestHelper
alias TaskBunny.TestSupport.JobTestHelper.TestJob
alias TaskBunny.{Config, Connection, SyncPublisher}
alias TaskBunny.{Config, Connection}

setup do
clean(TestJob.all_queues())
Expand Down Expand Up @@ -33,7 +33,7 @@ defmodule TaskBunny.SupervisorTest do

test "starts connection and worker" do
payload = %{"hello" => "world"}
SyncPublisher.push :foo, TestJob, payload
TestJob.enqueue(payload, host: :foo)

JobTestHelper.wait_for_perform
assert List.first(JobTestHelper.performed_payloads) == payload
Expand All @@ -60,7 +60,7 @@ defmodule TaskBunny.SupervisorTest do

# Make sure worker handles the job
payload = %{"hello" => "world"}
SyncPublisher.push :foo, TestJob, payload
TestJob.enqueue(payload, host: :foo)

JobTestHelper.wait_for_perform
assert List.first(JobTestHelper.performed_payloads) == payload
Expand Down
Loading