diff --git a/lib/faktory_worker.ex b/lib/faktory_worker.ex index 2df72c7..cf68aa0 100644 --- a/lib/faktory_worker.ex +++ b/lib/faktory_worker.ex @@ -89,7 +89,8 @@ defmodule FaktoryWorker do - `timeout` how long to wait for a response, in ms (default: #{@default_timeout}) """ - @spec send_command(command(), [send_command_opt()]) :: FaktoryWorker.Connection.response() | {:error, :timeout} + @spec send_command(command(), [send_command_opt()]) :: + FaktoryWorker.Connection.response() | {:error, :timeout} def send_command(command, opts \\ []) do try do opts @@ -101,7 +102,7 @@ defmodule FaktoryWorker do ) catch :exit, error -> - Logger.error(inspect(error)) + Logger.error("[faktory-worker] " <> inspect(error)) {:error, :timeout} end end diff --git a/lib/faktory_worker/connection_manager.ex b/lib/faktory_worker/connection_manager.ex index 36d65d0..afe871f 100644 --- a/lib/faktory_worker/connection_manager.ex +++ b/lib/faktory_worker/connection_manager.ex @@ -80,10 +80,12 @@ defmodule FaktoryWorker.ConnectionManager do end end - defp close_connection(%{conn: conn}) do + defp close_connection(%{conn: conn}) when not is_nil(conn) do Connection.close(conn) end + defp close_connection(%{conn: nil}), do: :ok + defp log_error(reason, {_, %{jid: jid}}) do Logger.warn("[#{jid}] #{reason}") end diff --git a/lib/faktory_worker/worker.ex b/lib/faktory_worker/worker.ex index 680c894..8d0cb9e 100644 --- a/lib/faktory_worker/worker.ex +++ b/lib/faktory_worker/worker.ex @@ -146,12 +146,12 @@ defmodule FaktoryWorker.Worker do %{ state - | worker_state: :running_job, - job_timeout_ref: timeout_ref, - job_start: job_start, - job_ref: job_ref, - job_id: job["jid"], - job: job + | worker_state: :running_job, + job_timeout_ref: timeout_ref, + job_start: job_start, + job_ref: job_ref, + job_id: job["jid"], + job: job } end @@ -200,13 +200,13 @@ defmodule FaktoryWorker.Worker do schedule_fetch(%{ state - | worker_state: :ok, - queues: nil, - job_timeout_ref: nil, - job_start: nil, - job_ref: nil, - job_id: nil, - job: nil + | worker_state: :ok, + queues: nil, + job_timeout_ref: nil, + job_start: nil, + job_ref: nil, + job_id: nil, + job: nil }) end @@ -223,13 +223,13 @@ defmodule FaktoryWorker.Worker do schedule_fetch(%{ state - | worker_state: :ok, - queues: nil, - job_timeout_ref: nil, - job_start: nil, - job_ref: nil, - job_id: nil, - job: nil + | worker_state: :ok, + queues: nil, + job_timeout_ref: nil, + job_start: nil, + job_ref: nil, + job_id: nil, + job: nil }) end diff --git a/test/faktory_worker/connection_manager_test.exs b/test/faktory_worker/connection_manager_test.exs index d6a5592..86993d1 100644 --- a/test/faktory_worker/connection_manager_test.exs +++ b/test/faktory_worker/connection_manager_test.exs @@ -91,7 +91,8 @@ defmodule FaktoryWorker.ConnectionManagerTest do {{:ok, result}, _} = ConnectionManager.send_command(state, {:push, payload}) assert result == "halt reason" - end) |> String.match?(~r/\[warn.*(? String.match?(~r/\[warn.*(? :ok end) + opts = [socket_handler: FaktoryWorker.SocketMock] state = ConnectionManager.new(opts) diff --git a/test/faktory_worker/worker_test.exs b/test/faktory_worker/worker_test.exs index 4b3acbc..0f52e12 100644 --- a/test/faktory_worker/worker_test.exs +++ b/test/faktory_worker/worker_test.exs @@ -193,7 +193,17 @@ defmodule FaktoryWorker.WorkerTest do worker_connection_mox() - expect(FaktoryWorker.SocketMock, :send, fn _, "FETCH test_queue default\r\n" -> + expect(FaktoryWorker.SocketMock, :send, fn _, "FETCH " <> msg -> + assert String.ends_with?(msg, "\r\n") + + sorted_queues = + msg + |> String.trim("\r\n") + |> String.split(" ") + |> Enum.sort() + + assert sorted_queues == ["default", "test_queue"] + :ok end) @@ -245,7 +255,7 @@ defmodule FaktoryWorker.WorkerTest do state = Worker.send_fetch(state) - assert state.queues == ["test_queue", "default"] + assert Enum.sort(state.queues) == Enum.sort(["test_queue", "default"]) end end @@ -264,6 +274,7 @@ defmodule FaktoryWorker.WorkerTest do |> Map.put(:job_id, "f47ccc395ef9d9646118434f") |> Map.put(:job, %{"jid" => "f47ccc395ef9d9646118434f"}) |> Map.put(:job_timeout_ref, :erlang.make_ref()) + |> Map.put(:job_start, System.monotonic_time(:millisecond)) end test "should stop the job process" do @@ -423,7 +434,8 @@ defmodule FaktoryWorker.WorkerTest do job_timeout_ref: nil, job_ref: nil, job_id: nil, - job: nil + job: nil, + job_start: nil } new_state = Worker.handle_fetch_response({:ok, job}, state) @@ -519,6 +531,7 @@ defmodule FaktoryWorker.WorkerTest do |> Map.put(:job_ref, :erlang.make_ref()) |> Map.put(:job, job) |> Map.put(:worker_state, :running_job) + |> Map.put(:job_start, System.monotonic_time(:millisecond)) end test "should send a successful 'ACK' to faktory" do @@ -598,11 +611,11 @@ defmodule FaktoryWorker.WorkerTest do assert_receive {[:faktory_worker, :ack], outcome, metadata} assert outcome == %{status: :ok} - assert metadata == %{ - jid: job_id, + assert %{ + jid: ^job_id, args: [%{"hey" => "there!"}], jobtype: "FaktoryWorker.TestQueueWorker" - } + } = metadata detach_event_handler(event_handler_id) end @@ -729,11 +742,13 @@ defmodule FaktoryWorker.WorkerTest do assert_receive {[:faktory_worker, :ack], outcome, metadata} assert outcome == %{status: :error} - assert metadata == %{ - jid: job_id, + assert %{ + jid: ^job_id, args: [%{"hey" => "there!"}], - jobtype: "FaktoryWorker.TestQueueWorker" - } + jobtype: "FaktoryWorker.TestQueueWorker", + queue: "test_queue", + duration: _ + } = metadata detach_event_handler(event_handler_id) end @@ -800,6 +815,7 @@ defmodule FaktoryWorker.WorkerTest do {:error, :closed} end) + expect(FaktoryWorker.SocketMock, :close, fn _ -> :ok end) # the connection manager retries a failed request once worker_connection_mox() @@ -825,11 +841,11 @@ defmodule FaktoryWorker.WorkerTest do assert_receive {[:faktory_worker, :failed_ack], outcome, metadata} assert outcome == %{status: :ok} - assert metadata == %{ - jid: job_id, + assert %{ + jid: ^job_id, args: [%{"hey" => "there!"}], jobtype: "FaktoryWorker.TestQueueWorker" - } + } = metadata assert_receive :fetch @@ -866,6 +882,8 @@ defmodule FaktoryWorker.WorkerTest do {:error, :closed} end) + expect(FaktoryWorker.SocketMock, :close, fn conn -> conn end) + # the connection manager retries a failed request one more time worker_connection_mox() @@ -891,11 +909,11 @@ defmodule FaktoryWorker.WorkerTest do assert_receive {[:faktory_worker, :failed_ack], outcome, metadata} assert outcome == %{status: :error} - assert metadata == %{ - jid: job_id, + assert %{ + jid: ^job_id, args: [%{"hey" => "there!"}], jobtype: "FaktoryWorker.TestQueueWorker" - } + } = metadata assert_receive :fetch