Skip to content

Commit

Permalink
Merge pull request #183 from aparod/job_timeout_event
Browse files Browse the repository at this point in the history
Add job timeout event
  • Loading branch information
Ch4s3 authored Jun 12, 2023
2 parents 8e3407a + cbcb54f commit f7c68f4
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 7 deletions.
12 changes: 11 additions & 1 deletion docs/logging.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,14 @@ The available status outcomes are.

The `metadata` supplied with this event is a map containing the following fields.
- `wid` - A worker identifier that can be used to identify which worker emitted this event.
- `prev_status` - The `status` that was emitted on the previous `beat` event. This is useful for tracking when the status has changed between heartbeats.
- `prev_status` - The `status` that was emitted on the previous `beat` event. This is useful for tracking when the status has changed between heartbeats.

## Job Timeout Event

Event Name: `[:faktory_worker, :job_timeout]`.

The job timeout event is emitted 20 seconds prior to a job's reservation deadline. This 20-second window allows the job to be terminated before Faktory can expire and retry the job on the server.

The `status` status provided for this event will always be `{:error, :job_timeout}`.

The `metadata` supplied with this event is a map containing the job's ID, arguments, and type.
12 changes: 7 additions & 5 deletions lib/faktory_worker/telemetry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ defmodule FaktoryWorker.Telemetry do

require Logger

@events [:push, :beat, :fetch, :ack, :failed_ack, :batch_new, :batch_open, :batch_commit]
@events [:push, :beat, :fetch, :ack, :failed_ack, :job_timeout, :batch_new, :batch_open, :batch_commit]

@doc false
@spec attach_default_handler :: :ok | {:error, :already_exists}
Expand Down Expand Up @@ -83,6 +83,12 @@ defmodule FaktoryWorker.Telemetry do
log_error("Error sending 'FAIL' acknowledgement to faktory", job.jid, job.args, job.jobtype)
end

# Misc events

defp log_event(:job_timeout, _, job) do
log_error("Job has reached its reservation timeout and will be failed", job.jid, job.args, job.jobtype)
end

# Log formats

defp log_info(message) do
Expand Down Expand Up @@ -113,10 +119,6 @@ defmodule FaktoryWorker.Telemetry do
Logger.error("[faktory-worker] #{message}")
end

defp log_error(outcome, wid) do
log_error("#{outcome} wid-#{wid}")
end

defp log_error(outcome, jid, args, worker_module) do
log_error("#{outcome} (#{worker_module}) jid-#{jid} #{inspect(args)}")
end
Expand Down
6 changes: 6 additions & 0 deletions lib/faktory_worker/worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ defmodule FaktoryWorker.Worker do

@spec stop_job(state :: __MODULE__.t()) :: __MODULE__.t()
def stop_job(%{job_ref: job_ref} = state) when job_ref != nil do
Telemetry.execute(:job_timeout, {:error, :job_timeout}, %{
jid: state.job_id,
args: state.job["args"],
jobtype: state.job["jobtype"]
})

state
|> job_supervisor_name()
|> Task.Supervisor.terminate_child(job_ref.pid)
Expand Down
18 changes: 18 additions & 0 deletions test/faktory_worker/telemetry_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -240,5 +240,23 @@ defmodule FaktoryWorker.TelemetryTest do
metadata.jobtype
}) jid-#{metadata.jid} #{inspect(metadata.args)}"
end

test "should log job timeouts" do
metadata = %{
jid: Random.job_id(),
args: %{hey: "there!"},
jobtype: "TestQueueWorker"
}

log_message =
capture_log(fn ->
Telemetry.handle_event([:faktory_worker, :job_timeout], nil, metadata, [])
end)

assert log_message =~
"[faktory-worker] Job has reached its reservation timeout and will be failed (#{
metadata.jobtype
}) jid-#{metadata.jid} #{inspect(metadata.args)}"
end
end
end
2 changes: 1 addition & 1 deletion test/support/faktory_test_helpers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,6 @@ defmodule FaktoryWorker.FaktoryTestHelpers do
Process.sleep(50)
{:ok, connection} = FaktoryWorker.Connection.open()

{:ok, info} = FaktoryWorker.Connection.send_command(connection, {:batch_status, bid})
{:ok, _info} = FaktoryWorker.Connection.send_command(connection, {:batch_status, bid})
end
end

0 comments on commit f7c68f4

Please sign in to comment.