Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix elixir-client race condition #2157

Merged
merged 1 commit into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/lucky-boats-lie.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/elixir-client": patch
---

Fix race condition in elixir client when multiple simultaneous clients are streaming the same shape
5 changes: 3 additions & 2 deletions packages/elixir-client/lib/electric/client/fetch/monitor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ defmodule Electric.Client.Fetch.Monitor do
@impl true
def handle_continue({:start_request, request_id, request, client}, state) do
{:ok, _pid} = Fetch.Request.start_link({request_id, request, client, self()})

{:noreply, state}
end

Expand Down Expand Up @@ -127,7 +128,7 @@ defmodule Electric.Client.Fetch.Monitor do
send(pid, {:response, ref, response})
end

{:stop, :normal, :ok, state}
{:stop, {:shutdown, :normal}, :ok, state}
end

@impl true
Expand Down Expand Up @@ -157,6 +158,6 @@ defmodule Electric.Client.Fetch.Monitor do
send(pid, {:response, ref, {:error, reason}})
end

{:stop, :normal, state}
{:stop, {:shutdown, :normal}, state}
end
end
20 changes: 11 additions & 9 deletions packages/elixir-client/lib/electric/client/fetch/pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ defmodule Electric.Client.Fetch.Pool do
def request(%Client{} = client, %Fetch.Request{} = request, opts) do
request_id = request_id(client, request)

# register this pid before making the request to avoid race conditions for
# very fast responses
# The monitor process is unique to the request and launches the actual
# request as a linked process.
#
# This coalesces requests, so no matter how many simultaneous
# clients we have, we only ever make one request to the backend.
{:ok, monitor_pid} = start_monitor(request_id, request, client)

try do
Expand Down Expand Up @@ -48,13 +51,12 @@ defmodule Electric.Client.Fetch.Pool do
defp return_existing({:error, {:already_started, pid}}), do: {:ok, pid}
defp return_existing(error), do: error

defp request_id(%Client{fetch: {fetch_impl, _}}, %Fetch.Request{shape_handle: nil} = request) do
%{endpoint: endpoint, shape: shape_definition} = request
{fetch_impl, URI.to_string(endpoint), shape_definition}
end

defp request_id(%Client{fetch: {fetch_impl, _}}, %Fetch.Request{} = request) do
%{endpoint: endpoint, offset: offset, live: live, shape_handle: shape_handle} = request
{fetch_impl, URI.to_string(endpoint), shape_handle, Client.Offset.to_tuple(offset), live}
{
fetch_impl,
URI.to_string(request.endpoint),
request.headers,
Fetch.Request.params(request)
}
end
end
25 changes: 14 additions & 11 deletions packages/elixir-client/lib/electric/client/fetch/request.ex
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ defmodule Electric.Client.Fetch.Request do
|> Util.map_put_if("replica", to_string(replica), replica != :default)
|> Util.map_put_if("handle", shape_handle, is_binary(shape_handle))
|> Util.map_put_if("live", "true", live?)
|> Util.map_put_if("cursor", cursor, !is_nil(cursor))
|> Util.map_put_if("cursor", to_string(cursor), !is_nil(cursor))
|> Util.map_put_if("database_id", database_id, !is_nil(database_id))
end

Expand All @@ -133,9 +133,7 @@ defmodule Electric.Client.Fetch.Request do

@doc false
def start_link({request_id, request, client, monitor_pid}) do
GenServer.start_link(__MODULE__, {request_id, request, client, monitor_pid},
name: name(request_id)
)
GenServer.start_link(__MODULE__, {request_id, request, client, monitor_pid})
end

@impl true
Expand All @@ -161,16 +159,21 @@ defmodule Electric.Client.Fetch.Request do

authenticated_request = Client.authenticate_request(client, request)

case fetcher.fetch(authenticated_request, fetcher_opts) do
{:ok, %Fetch.Response{status: status} = response} when status in 200..299 ->
reply(response, state)
try do
case fetcher.fetch(authenticated_request, fetcher_opts) do
{:ok, %Fetch.Response{status: status} = response} when status in 200..299 ->
reply(response, state)

{:ok, %Fetch.Response{} = response} ->
# Turn HTTP errors into errors
reply({:error, response}, state)
{:ok, %Fetch.Response{} = response} ->
# Turn HTTP errors into errors
reply({:error, response}, state)

error ->
reply(error, state)
end
rescue
error ->
reply(error, state)
reply({:error, error}, state)
end

{:stop, :normal, state}
Expand Down
Loading