Skip to content

Commit

Permalink
Add job_timeout telemetry event to aid with debugging job failures
Browse files Browse the repository at this point in the history
  • Loading branch information
Adam Parod committed Apr 7, 2023
1 parent a84f68a commit cbcb54f
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 2 deletions.
12 changes: 11 additions & 1 deletion docs/logging.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,14 @@ The available status outcomes are.

The `metadata` supplied with this event is a map containing the following fields.
- `wid` - A worker identifier that can be used to identify which worker emitted this event.
- `prev_status` - The `status` that was emitted on the previous `beat` event. This is useful for tracking when the status has changed between heartbeats.
- `prev_status` - The `status` that was emitted on the previous `beat` event. This is useful for tracking when the status has changed between heartbeats.

## Job Timeout Event

Event Name: `[:faktory_worker, :job_timeout]`.

The job timeout event is emitted 20 seconds prior to a job's reservation deadline. This 20-second window allows the job to be terminated before Faktory can expire and retry the job on the server.

The `status` status provided for this event will always be `{:error, :job_timeout}`.

The `metadata` supplied with this event is a map containing the job's ID, arguments, and type.
8 changes: 7 additions & 1 deletion lib/faktory_worker/telemetry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ defmodule FaktoryWorker.Telemetry do

require Logger

@events [:push, :beat, :fetch, :ack, :failed_ack, :batch_new, :batch_open, :batch_commit]
@events [:push, :beat, :fetch, :ack, :failed_ack, :job_timeout, :batch_new, :batch_open, :batch_commit]

@doc false
@spec attach_default_handler :: :ok | {:error, :already_exists}
Expand Down Expand Up @@ -83,6 +83,12 @@ defmodule FaktoryWorker.Telemetry do
log_error("Error sending 'FAIL' acknowledgement to faktory", job.jid, job.args, job.jobtype)
end

# Misc events

defp log_event(:job_timeout, _, job) do
log_error("Job has reached its reservation timeout and will be failed", job.jid, job.args, job.jobtype)
end

# Log formats

defp log_info(message) do
Expand Down
6 changes: 6 additions & 0 deletions lib/faktory_worker/worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ defmodule FaktoryWorker.Worker do

@spec stop_job(state :: __MODULE__.t()) :: __MODULE__.t()
def stop_job(%{job_ref: job_ref} = state) when job_ref != nil do
Telemetry.execute(:job_timeout, {:error, :job_timeout}, %{
jid: state.job_id,
args: state.job["args"],
jobtype: state.job["jobtype"]
})

state
|> job_supervisor_name()
|> Task.Supervisor.terminate_child(job_ref.pid)
Expand Down
18 changes: 18 additions & 0 deletions test/faktory_worker/telemetry_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -240,5 +240,23 @@ defmodule FaktoryWorker.TelemetryTest do
metadata.jobtype
}) jid-#{metadata.jid} #{inspect(metadata.args)}"
end

test "should log job timeouts" do
metadata = %{
jid: Random.job_id(),
args: %{hey: "there!"},
jobtype: "TestQueueWorker"
}

log_message =
capture_log(fn ->
Telemetry.handle_event([:faktory_worker, :job_timeout], nil, metadata, [])
end)

assert log_message =~
"[faktory-worker] Job has reached its reservation timeout and will be failed (#{
metadata.jobtype
}) jid-#{metadata.jid} #{inspect(metadata.args)}"
end
end
end

0 comments on commit cbcb54f

Please sign in to comment.