Skip to content

Commit

Permalink
Fix monitoring registration race condition
Browse files Browse the repository at this point in the history
  • Loading branch information
msfstef committed Dec 16, 2024
1 parent f25a1ec commit 80fb13d
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 25 deletions.
5 changes: 5 additions & 0 deletions .changeset/beige-mice-notice.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/elixir-client": patch
---

Fix race condition where response comes before listener has monitored itself.
50 changes: 34 additions & 16 deletions packages/elixir-client/lib/electric/client/fetch/monitor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ defmodule Electric.Client.Fetch.Monitor do

state = %{
request_id: request_id,
subscribers: []
subscribers: [],
response: nil
}

{:ok, state, {:continue, {:start_request, request_id, request, client}}}
Expand All @@ -83,21 +84,16 @@ defmodule Electric.Client.Fetch.Monitor do
{:noreply, state}
end

@impl true
def handle_call({:register, listener_pid}, _from, state) do
ref = Process.monitor(listener_pid)

Logger.debug(
fn -> "Registering listener pid #{inspect(listener_pid)}" end,
request_id: state.request_id
)

state = Map.update!(state, :subscribers, &[{listener_pid, ref} | &1])
def handle_continue(:handle_response, %{subscribers: _, response: nil} = state) do
{:noreply, state}
end

{:reply, ref, state}
def handle_continue(:handle_response, %{subscribers: [], response: _} = state) do
Logger.debug("Got response with no subscribers - deferring until subscribers are present")
{:noreply, state}
end

def handle_call({:reply, response}, _from, state) do
def handle_continue(:handle_response, %{subscribers: subscribers, response: response} = state) do
case response do
%{status: status} ->
Logger.debug(
Expand All @@ -124,11 +120,29 @@ defmodule Electric.Client.Fetch.Monitor do
)
end

for {pid, ref} <- state.subscribers do
for {pid, ref} <- subscribers do
send(pid, {:response, ref, response})
end

{:stop, {:shutdown, :normal}, :ok, state}
{:stop, {:shutdown, :normal}, state}
end

@impl true
def handle_call({:register, listener_pid}, _from, state) do
ref = Process.monitor(listener_pid)

Logger.debug(
fn -> "Registering listener pid #{inspect(listener_pid)}" end,
request_id: state.request_id
)

state = Map.update!(state, :subscribers, &[{listener_pid, ref} | &1])

{:reply, ref, state, {:continue, :handle_response}}
end

def handle_call({:reply, response}, _from, state) do
{:reply, :ok, %{state | response: response}, {:continue, :handle_response}}
end

@impl true
Expand All @@ -149,7 +163,7 @@ defmodule Electric.Client.Fetch.Monitor do
{:noreply, state}
end

def handle_info({:EXIT, pid, reason}, state) do
def handle_info({:EXIT, pid, reason}, %{response: nil} = state) do
Logger.debug(fn ->
"Request process #{inspect(pid)} exited with reason #{inspect(reason)} before issuing a reply. Using reason as an error and exiting."
end)
Expand All @@ -160,4 +174,8 @@ defmodule Electric.Client.Fetch.Monitor do

{:stop, {:shutdown, :normal}, state}
end

def handle_info({:EXIT, _pid, _reason}, state) do
{:noreply, state}
end
end
20 changes: 11 additions & 9 deletions packages/elixir-client/lib/electric/client/mock.ex
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ defmodule Electric.Client.Mock do
end

def init(parent) do
{:ok, %{parent: parent, from: nil, request: nil, response: nil}}
{:ok, %{parent: parent, requests: [], responses: []}}
end

def request(pid, request) do
Expand All @@ -58,22 +58,24 @@ defmodule Electric.Client.Mock do
GenServer.call(pid, {:response, response})
end

def handle_call({:request, request}, from, %{response: nil} = state) do
{:noreply, %{state | from: from, request: request}}
def handle_call({:request, request}, from, %{responses: []} = state) do
{:noreply, %{state | requests: state.requests ++ [{from, request}]}}
end

def handle_call({:request, request}, _from, %{from: from, response: %{} = response} = state) do
def handle_call({:request, request}, _from, %{responses: [{from, response} | rest]} = state) do
GenServer.reply(from, {:ok, request})
{:reply, {:ok, response}, %{state | from: nil, response: nil}}

{:reply, {:ok, response}, %{state | responses: rest}}
end

def handle_call({:response, response}, from, %{from: nil} = state) do
{:noreply, %{state | from: from, response: response}}
def handle_call({:response, response}, from, %{requests: []} = state) do
{:noreply, %{state | responses: state.responses ++ [{from, response}]}}
end

def handle_call({:response, response}, _from, %{from: from} = state) when not is_nil(from) do
def handle_call({:response, response}, _from, %{requests: [{from, request} | rest]} = state) do
GenServer.reply(from, {:ok, response})
{:reply, {:ok, state.request}, %{state | from: nil, request: nil}}

{:reply, {:ok, request}, %{state | requests: rest}}
end
end

Expand Down
2 changes: 2 additions & 0 deletions packages/elixir-client/test/electric/client_test.exs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
defmodule Electric.ClientTest do
use ExUnit.Case, async: true

@moduletag :capture_log

import Support.DbSetup
import Support.ClientHelpers

Expand Down

0 comments on commit 80fb13d

Please sign in to comment.