Skip to content

Commit

Permalink
Correct reliability issues in courier services
Browse files Browse the repository at this point in the history
  • Loading branch information
whitfin committed Dec 24, 2023
1 parent 2a762ef commit 2e88c0c
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 31 deletions.
82 changes: 51 additions & 31 deletions lib/cachex/services/courier.ex
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,10 @@ defmodule Cachex.Services.Courier do
#
# This will create a Tuple to store the cache record as well
# as the Map used to track the internal task referencing.
def init(cache),
do: {:ok, {cache, %{}}}
def init(cache) do
  # Worker processes are started with spawn_link/1, so exits must be
  # trapped in order to receive them as {:EXIT, pid, reason} messages
  # rather than having this process terminate alongside the worker.
  _previous = Process.flag(:trap_exit, true)

  # state is the cache record paired with an (initially empty) task map
  state = {cache, %{}}
  {:ok, state}
end

@doc false
# Dispatches a task to be carried out by the Courier.
Expand All @@ -67,39 +69,40 @@ defmodule Cachex.Services.Courier do
# results from the task after completion (regardless of outcome).
def handle_call({:dispatch, key, task, stack}, caller, {cache, tasks}) do
references =
case Map.get(tasks, key, []) do
[] ->
case Map.get(tasks, key) do
{pid, listeners} ->
{pid, [caller | listeners]}

nil ->
parent = self()

spawn(fn ->
result =
try do
task.()
rescue
e ->
{
:error,
%ExecutionError{
message: Exception.message(e),
stack: stack_compat() ++ stack
worker =
spawn_link(fn ->
result =
try do
task.()
rescue
e ->
{
:error,
%ExecutionError{
message: Exception.message(e),
stack: stack_compat() ++ stack
}
}
}
end
end

formatted = Actions.format_fetch_value(result)
normalized = Actions.normalize_commit(formatted)
formatted = Actions.format_fetch_value(result)
normalized = Actions.normalize_commit(formatted)

with {:commit, val, options} <- normalized do
Put.execute(cache, key, val, [const(:notify_false) | options])
end

send(parent, {:notify, key, formatted})
end)
with {:commit, val, options} <- normalized do
Put.execute(cache, key, val, [const(:notify_false) | options])
end

[caller]
send(parent, {:notify, key, formatted})
end)

li ->
[caller | li]
{worker, [caller]}
end

{:noreply, {cache, Map.put(tasks, key, references)}}
Expand All @@ -117,10 +120,11 @@ defmodule Cachex.Services.Courier do
def handle_info({:notify, key, result}, {cache, tasks}) do
callers =
tasks
|> Map.get(key, [])
|> Map.get(key, {nil, []})
|> elem(1)
|> Enum.reverse()

with [owner | listeners] <- callers do
with [owner | children] <- callers do
GenServer.reply(owner, result)

result =
Expand All @@ -135,14 +139,30 @@ defmodule Cachex.Services.Courier do
value
end

for caller <- listeners do
for caller <- children do
GenServer.reply(caller, result)
end
end

{:noreply, {cache, Map.delete(tasks, key)}}
end

@doc false
# Handles exit signals trapped from linked worker processes.
#
# The task map is searched for an entry whose worker pid matches the
# exiting process. When one is found, an `{ :error, reason }` result is
# pushed through the `:notify` handler so that every waiting caller is
# replied to and the entry is removed, rather than leaving callers
# blocked on a worker which will never report back.
#
# When no entry matches (for example, the worker already delivered
# its result and the entry has been cleaned up), the state is left
# unchanged and the message is ignored.
def handle_info({:EXIT, pid, reason}, {_cache, tasks} = state) do
  # wrap the key so that falsy keys (nil/false) are still detected
  owned =
    Enum.find_value(tasks, fn
      {key, {^pid, _listeners}} -> {:ok, key}
      _other -> nil
    end)

  case owned do
    nil ->
      {:noreply, state}

    {:ok, key} ->
      handle_info({:notify, key, {:error, reason}}, state)
  end
end

###############
# Private API #
###############
Expand Down
24 changes: 24 additions & 0 deletions test/cachex/services/courier_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,28 @@ defmodule Cachex.Services.CourierTest do
assert match?({:error, %Cachex.ExecutionError{}}, result)
assert elem(result, 1).message == "argument error"
end

test "recovering from failed tasks" do
  # create a new cache and retrieve its record from the Overseer
  cache =
    Helper.create_cache()
    |> Services.Overseer.retrieve()

  # helper process which kills whichever pid it receives first
  killer =
    spawn(fn ->
      receive do
        target -> Process.exit(target, :kill)
      end
    end)

  # a long running task which hands its own pid to the killer
  # before blocking, ensuring it is killed while still in flight
  task = fn ->
    send(killer, self())
    :timer.sleep(60000)
  end

  # dispatch the task through the Courier
  result = Services.Courier.dispatch(cache, "my_key", task)

  # the caller should receive an error rather than hanging
  assert result == {:error, :killed}
end
end

0 comments on commit 2e88c0c

Please sign in to comment.