Skip to content

Commit

Permalink
Merge pull request #201 from opt-elixir/add-logging-for-unexpected-he…
Browse files Browse the repository at this point in the history
…artbeat

Heartbeat can recover from an error condition.
  • Loading branch information
jeremyowensboggs authored Feb 27, 2024
2 parents 84eac8f + c3e99ea commit e41a140
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 4 deletions.
2 changes: 1 addition & 1 deletion lib/faktory_worker/connection_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,6 @@ defmodule FaktoryWorker.ConnectionManager do
defp close_connection(%{conn: nil}), do: :ok

defp log_error(reason, {_, %{jid: jid}}) do
Logger.warning("[#{jid}] #{reason}")
Logger.warning("[faktory-worker] [#{jid}] #{reason}")
end
end
13 changes: 11 additions & 2 deletions lib/faktory_worker/worker/hearbeat_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ defmodule FaktoryWorker.Worker.HeartbeatServer do
alias FaktoryWorker.Worker.Server
alias FaktoryWorker.Worker.Pool

require Logger

@spec start_link(opts :: keyword()) :: GenServer.on_start()
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: name_from_opts(opts))
Expand Down Expand Up @@ -58,7 +60,7 @@ defmodule FaktoryWorker.Worker.HeartbeatServer do

@impl true
def handle_info(:beat, %{conn: conn, process_wid: process_wid, beat_state: beat_state} = state)
when beat_state in [:ok, :quiet] do
when beat_state in [:ok, :quiet, :error] do
state =
conn
|> ConnectionManager.send_command({:beat, process_wid})
Expand All @@ -68,6 +70,10 @@ defmodule FaktoryWorker.Worker.HeartbeatServer do
end

# Catch-all clause for `:beat` messages that no earlier clause matched.
# No heartbeat command is sent here: the current beat_state is logged,
# beat_ref is cleared, and the server continues with `:schedule_beat`.
def handle_info(:beat, state) do
Logger.info(
"[faktory-worker] not sending heartbeat because the beat_state is #{inspect(state.beat_state)}"
)

{:noreply, %{state | beat_ref: nil}, {:continue, :schedule_beat}}
end

Expand Down Expand Up @@ -101,7 +107,10 @@ defmodule FaktoryWorker.Worker.HeartbeatServer do
%{state | beat_state: new_beat_state, conn: conn, beat_ref: nil}
end

defp handle_beat_response({{result, _}, conn}, state) when result in [:ok, :error] do
defp handle_beat_response({{result, msg}, conn}, state) when result in [:ok, :error] do
if result != :ok,
do: Logger.info("[faktory-worker] unexpected heartbeat response #{inspect({result, msg})}")

Telemetry.execute(:beat, result, %{
prev_status: state.beat_state,
wid: state.process_wid
Expand Down
41 changes: 41 additions & 0 deletions test/faktory_worker/worker/heartbeat_server_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -379,5 +379,46 @@ defmodule FaktoryWorker.Worker.HeartbeatServerTest do

:ok = stop_supervised(:test_heartbeat_server)
end

# Exercises the heartbeat server across a failed beat followed by another
# beat: the first BEAT's response is an error, the next response reports
# the terminate state.
test "should recover from error" do
worker_connection_mox()

# first beat: the send succeeds...
expect(FaktoryWorker.SocketMock, :send, fn _, "BEAT " <> _ ->
:ok
end)

# ...but reading the response returns an error
expect(FaktoryWorker.SocketMock, :recv, fn _ ->
{:error, "Failed to connect to Faktory"}
end)

# a second BEAT is expected to be sent after the error response
expect(FaktoryWorker.SocketMock, :send, fn _, "BEAT " <> _ ->
:ok
end)

# NOTE(review): presumably the END command is sent when the connection is
# closed — confirm against the connection code
expect(FaktoryWorker.SocketMock, :send, fn _, "END" <> _ ->
:ok
end)

expect(FaktoryWorker.SocketMock, :recv, fn _ ->
# return the terminate state here to prevent further beat commands
{:ok, "+{\"state\": \"terminate\"}\r\n"}
end)

opts = [
name: :test,
process_wid: Random.process_wid(),
beat_interval: 1,
connection: [socket_handler: FaktoryWorker.SocketMock]
]

pid = start_supervised!(HeartbeatServer.child_spec(opts))

# pattern match asserts the server's beat_state is :ok at this point
%{beat_state: :ok} = :sys.get_state(pid)

# sleep 5 milliseconds to allow both beats to occur
Process.sleep(5)

:ok = stop_supervised(:test_heartbeat_server)
end
end
end
12 changes: 11 additions & 1 deletion test/faktory_worker/worker_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,17 @@ defmodule FaktoryWorker.WorkerTest do

worker_connection_mox()

expect(FaktoryWorker.SocketMock, :send, fn _, "FETCH test_queue default\r\n" ->
expect(FaktoryWorker.SocketMock, :send, fn _, "FETCH " <> msg ->
assert String.ends_with?(msg, "\r\n")

sorted_queues =
msg
|> String.trim("\r\n")
|> String.split(" ")
|> Enum.sort()

assert sorted_queues == ["default", "test_queue"]

:ok
end)

Expand Down

0 comments on commit e41a140

Please sign in to comment.