diff --git a/lib/faktory_worker/connection_manager.ex b/lib/faktory_worker/connection_manager.ex index 44d0530..3447d2e 100644 --- a/lib/faktory_worker/connection_manager.ex +++ b/lib/faktory_worker/connection_manager.ex @@ -87,6 +87,6 @@ defmodule FaktoryWorker.ConnectionManager do defp close_connection(%{conn: nil}), do: :ok defp log_error(reason, {_, %{jid: jid}}) do - Logger.warning("[#{jid}] #{reason}") + Logger.warning("[faktory-worker] [#{jid}] #{reason}") end end diff --git a/lib/faktory_worker/worker/hearbeat_server.ex b/lib/faktory_worker/worker/hearbeat_server.ex index 6d3f41d..5ab09e2 100644 --- a/lib/faktory_worker/worker/hearbeat_server.ex +++ b/lib/faktory_worker/worker/hearbeat_server.ex @@ -8,6 +8,8 @@ defmodule FaktoryWorker.Worker.HeartbeatServer do alias FaktoryWorker.Worker.Server alias FaktoryWorker.Worker.Pool + require Logger + @spec start_link(opts :: keyword()) :: GenServer.on_start() def start_link(opts) do GenServer.start_link(__MODULE__, opts, name: name_from_opts(opts)) @@ -58,7 +60,7 @@ defmodule FaktoryWorker.Worker.HeartbeatServer do @impl true def handle_info(:beat, %{conn: conn, process_wid: process_wid, beat_state: beat_state} = state) - when beat_state in [:ok, :quiet] do + when beat_state in [:ok, :quiet, :error] do state = conn |> ConnectionManager.send_command({:beat, process_wid}) @@ -68,6 +70,10 @@ defmodule FaktoryWorker.Worker.HeartbeatServer do end def handle_info(:beat, state) do + Logger.info( + "[faktory-worker] not sending heartbeat because the beat_state is #{inspect(state.beat_state)}" + ) + {:noreply, %{state | beat_ref: nil}, {:continue, :schedule_beat}} end @@ -101,7 +107,10 @@ defmodule FaktoryWorker.Worker.HeartbeatServer do %{state | beat_state: new_beat_state, conn: conn, beat_ref: nil} end - defp handle_beat_response({{result, _}, conn}, state) when result in [:ok, :error] do + defp handle_beat_response({{result, msg}, conn}, state) when result in [:ok, :error] do + if result != :ok, + do: Logger.info("[faktory-worker] unexpected heartbeat response #{inspect({result, msg})}") + Telemetry.execute(:beat, result, %{ prev_status: state.beat_state, wid: state.process_wid diff --git a/test/faktory_worker/worker/heartbeat_server_test.exs b/test/faktory_worker/worker/heartbeat_server_test.exs index fdde773..7ec2643 100644 --- a/test/faktory_worker/worker/heartbeat_server_test.exs +++ b/test/faktory_worker/worker/heartbeat_server_test.exs @@ -379,5 +379,46 @@ defmodule FaktoryWorker.Worker.HeartbeatServerTest do :ok = stop_supervised(:test_heartbeat_server) end + + test "should recover from error" do + worker_connection_mox() + + expect(FaktoryWorker.SocketMock, :send, fn _, "BEAT " <> _ -> + :ok + end) + + expect(FaktoryWorker.SocketMock, :recv, fn _ -> + {:error, "Failed to connect to Faktory"} + end) + + expect(FaktoryWorker.SocketMock, :send, fn _, "BEAT " <> _ -> + :ok + end) + + expect(FaktoryWorker.SocketMock, :send, fn _, "END" <> _ -> + :ok + end) + + expect(FaktoryWorker.SocketMock, :recv, fn _ -> + # return the terminate state here to prevent futher beat commands + {:ok, "+{\"state\": \"terminate\"}\r\n"} + end) + + opts = [ + name: :test, + process_wid: Random.process_wid(), + beat_interval: 1, + connection: [socket_handler: FaktoryWorker.SocketMock] + ] + + pid = start_supervised!(HeartbeatServer.child_spec(opts)) + + %{beat_state: :ok} = :sys.get_state(pid) + + # # sleep 5 milliseconds to allow both beats to occur + Process.sleep(5) + + :ok = stop_supervised(:test_heartbeat_server) + end end end diff --git a/test/faktory_worker/worker_test.exs b/test/faktory_worker/worker_test.exs index 0f52e12..dea4f92 100644 --- a/test/faktory_worker/worker_test.exs +++ b/test/faktory_worker/worker_test.exs @@ -233,7 +233,17 @@ defmodule FaktoryWorker.WorkerTest do worker_connection_mox() - expect(FaktoryWorker.SocketMock, :send, fn _, "FETCH test_queue default\r\n" -> + expect(FaktoryWorker.SocketMock, :send, fn _, "FETCH " <> msg -> + assert String.ends_with?(msg, "\r\n") + + sorted_queues = + msg + |> String.trim("\r\n") + |> String.split(" ") + |> Enum.sort() + + assert sorted_queues == ["default", "test_queue"] + :ok end)