diff --git a/.changeset/lucky-boats-lie.md b/.changeset/lucky-boats-lie.md new file mode 100644 index 0000000000..2f8e730975 --- /dev/null +++ b/.changeset/lucky-boats-lie.md @@ -0,0 +1,5 @@ +--- +"@core/elixir-client": patch +--- + +Fix race condition in elixir client when multiple simultaneous clients are streaming the same shape diff --git a/packages/elixir-client/lib/electric/client/fetch/monitor.ex b/packages/elixir-client/lib/electric/client/fetch/monitor.ex index e9064c5399..450a8d2d2f 100644 --- a/packages/elixir-client/lib/electric/client/fetch/monitor.ex +++ b/packages/elixir-client/lib/electric/client/fetch/monitor.ex @@ -79,6 +79,7 @@ defmodule Electric.Client.Fetch.Monitor do @impl true def handle_continue({:start_request, request_id, request, client}, state) do {:ok, _pid} = Fetch.Request.start_link({request_id, request, client, self()}) + {:noreply, state} end @@ -127,7 +128,7 @@ defmodule Electric.Client.Fetch.Monitor do send(pid, {:response, ref, response}) end - {:stop, :normal, :ok, state} + {:stop, {:shutdown, :normal}, :ok, state} end @impl true @@ -157,6 +158,6 @@ defmodule Electric.Client.Fetch.Monitor do send(pid, {:response, ref, {:error, reason}}) end - {:stop, :normal, state} + {:stop, {:shutdown, :normal}, state} end end diff --git a/packages/elixir-client/lib/electric/client/fetch/pool.ex b/packages/elixir-client/lib/electric/client/fetch/pool.ex index 2e42f226da..d8f518f02e 100644 --- a/packages/elixir-client/lib/electric/client/fetch/pool.ex +++ b/packages/elixir-client/lib/electric/client/fetch/pool.ex @@ -18,8 +18,11 @@ defmodule Electric.Client.Fetch.Pool do def request(%Client{} = client, %Fetch.Request{} = request, opts) do request_id = request_id(client, request) - # register this pid before making the request to avoid race conditions for - # very fast responses + # The monitor process is unique to the request and launches the actual + # request as a linked process. + # + # This coalesces requests, so no matter how many simultaneous + # clients we have, we only ever make one request to the backend. {:ok, monitor_pid} = start_monitor(request_id, request, client) try do @@ -48,13 +51,12 @@ defmodule Electric.Client.Fetch.Pool do defp return_existing({:error, {:already_started, pid}}), do: {:ok, pid} defp return_existing(error), do: error - defp request_id(%Client{fetch: {fetch_impl, _}}, %Fetch.Request{shape_handle: nil} = request) do - %{endpoint: endpoint, shape: shape_definition} = request - {fetch_impl, URI.to_string(endpoint), shape_definition} - end - defp request_id(%Client{fetch: {fetch_impl, _}}, %Fetch.Request{} = request) do - %{endpoint: endpoint, offset: offset, live: live, shape_handle: shape_handle} = request - {fetch_impl, URI.to_string(endpoint), shape_handle, Client.Offset.to_tuple(offset), live} + { + fetch_impl, + URI.to_string(request.endpoint), + request.headers, + Fetch.Request.params(request) + } end end diff --git a/packages/elixir-client/lib/electric/client/fetch/request.ex b/packages/elixir-client/lib/electric/client/fetch/request.ex index 353fce1db8..9941e57635 100644 --- a/packages/elixir-client/lib/electric/client/fetch/request.ex +++ b/packages/elixir-client/lib/electric/client/fetch/request.ex @@ -117,7 +117,7 @@ defmodule Electric.Client.Fetch.Request do |> Util.map_put_if("replica", to_string(replica), replica != :default) |> Util.map_put_if("handle", shape_handle, is_binary(shape_handle)) |> Util.map_put_if("live", "true", live?) - |> Util.map_put_if("cursor", cursor, !is_nil(cursor)) + |> Util.map_put_if("cursor", to_string(cursor), !is_nil(cursor)) |> Util.map_put_if("database_id", database_id, !is_nil(database_id)) end @@ -133,9 +133,7 @@ defmodule Electric.Client.Fetch.Request do @doc false def start_link({request_id, request, client, monitor_pid}) do - GenServer.start_link(__MODULE__, {request_id, request, client, monitor_pid}, - name: name(request_id) - ) + GenServer.start_link(__MODULE__, {request_id, request, client, monitor_pid}) end @impl true @@ -161,16 +159,21 @@ defmodule Electric.Client.Fetch.Request do authenticated_request = Client.authenticate_request(client, request) - case fetcher.fetch(authenticated_request, fetcher_opts) do - {:ok, %Fetch.Response{status: status} = response} when status in 200..299 -> - reply(response, state) + try do + case fetcher.fetch(authenticated_request, fetcher_opts) do + {:ok, %Fetch.Response{status: status} = response} when status in 200..299 -> + reply(response, state) - {:ok, %Fetch.Response{} = response} -> - # Turn HTTP errors into errors - reply({:error, response}, state) + {:ok, %Fetch.Response{} = response} -> + # Turn HTTP errors into errors + reply({:error, response}, state) + error -> + reply(error, state) + end + rescue error -> - reply(error, state) + reply({:error, error}, state) end {:stop, :normal, state}