diff --git a/.changeset/beige-mice-notice.md b/.changeset/beige-mice-notice.md new file mode 100644 index 0000000000..8698c71cb4 --- /dev/null +++ b/.changeset/beige-mice-notice.md @@ -0,0 +1,5 @@ +--- +"@core/elixir-client": patch +--- + +Fix race condition where response comes before listener has monitored itself. diff --git a/packages/elixir-client/lib/electric/client/fetch/monitor.ex b/packages/elixir-client/lib/electric/client/fetch/monitor.ex index 450a8d2d2f..fb8453f0aa 100644 --- a/packages/elixir-client/lib/electric/client/fetch/monitor.ex +++ b/packages/elixir-client/lib/electric/client/fetch/monitor.ex @@ -70,7 +70,8 @@ defmodule Electric.Client.Fetch.Monitor do state = %{ request_id: request_id, - subscribers: [] + subscribers: [], + response: nil } {:ok, state, {:continue, {:start_request, request_id, request, client}}} @@ -83,21 +84,16 @@ defmodule Electric.Client.Fetch.Monitor do {:noreply, state} end - @impl true - def handle_call({:register, listener_pid}, _from, state) do - ref = Process.monitor(listener_pid) - - Logger.debug( - fn -> "Registering listener pid #{inspect(listener_pid)}" end, - request_id: state.request_id - ) - - state = Map.update!(state, :subscribers, &[{listener_pid, ref} | &1]) + def handle_continue(:handle_response, %{subscribers: _, response: nil} = state) do + {:noreply, state} + end - {:reply, ref, state} + def handle_continue(:handle_response, %{subscribers: [], response: _} = state) do + Logger.debug("Got response with no subscribers - deferring until subscribers are present") + {:noreply, state} end - def handle_call({:reply, response}, _from, state) do + def handle_continue(:handle_response, %{subscribers: subscribers, response: response} = state) do case response do %{status: status} -> Logger.debug( @@ -124,11 +120,29 @@ defmodule Electric.Client.Fetch.Monitor do ) end - for {pid, ref} <- state.subscribers do + for {pid, ref} <- subscribers do send(pid, {:response, ref, response}) end - {:stop, {:shutdown, :normal}, :ok, state} + {:stop, {:shutdown, :normal}, state} + end + + @impl true + def handle_call({:register, listener_pid}, _from, state) do + ref = Process.monitor(listener_pid) + + Logger.debug( + fn -> "Registering listener pid #{inspect(listener_pid)}" end, + request_id: state.request_id + ) + + state = Map.update!(state, :subscribers, &[{listener_pid, ref} | &1]) + + {:reply, ref, state, {:continue, :handle_response}} + end + + def handle_call({:reply, response}, _from, state) do + {:reply, :ok, %{state | response: response}, {:continue, :handle_response}} end @impl true @@ -149,7 +163,7 @@ defmodule Electric.Client.Fetch.Monitor do {:noreply, state} end - def handle_info({:EXIT, pid, reason}, state) do + def handle_info({:EXIT, pid, reason}, %{response: nil} = state) do Logger.debug(fn -> "Request process #{inspect(pid)} exited with reason #{inspect(reason)} before issuing a reply. Using reason as an error and exiting." end) @@ -160,4 +174,8 @@ defmodule Electric.Client.Fetch.Monitor do {:stop, {:shutdown, :normal}, state} end + + def handle_info({:EXIT, _pid, _reason}, state) do + {:noreply, state} + end end diff --git a/packages/elixir-client/lib/electric/client/mock.ex b/packages/elixir-client/lib/electric/client/mock.ex index d0ff235ad0..bd203f12bb 100644 --- a/packages/elixir-client/lib/electric/client/mock.ex +++ b/packages/elixir-client/lib/electric/client/mock.ex @@ -43,7 +43,7 @@ defmodule Electric.Client.Mock do end def init(parent) do - {:ok, %{parent: parent, from: nil, request: nil, response: nil}} + {:ok, %{parent: parent, requests: [], responses: []}} end def request(pid, request) do @@ -58,22 +58,24 @@ defmodule Electric.Client.Mock do GenServer.call(pid, {:response, response}) end - def handle_call({:request, request}, from, %{response: nil} = state) do - {:noreply, %{state | from: from, request: request}} + def handle_call({:request, request}, from, %{responses: []} = state) do + {:noreply, %{state | requests: state.requests ++ [{from, request}]}} end - def handle_call({:request, request}, _from, %{from: from, response: %{} = response} = state) do + def handle_call({:request, request}, _from, %{responses: [{from, response} | rest]} = state) do GenServer.reply(from, {:ok, request}) - {:reply, {:ok, response}, %{state | from: nil, response: nil}} + + {:reply, {:ok, response}, %{state | responses: rest}} end - def handle_call({:response, response}, from, %{from: nil} = state) do - {:noreply, %{state | from: from, response: response}} + def handle_call({:response, response}, from, %{requests: []} = state) do + {:noreply, %{state | responses: state.responses ++ [{from, response}]}} end - def handle_call({:response, response}, _from, %{from: from} = state) when not is_nil(from) do + def handle_call({:response, response}, _from, %{requests: [{from, request} | rest]} = state) do GenServer.reply(from, {:ok, response}) - {:reply, {:ok, state.request}, %{state | from: nil, request: nil}} + + {:reply, {:ok, request}, %{state | requests: rest}} end end diff --git a/packages/elixir-client/test/electric/client_test.exs b/packages/elixir-client/test/electric/client_test.exs index b0b697ca5a..981691d391 100644 --- a/packages/elixir-client/test/electric/client_test.exs +++ b/packages/elixir-client/test/electric/client_test.exs @@ -1,6 +1,8 @@ defmodule Electric.ClientTest do use ExUnit.Case, async: true + @moduletag :capture_log + import Support.DbSetup import Support.ClientHelpers