diff --git a/docs/logging.md b/docs/logging.md index 7934bd3..f2c2c5b 100644 --- a/docs/logging.md +++ b/docs/logging.md @@ -70,4 +70,14 @@ The available status outcomes are. The `metadata` supplied with this event is a map containing the following fields. - `wid` - A worker identifier that can be used to identify which worker emitted this event. -- `prev_status` - The `status` that was emitted on the previous `beat` event. This is useful for tracking when the status has changed between heartbeats. \ No newline at end of file +- `prev_status` - The `status` that was emitted on the previous `beat` event. This is useful for tracking when the status has changed between heartbeats. + +## Job Timeout Event + +Event Name: `[:faktory_worker, :job_timeout]`. + +The job timeout event is emitted 20 seconds prior to a job's reservation deadline. This 20-second window allows the job to be terminated before Faktory can expire and retry the job on the server. + +The `status` provided for this event will always be `{:error, :job_timeout}`. + +The `metadata` supplied with this event is a map containing the job's ID, arguments, and type. 
diff --git a/lib/faktory_worker/telemetry.ex b/lib/faktory_worker/telemetry.ex index dcc6541..a35fd51 100644 --- a/lib/faktory_worker/telemetry.ex +++ b/lib/faktory_worker/telemetry.ex @@ -3,7 +3,7 @@ defmodule FaktoryWorker.Telemetry do require Logger - @events [:push, :beat, :fetch, :ack, :failed_ack, :batch_new, :batch_open, :batch_commit] + @events [:push, :beat, :fetch, :ack, :failed_ack, :job_timeout, :batch_new, :batch_open, :batch_commit] @doc false @spec attach_default_handler :: :ok | {:error, :already_exists} @@ -83,6 +83,12 @@ defmodule FaktoryWorker.Telemetry do log_error("Error sending 'FAIL' acknowledgement to faktory", job.jid, job.args, job.jobtype) end + # Misc events + + defp log_event(:job_timeout, _, job) do + log_error("Job has reached its reservation timeout and will be failed", job.jid, job.args, job.jobtype) + end + # Log formats defp log_info(message) do @@ -113,10 +119,6 @@ defmodule FaktoryWorker.Telemetry do Logger.error("[faktory-worker] #{message}") end - defp log_error(outcome, wid) do - log_error("#{outcome} wid-#{wid}") - end - defp log_error(outcome, jid, args, worker_module) do log_error("#{outcome} (#{worker_module}) jid-#{jid} #{inspect(args)}") end diff --git a/lib/faktory_worker/worker.ex b/lib/faktory_worker/worker.ex index 80747d1..1288f41 100644 --- a/lib/faktory_worker/worker.ex +++ b/lib/faktory_worker/worker.ex @@ -84,6 +84,12 @@ defmodule FaktoryWorker.Worker do @spec stop_job(state :: __MODULE__.t()) :: __MODULE__.t() def stop_job(%{job_ref: job_ref} = state) when job_ref != nil do + Telemetry.execute(:job_timeout, {:error, :job_timeout}, %{ + jid: state.job_id, + args: state.job["args"], + jobtype: state.job["jobtype"] + }) + state |> job_supervisor_name() |> Task.Supervisor.terminate_child(job_ref.pid) diff --git a/test/faktory_worker/telemetry_test.exs b/test/faktory_worker/telemetry_test.exs index d23d5f1..c9bfac8 100644 --- a/test/faktory_worker/telemetry_test.exs +++ b/test/faktory_worker/telemetry_test.exs @@ 
-240,5 +240,23 @@ defmodule FaktoryWorker.TelemetryTest do metadata.jobtype }) jid-#{metadata.jid} #{inspect(metadata.args)}" end + + test "should log job timeouts" do + metadata = %{ + jid: Random.job_id(), + args: %{hey: "there!"}, + jobtype: "TestQueueWorker" + } + + log_message = + capture_log(fn -> + Telemetry.handle_event([:faktory_worker, :job_timeout], nil, metadata, []) + end) + + assert log_message =~ + "[faktory-worker] Job has reached its reservation timeout and will be failed (#{ + metadata.jobtype + }) jid-#{metadata.jid} #{inspect(metadata.args)}" + end end end diff --git a/test/support/faktory_test_helpers.ex b/test/support/faktory_test_helpers.ex index d5e8792..37f7e79 100644 --- a/test/support/faktory_test_helpers.ex +++ b/test/support/faktory_test_helpers.ex @@ -43,6 +43,6 @@ defmodule FaktoryWorker.FaktoryTestHelpers do Process.sleep(50) {:ok, connection} = FaktoryWorker.Connection.open() - {:ok, info} = FaktoryWorker.Connection.send_command(connection, {:batch_status, bid}) + {:ok, _info} = FaktoryWorker.Connection.send_command(connection, {:batch_status, bid}) end end