Skip to content

Commit

Permalink
elixir-client: ensure monitor + request is atomic (#2151)
Browse files Browse the repository at this point in the history
It's possible for a process to start a request monitor process but then
exit before the actual HTTP request process is started, leaving the
monitor process stuck (but still registered under the request ID).

In this case if a later process tries to make the same request it will
find the stuck monitor and register itself with it but no response will
arrive (since no request is running).

This moves the making of the request into the monitor process, so the
request is always made.

I've also added extra process monitoring so that if the monitor crashes
the caller will know about it.
  • Loading branch information
magnetised authored Dec 11, 2024
1 parent 71b8ab2 commit fc1796a
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 26 deletions.
5 changes: 5 additions & 0 deletions .changeset/green-drinks-mix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/elixir-client": patch
---

Fix stalled Elixir client streams by ensuring that requests are always made, even if the calling process dies
46 changes: 36 additions & 10 deletions packages/elixir-client/lib/electric/client/fetch/monitor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,32 +11,52 @@ defmodule Electric.Client.Fetch.Monitor do

use GenServer

alias Electric.Client.Fetch

require Logger

# Builds the `:via` tuple under which the monitor for `request_id` is
# registered in `Electric.Client.Registry`. The registry key is scoped by this
# module so it cannot clash with other processes keyed on the same request id.
def name(request_id) do
  registry_key = {__MODULE__, request_id}
  {:via, Registry, {Electric.Client.Registry, registry_key}}
end

# Child specification for one monitor process, identified by its request id.
# NOTE(review): this is a diff rendering with removed and added lines
# interleaved (no +/- markers). Where a line appears twice, the first
# occurrence looks like the pre-change version -- confirm against the repo.
def child_spec(request_id) do
def child_spec({request_id, _request, _client} = args) do
%{
# the child id is scoped by module + request id
id: {__MODULE__, request_id},
start: {__MODULE__, :start_link, [request_id]},
restart: :transient,
# the whole `{request_id, request, client}` tuple is forwarded to start_link/1
start: {__MODULE__, :start_link, [args]},
# don't restart on error because it would lose the subscriber list
# we instead want the requesting processes to know about the failure
restart: :temporary,
type: :worker
}
end

# Starts the monitor GenServer, registered under `name(request_id)`.
# NOTE(review): diff rendering -- the first head/body pair below (taking a bare
# `request_id`) appears to be the removed version, the second (taking
# `{request_id, request, client}`) the added one.
def start_link(request_id) do
GenServer.start_link(__MODULE__, request_id, name: name(request_id))
def start_link({request_id, request, client}) do
# the full tuple becomes the argument to init/1
GenServer.start_link(__MODULE__, {request_id, request, client}, name: name(request_id))
end

# Subscribes `listener_pid` to the monitor and returns
# `{caller_monitor_ref, monitor_caller_ref}`, the pair that `wait/1` accepts.
# NOTE(review): diff rendering -- the bare `GenServer.call` line directly below
# the head appears to be the removed pre-change body.
def register(monitor_pid, listener_pid) do
GenServer.call(monitor_pid, {:register, listener_pid})
# Register the calling pid with the monitor and the monitor with the
# calling pid.
#
# If the calling pid goes away, then the monitor can remove it from its
# subscribers list.
#
# If the monitor pid goes away before it's returned a response, then we
# raise because it shouldn't happen
# The monitor is set up by the process calling register/2, before the call,
# so a monitor crash at any later point produces a :DOWN message for it.
caller_monitor_ref = Process.monitor(monitor_pid)
# the reply to `{:register, _}` is used by wait/1 to match the response
# message (the handler itself is not shown in this hunk)
monitor_caller_ref = GenServer.call(monitor_pid, {:register, listener_pid})
{caller_monitor_ref, monitor_caller_ref}
end

# Blocks (there is no `after` timeout) until either the response tagged with
# `monitor_caller_ref` arrives or the monitor process goes down.
# Returns the response, or raises `Electric.Client.Error` if the monitor died.
# NOTE(review): diff rendering -- the `wait(ref)` head and the single-line
# `^ref` clause appear to be the removed pre-change version.
def wait(ref) do
def wait({caller_monitor_ref, monitor_caller_ref}) do
receive do
{:response, ^ref, response} -> response
{:response, ^monitor_caller_ref, response} ->
# got the response: stop monitoring and flush any :DOWN already in the mailbox
Process.demonitor(caller_monitor_ref, [:flush])
response

# the monitor process exited before delivering a response
{:DOWN, ^caller_monitor_ref, :process, _pid, reason} ->
raise Electric.Client.Error,
message: "#{Fetch.Monitor} process died with reason #{inspect(reason)}"
end
end

Expand All @@ -45,15 +65,21 @@ defmodule Electric.Client.Fetch.Monitor do
end

# GenServer init callback: sets up an empty subscriber list for the request.
# NOTE(review): diff rendering -- `init(request_id)` and the plain
# `{:ok, state}` return appear to be the removed pre-change lines.
@impl true
def init(request_id) do
def init({request_id, request, client}) do
# exit signals from linked processes are delivered as messages instead of
# terminating this process
Process.flag(:trap_exit, true)

state = %{
request_id: request_id,
subscribers: []
}

{:ok, state}
# defer starting the request to handle_continue/2 so init/1 returns promptly
{:ok, state, {:continue, {:start_request, request_id, request, client}}}
end

# Continuation scheduled by init/1: starts the request process from inside the
# monitor, passing this process's pid as the last element of the start args.
@impl true
def handle_continue({:start_request, request_id, request, client}, state) do
  request_args = {request_id, request, client, self()}

  # assertive match: anything other than `{:ok, pid}` crashes this process
  {:ok, _request_pid} = Fetch.Request.start_link(request_args)

  {:noreply, state}
end

@impl true
Expand Down
16 changes: 3 additions & 13 deletions packages/elixir-client/lib/electric/client/fetch/pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,11 @@ defmodule Electric.Client.Fetch.Pool do

# register this pid before making the request to avoid race conditions for
# very fast responses
{:ok, monitor_pid} = start_monitor(request_id)
{:ok, monitor_pid} = start_monitor(request_id, request, client)

try do
ref = Fetch.Monitor.register(monitor_pid, self())

{:ok, _request_pid} = start_request(request_id, request, client, monitor_pid)

Fetch.Monitor.wait(ref)
catch
:exit, {reason, _} ->
Expand All @@ -38,18 +36,10 @@ defmodule Electric.Client.Fetch.Pool do
end
end

# Starts a `Fetch.Request` child under `Electric.Client.RequestSupervisor` for
# the given request, then normalises the supervisor reply through
# `return_existing/1` (defined elsewhere in this module).
defp start_request(request_id, request, client, monitor_pid) do
  child = {Fetch.Request, {request_id, request, client, monitor_pid}}

  Electric.Client.RequestSupervisor
  |> DynamicSupervisor.start_child(child)
  |> return_existing()
end

# Starts a `Fetch.Monitor` child under `Electric.Client.RequestSupervisor`.
# NOTE(review): diff rendering -- the one-argument head and the
# `{Monitor, request_id}` child tuple appear to be the removed pre-change lines.
defp start_monitor(request_id) do
defp start_monitor(request_id, request, client) do
DynamicSupervisor.start_child(
Electric.Client.RequestSupervisor,
{Electric.Client.Fetch.Monitor, request_id}
# the request and client travel with the child spec so the monitor can start
# the request itself
{Electric.Client.Fetch.Monitor, {request_id, request, client}}
)
# presumably turns an already-started reply into the existing pid -- verify
# against return_existing/1, which is not shown in this hunk
|> return_existing()
end
Expand Down
4 changes: 1 addition & 3 deletions packages/elixir-client/lib/electric/client/fetch/request.ex
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ defmodule Electric.Client.Fetch.Request do
%{
id: {__MODULE__, request_id},
start: {__MODULE__, :start_link, [args]},
restart: :transient,
restart: :temporary,
type: :worker
}
end
Expand All @@ -144,8 +144,6 @@ defmodule Electric.Client.Fetch.Request do
"Starting request for #{inspect(request_id)}"
end)

Process.link(monitor_pid)

state = %{
request_id: request_id,
request: request,
Expand Down

0 comments on commit fc1796a

Please sign in to comment.