diff --git a/.gitignore b/.gitignore
index e51d63a..0de5bce 100644
--- a/.gitignore
+++ b/.gitignore
@@ -17,3 +17,6 @@ erl_crash.dump
 *.ez
 /logs
+
+# asdf version file
+.tool-versions
diff --git a/README.md b/README.md
index f8784de..ba69616 100644
--- a/README.md
+++ b/README.md
@@ -416,6 +416,29 @@ All workers will restart automatically once the new connection is established.
 TaskBunny aims to provide zero hassle and recover automatically regardless how
 long the host takes to come back and accessible.
 
+#### Failure backends
+
+By default, when an error occurs during job execution, TaskBunny reports it
+to Logger. If you want to report errors to other services, you can configure
+your own failure backend.
+
+```elixir
+config :task_bunny, failure_backend: [YourApp.CustomFailureBackend]
+```
+
+You can also report errors to multiple backends. For example, if you want to
+use the default Logger backend alongside your custom backend, configure it
+like below:
+
+```elixir
+config :task_bunny, failure_backend: [
+  TaskBunny.FailureBackend.Logger,
+  YourApp.CustomFailureBackend
+]
+```
+
+Check out the implementation of [TaskBunny.FailureBackend.Logger](https://github.com/shinyscorpion/task_bunny/blob/master/lib/task_bunny/failure_backend/logger.ex) to learn how to write your own failure backend.
+
 ## Monitoring
 
 #### RabbitMQ plugins
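For illustration, a custom backend is simply a module that `use`s `TaskBunny.FailureBackend` and implements the `report_job_error/1` callback introduced below. The following sketch assumes a hypothetical `YourApp.CustomFailureBackend` and a made-up `YourApp.Notifier.notify/1` reporting function; neither is part of TaskBunny:

```elixir
defmodule YourApp.CustomFailureBackend do
  use TaskBunny.FailureBackend
  alias TaskBunny.JobError

  # Receives a TaskBunny.JobError struct for every failure caught during
  # job processing and forwards a short summary to a hypothetical notifier.
  def report_job_error(error = %JobError{}) do
    YourApp.Notifier.notify(%{
      job: error.job,
      error_type: error.error_type,
      failed_count: error.failed_count,
      rejected: error.reject
    })
  end
end
```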
+ """ + @callback report_job_error(JobError.t) :: any + + defmacro __using__(_options \\ []) do + quote do + @behaviour FailureBackend + end + end + + @doc false + @spec report_job_error(JobError.t) :: :ok + def report_job_error(job_error = %JobError{}) do + Config.failure_backend() + |> Enum.each(&(&1.report_job_error(job_error))) + end +end diff --git a/lib/task_bunny/failure_backend/logger.ex b/lib/task_bunny/failure_backend/logger.ex new file mode 100644 index 0000000..88c438e --- /dev/null +++ b/lib/task_bunny/failure_backend/logger.ex @@ -0,0 +1,94 @@ +defmodule TaskBunny.FailureBackend.Logger do + @moduledoc """ + Default failure backend that reports job errors to Logger. + """ + use TaskBunny.FailureBackend + require Logger + alias TaskBunny.JobError + + def report_job_error(error = %JobError{error_type: :exception}) do + """ + TaskBunny - #{error.job} failed for an exception. + + Exception: + #{my_inspect error.exception} + + #{common_message error} + + Stacktrace: + #{Exception.format_stacktrace(error.stacktrace)} + """ + |> do_report(error.reject) + end + + def report_job_error(error = %JobError{error_type: :return_value}) do + """ + TaskBunny - #{error.job} failed for an invalid return value. + + Return value: + #{my_inspect error.return_value} + + #{common_message error} + """ + |> do_report(error.reject) + end + + def report_job_error(error = %JobError{error_type: :exit}) do + """ + TaskBunny - #{error.job} failed for EXIT signal. + + Reason: + #{my_inspect error.reason} + + #{common_message error} + """ + |> do_report(error.reject) + end + + def report_job_error(error = %JobError{error_type: :timeout}) do + """ + TaskBunny - #{error.job} failed for timeout. + + #{common_message error} + """ + |> do_report(error.reject) + end + + def report_job_error(error) do + """ + TaskBunny - Failed with the unknown error type. + + Error dump: + #{my_inspect error} + """ + |> do_report(true) + end + + defp do_report(message, rejected) do + if rejected do + Logger.error message + else + Logger.warn message + end + end + + defp common_message(error) do + """ + Payload: + #{my_inspect error.payload} + + History: + - Failed count: #{error.failed_count} + - Reject: #{error.reject} + + Worker: + - Queue: #{error.queue} + - Concurrency: #{error.concurrency} + - PID: #{inspect error.pid} + """ + end + + defp my_inspect(arg) do + inspect arg, pretty: true, width: 100 + end +end diff --git a/lib/task_bunny/job_error.ex b/lib/task_bunny/job_error.ex new file mode 100644 index 0000000..50a0570 --- /dev/null +++ b/lib/task_bunny/job_error.ex @@ -0,0 +1,101 @@ +defmodule TaskBunny.JobError do + @moduledoc """ + A struct that holds an error information occured during the job processing. + + ## Attributes + + - job: the job module failed + - payload: the payload(arguments) for the job execution + - error_type: the type of the error. 
diff --git a/lib/task_bunny/failure_backend/logger.ex b/lib/task_bunny/failure_backend/logger.ex
new file mode 100644
index 0000000..88c438e
--- /dev/null
+++ b/lib/task_bunny/failure_backend/logger.ex
@@ -0,0 +1,94 @@
+defmodule TaskBunny.FailureBackend.Logger do
+  @moduledoc """
+  Default failure backend that reports job errors to Logger.
+  """
+  use TaskBunny.FailureBackend
+  require Logger
+  alias TaskBunny.JobError
+
+  def report_job_error(error = %JobError{error_type: :exception}) do
+    """
+    TaskBunny - #{error.job} failed for an exception.
+
+    Exception:
+    #{my_inspect error.exception}
+
+    #{common_message error}
+
+    Stacktrace:
+    #{Exception.format_stacktrace(error.stacktrace)}
+    """
+    |> do_report(error.reject)
+  end
+
+  def report_job_error(error = %JobError{error_type: :return_value}) do
+    """
+    TaskBunny - #{error.job} failed for an invalid return value.
+
+    Return value:
+    #{my_inspect error.return_value}
+
+    #{common_message error}
+    """
+    |> do_report(error.reject)
+  end
+
+  def report_job_error(error = %JobError{error_type: :exit}) do
+    """
+    TaskBunny - #{error.job} failed for EXIT signal.
+
+    Reason:
+    #{my_inspect error.reason}
+
+    #{common_message error}
+    """
+    |> do_report(error.reject)
+  end
+
+  def report_job_error(error = %JobError{error_type: :timeout}) do
+    """
+    TaskBunny - #{error.job} failed for timeout.
+
+    #{common_message error}
+    """
+    |> do_report(error.reject)
+  end
+
+  def report_job_error(error) do
+    """
+    TaskBunny - Failed with the unknown error type.
+
+    Error dump:
+    #{my_inspect error}
+    """
+    |> do_report(true)
+  end
+
+  defp do_report(message, rejected) do
+    if rejected do
+      Logger.error message
+    else
+      Logger.warn message
+    end
+  end
+
+  defp common_message(error) do
+    """
+    Payload:
+    #{my_inspect error.payload}
+
+    History:
+    - Failed count: #{error.failed_count}
+    - Reject: #{error.reject}
+
+    Worker:
+    - Queue: #{error.queue}
+    - Concurrency: #{error.concurrency}
+    - PID: #{inspect error.pid}
+    """
+  end
+
+  defp my_inspect(arg) do
+    inspect arg, pretty: true, width: 100
+  end
+end
diff --git a/lib/task_bunny/job_error.ex b/lib/task_bunny/job_error.ex
new file mode 100644
index 0000000..50a0570
--- /dev/null
+++ b/lib/task_bunny/job_error.ex
@@ -0,0 +1,101 @@
+defmodule TaskBunny.JobError do
+  @moduledoc """
+  A struct that holds information about an error that occurred during job processing.
+
+  ## Attributes
+
+  - job: the job module that failed
+  - payload: the payload (arguments) for the job execution
+  - error_type: the type of the error (:exception, :return_value, :timeout or :exit)
+  - exception: the inner exception (optional)
+  - stacktrace: the stacktrace (only available for an exception)
+  - return_value: the return value from the job (only available for a return value error)
+  - reason: the reason passed with the EXIT signal (only available for an exit error)
+  - raw_body: the raw body of the message
+  - meta: the metadata given by RabbitMQ
+  - failed_count: the number of failures for the job processing request
+  - queue: the name of the queue
+  - concurrency: the number of concurrent jobs processed by the worker
+  - pid: the process ID of the worker
+  - reject: set to true if the job is rejected after the failure (meaning it won't be retried again)
+
+  """
+
+  @type t :: %__MODULE__{
+    job: atom | nil,
+    payload: any,
+    error_type: :exception | :return_value | :timeout | :exit | nil,
+    exception: struct | nil,
+    stacktrace: list(tuple) | nil,
+    return_value: any,
+    reason: any,
+    raw_body: String.t,
+    meta: map,
+    failed_count: integer,
+    queue: String.t,
+    concurrency: integer,
+    pid: pid | nil,
+    reject: boolean
+  }
+
+  defstruct [
+    job: nil,
+    payload: nil,
+    error_type: nil,
+    exception: nil,
+    stacktrace: nil,
+    return_value: nil,
+    reason: nil,
+    raw_body: "",
+    meta: %{},
+    failed_count: 0,
+    queue: "",
+    concurrency: 1,
+    pid: nil,
+    reject: false
+  ]
+
+  @doc false
+  @spec handle_exception(atom, any, struct) :: t
+  def handle_exception(job, payload, exception) do
+    %__MODULE__{
+      job: job,
+      payload: payload,
+      error_type: :exception,
+      exception: exception,
+      stacktrace: System.stacktrace()
+    }
+  end
+
+  @doc false
+  @spec handle_exit(atom, any, any) :: t
+  def handle_exit(job, payload, reason) do
+    %__MODULE__{
+      job: job,
+      payload: payload,
+      error_type: :exit,
+      reason: reason
+    }
+  end
+
+  @doc false
+  @spec handle_return_value(atom, any, any) :: t
+  def handle_return_value(job, payload, return_value) do
+    %__MODULE__{
+      job: job,
+      payload: payload,
+      error_type: :return_value,
+      return_value: return_value
+    }
+  end
+
+  @doc false
+  @spec handle_timeout(atom, any) :: t
+  def handle_timeout(job, payload) do
+    %__MODULE__{
+      job: job,
+      payload: payload,
+      error_type: :timeout
+    }
+  end
+end
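To make the attribute list above concrete, a failure backend receives roughly the following struct once the worker has merged in its runtime context (all values below are illustrative, and the job module name is hypothetical):

```elixir
%TaskBunny.JobError{
  job: YourApp.SampleJob,               # hypothetical job module
  payload: %{"id" => 123},
  error_type: :return_value,
  return_value: {:error, :not_found},
  raw_body: "...",                      # raw message body as received from RabbitMQ
  meta: %{},                            # delivery metadata from RabbitMQ
  failed_count: 2,
  queue: "jobs.sample",
  concurrency: 1,
  pid: self(),
  reject: false                         # true once failed_count exceeds max_retry
}
```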
diff --git a/lib/task_bunny/job_runner.ex b/lib/task_bunny/job_runner.ex
index acc73e0..22e4eaf 100644
--- a/lib/task_bunny/job_runner.ex
+++ b/lib/task_bunny/job_runner.ex
@@ -22,6 +22,7 @@ defmodule TaskBunny.JobRunner do
   @moduledoc false
 
   require Logger
+  alias TaskBunny.JobError
 
   @doc ~S"""
   Invokes the given job with the given payload.
@@ -33,10 +34,10 @@ defmodule TaskBunny.JobRunner do
   def invoke(job, payload, message) do
     caller = self()
 
-    time_out_error = {:error, "#{inspect job} timed out with #{job.timeout}"}
+    timeout_error = {:error, JobError.handle_timeout(job, payload)}
     timer = Process.send_after(
       caller,
-      {:job_finished, time_out_error, message},
+      {:job_finished, timeout_error, message},
       job.timeout
     )
 
@@ -52,14 +53,18 @@ defmodule TaskBunny.JobRunner do
   # Any raises or throws in the perform are caught and turned into an :error tuple.
   @spec run_job(atom, any) :: :ok | {:ok, any} | {:error, any}
   defp run_job(job, payload) do
-    job.perform(payload)
+    case job.perform(payload) do
+      :ok -> :ok
+      {:ok, something} -> {:ok, something}
+      error -> {:error, JobError.handle_return_value(job, payload, error)}
+    end
   rescue
     error ->
-      Logger.error "TaskBunny.JobRunner - Runner rescued #{inspect error}"
-      {:error, error}
+      Logger.debug "TaskBunny.JobRunner - Runner rescued #{inspect error}"
+      {:error, JobError.handle_exception(job, payload, error)}
   catch
     _, reason ->
-      Logger.error "TaskBunny.JobRunner - Runner caught reason: #{inspect reason}"
-      {:error, reason}
+      Logger.debug "TaskBunny.JobRunner - Runner caught reason: #{inspect reason}"
+      {:error, JobError.handle_exit(job, payload, reason)}
   end
 end
diff --git a/lib/task_bunny/worker.ex b/lib/task_bunny/worker.ex
index f794307..a5ca8ae 100644
--- a/lib/task_bunny/worker.ex
+++ b/lib/task_bunny/worker.ex
@@ -10,7 +10,7 @@ defmodule TaskBunny.Worker do
   use GenServer
   require Logger
   alias TaskBunny.{Connection, Consumer, JobRunner, Queue,
-    Publisher, Worker, Message}
+    Publisher, Worker, Message, FailureBackend}
 
   @typedoc """
   Struct that represents a state of the worker GenServer.
@@ -211,26 +211,30 @@ defmodule TaskBunny.Worker do
   defp succeeded?({:ok, _}), do: true
   defp succeeded?(_), do: false
 
-  defp handle_failed_job(state, body, meta, result) do
+  defp handle_failed_job(state, body, meta, {:error, job_error}) do
     {:ok, decoded} = Message.decode(body)
     failed_count = Message.failed_count(decoded) + 1
     job = decoded["job"]
 
-    new_body = Message.add_error_log(body, result)
-
-    case failed_count <= job.max_retry() do
-      true ->
-        Logger.warn log_msg("job failed #{failed_count} times.", state, [body: body, will_be_retried: true])
-
-        retry_message(job, state, new_body, meta, failed_count)
-
-        {:noreply, update_job_stats(state, :failed)}
-      false ->
-        # Failed more than X times
-        Logger.error log_msg("job failed #{failed_count} times.", state, [body: body, will_be_retried: false])
-
-        reject_message(state, new_body, meta)
-
-        {:noreply, update_job_stats(state, :rejected)}
+    new_body = Message.add_error_log(body, job_error)
+
+    job_error
+    |> Map.merge(%{
+      raw_body: body,
+      meta: meta,
+      failed_count: failed_count,
+      queue: state.queue,
+      concurrency: state.concurrency,
+      pid: self(),
+      reject: failed_count > job.max_retry()
+    })
+    |> FailureBackend.report_job_error()
+
+    if failed_count <= job.max_retry() do
+      retry_message(job, state, new_body, meta, failed_count)
+      {:noreply, update_job_stats(state, :failed)}
+    else
+      reject_message(state, new_body, meta)
+      {:noreply, update_job_stats(state, :rejected)}
     end
   end
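With the new `run_job/2`, only `:ok` and `{:ok, _}` count as success; anything else a job returns becomes a `:return_value` error, while raises and exits become `:exception` and `:exit` errors. A sketch of how that plays out for a job, assuming a hypothetical module defined with `use TaskBunny.Job` as in the project's job convention:

```elixir
defmodule YourApp.SampleJob do
  use TaskBunny.Job

  # Success: the runner passes :ok / {:ok, _} straight through.
  def perform(%{"id" => id}) when is_integer(id), do: {:ok, id}

  # Anything else (for example {:error, _} or a bare value) is wrapped into
  # a JobError with error_type: :return_value and reported to the backends.
  def perform(_payload), do: {:error, :bad_payload}
end
```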
diff --git a/test/task_bunny/failure_backend/logger_test.exs b/test/task_bunny/failure_backend/logger_test.exs
new file mode 100644
index 0000000..940767e
--- /dev/null
+++ b/test/task_bunny/failure_backend/logger_test.exs
@@ -0,0 +1,62 @@
+defmodule TaskBunny.FailureBackend.LoggerTest do
+  use ExUnit.Case, async: false
+  alias TaskBunny.JobError
+  import TaskBunny.FailureBackend.Logger
+  import ExUnit.CaptureLog
+
+  @job_error %JobError{
+    job: TestJob, payload: %{"test" => 1}, reject: false,
+    failed_count: 1, queue: "test_queue", pid: self(),
+  }
+
+  @exception_error Map.merge(@job_error, %{
+    error_type: :exception, exception: RuntimeError.exception("Hello"),
+    stacktrace: System.stacktrace()
+  })
+
+  @return_value_error Map.merge(@job_error, %{
+    error_type: :return_value, return_value: {:error, :testing}
+  })
+
+  @exit_error Map.merge(@job_error, %{
+    error_type: :exit, reason: :just_testing
+  })
+
+  @timeout_error Map.merge(@job_error, %{
+    error_type: :timeout
+  })
+
+  @unexpected_error "This should not be passed"
+
+  describe "report_job_error/1" do
+    test "handles an exception" do
+      assert capture_log(fn ->
+        report_job_error @exception_error
+      end) =~ "TaskBunny - Elixir.TestJob failed for an exception"
+    end
+
+    test "handles an invalid return value" do
+      assert capture_log(fn ->
+        report_job_error @return_value_error
+      end) =~ "TaskBunny - Elixir.TestJob failed for an invalid return value"
+    end
+
+    test "handles the EXIT signal" do
+      assert capture_log(fn ->
+        report_job_error @exit_error
+      end) =~ "TaskBunny - Elixir.TestJob failed for EXIT signal"
+    end
+
+    test "handles timeout" do
+      assert capture_log(fn ->
+        report_job_error @timeout_error
+      end) =~ "TaskBunny - Elixir.TestJob failed for timeout"
+    end
+
+    test "handles non JobError just in case" do
+      assert capture_log(fn ->
+        report_job_error @unexpected_error
+      end) =~ "TaskBunny - Failed with the unknown error type"
+    end
+  end
+end
diff --git a/test/task_bunny/failure_backend_test.exs b/test/task_bunny/failure_backend_test.exs
new file mode 100644
index 0000000..f04439f
--- /dev/null
+++ b/test/task_bunny/failure_backend_test.exs
@@ -0,0 +1,48 @@
+defmodule TaskBunny.FailureBackendTest do
+  use ExUnit.Case, async: false
+  alias TaskBunny.{JobError, FailureBackend}
+  import ExUnit.{CaptureLog, CaptureIO}
+
+  defp setup_failure_backend_config(failure_backend) do
+    :meck.new Application, [:passthrough]
+    :meck.expect Application, :fetch_env, fn(:task_bunny, :failure_backend) ->
+      {:ok, failure_backend}
+    end
+
+    on_exit fn -> :meck.unload end
+  end
+
+  defmodule TestBackend do
+    use FailureBackend
+
+    def report_job_error(error) do
+      IO.puts "Hello #{error.job}"
+    end
+  end
+
+  @job_error %JobError{
+    job: TestJob, payload: %{"test" => 1}, reject: false,
+    failed_count: 1, queue: "test_queue", pid: self(),
+  }
+
+  @exception_error Map.merge(@job_error, %{
+    error_type: :exception, exception: RuntimeError.exception("Hello"),
+    stacktrace: System.stacktrace()
+  })
+
+  describe "report_job_error/1" do
+    test "reports to Logger backend by default" do
+      assert capture_log(fn ->
+        FailureBackend.report_job_error @exception_error
+      end) =~ "TaskBunny - Elixir.TestJob failed for an exception"
+    end
+
+    test "reports to the custom backend" do
+      setup_failure_backend_config([TestBackend])
+
+      assert capture_io(fn ->
+        FailureBackend.report_job_error @exception_error
+      end) =~ "Hello Elixir.TestJob"
+    end
+  end
+end
diff --git a/test/task_bunny/job_runner_test.exs b/test/task_bunny/job_runner_test.exs
index 57834f5..c71e00c 100644
--- a/test/task_bunny/job_runner_test.exs
+++ b/test/task_bunny/job_runner_test.exs
@@ -71,7 +71,7 @@ defmodule TaskBunny.JobRunnerTest do
     test "handles job error" do
       JobRunner.invoke(SampleJobs.ErrorJob, nil, nil)
 
-      assert_receive {:job_finished, {:error, "failed!"}, nil}
+      assert_receive {:job_finished, {:error, %{return_value: {:error, "failed!"}}}, nil}
     end
 
     test "handles job crashing" do