Skip to content

Commit

Permalink
fix: revert "fix: undefined.handle_message errors (supabase#111)"
Browse files Browse the repository at this point in the history
This reverts commit 591c925.
  • Loading branch information
w3b6x9 authored and kiwicopple committed Jun 9, 2021
1 parent d2d6d16 commit 782ab00
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 92 deletions.
53 changes: 0 additions & 53 deletions server/lib/adapters/adapter_conn_retry.ex

This file was deleted.

1 change: 0 additions & 1 deletion server/lib/adapters/postgres/epgsql_implementation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ defmodule Realtime.Adapters.Postgres.EpgsqlImplementation do
name when is_binary(name) ->
# Simple query for replication mode so no prepared statements are supported
escaped_name = String.downcase(String.replace(name, "'", "\\'"))

query =
"SELECT COUNT(*) >= 1 FROM pg_replication_slots WHERE slot_name = '#{escaped_name}'"

Expand Down
5 changes: 1 addition & 4 deletions server/lib/realtime/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,7 @@ defmodule Realtime.Application do
# Start the endpoint when the application starts
RealtimeWeb.Endpoint,
{
Realtime.Adapters.ConnRetry
},
{
Realtime.ReplicationSupervisor,
Realtime.Replication,
# You can provide a different WAL position if desired, or default to
# allowing Postgres to send you what it thinks you need
epgsql: epgsql_params,
Expand Down
17 changes: 4 additions & 13 deletions server/lib/realtime/replication.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ defmodule Realtime.Replication do
defstruct(
config: [],
connection: nil,
conn_retry_delays: [],
subscribers: [],
transaction: nil,
relations: %{},
types: %{},
should_reset_retry: true
types: %{}
)
)

Expand Down Expand Up @@ -49,17 +49,12 @@ defmodule Realtime.Replication do
end

@impl true
def handle_info(
{:epgsql, _pid, {:x_log_data, _start_lsn, _end_lsn, binary_msg}},
%State{should_reset_retry: should_reset_retry} = state
) do
reset_retry_delays(should_reset_retry)
def handle_info({:epgsql, _pid, {:x_log_data, _start_lsn, _end_lsn, binary_msg}}, state) do
decoded = Realtime.Decoder.decode_message(binary_msg)
Logger.debug("Received binary message: #{inspect(binary_msg, limit: :infinity)}")
Logger.debug("Decoded message: " <> inspect(decoded, limit: :infinity))

{:noreply, process_message(decoded, state)}

end

@impl true
Expand All @@ -86,11 +81,7 @@ defmodule Realtime.Replication do
# Feel free to delete after testing
Logger.debug("Final Update of Columns " <> inspect(state.relations, limit: :infinity))

notify_subscribers(%{
state
| transaction: {current_txn_lsn, %{txn | changes: Enum.reverse(changes)}}
})

notify_subscribers(%{state | transaction: {current_txn_lsn, %{txn | changes: Enum.reverse(changes)}}})
:ok = adapter_impl(state.config).acknowledge_lsn(state.connection, end_lsn)

%{state | transaction: nil}
Expand Down
19 changes: 0 additions & 19 deletions server/lib/realtime/replication_supervisor.ex

This file was deleted.

82 changes: 80 additions & 2 deletions server/test/realtime/replication_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,86 @@ defmodule Realtime.ReplicationTest do
},
subscribers: [],
transaction: nil,
types: %{},
should_reset_retry: false
types: %{}
}}
end

test "Realtime.Replication.handle_info/2 :: :EXIT when adapter conn successful" do
with_mock Realtime.Adapters.Postgres.EpgsqlImplementation,
init: fn _ ->
{:ok, "epgsqpl_pid"}
end do
state = %{@test_state | conn_retry_delays: [0, 1023, 1999]}

assert {:noreply,
%State{
config: @test_config,
connection: "epgsqpl_pid",
conn_retry_delays: [],
subscribers: [],
transaction: nil,
relations: %{},
types: %{}
}} = Realtime.Replication.handle_info({:EXIT, nil, nil}, state)
end
end

test "Realtime.Replication.handle_info/2 :: :EXIT when adapter conn fails" do
with_mock Realtime.Adapters.Postgres.EpgsqlImplementation,
init: fn _ ->
{:error, {:error, :econnrefused}}
end do
state = %{@test_state | conn_retry_delays: [0, 1023, 1999]}

assert {:noreply,
%State{
config: @test_config,
connection: nil,
conn_retry_delays: [1023, 1999],
subscribers: [],
transaction: nil,
relations: %{},
types: %{}
}} = Realtime.Replication.handle_info({:EXIT, nil, nil}, state)
end
end

test "Realtime.Replication.get_retry_delay/1 when conn_retry_delays is empty" do
state = %State{
conn_retry_delays: [],
config: [
conn_retry_initial_delay: 300,
conn_retry_maximum_delay: 180_000,
conn_retry_jitter: 0.2
]
}

{delay, %State{conn_retry_delays: delays}} = Realtime.Replication.get_retry_delay(state)

assert delay == 0
assert is_list(delays)
refute Enum.empty?(delays)
assert Enum.all?(delays, &(is_integer(&1) and &1 > 0))
end

test "Realtime.Replication.get_retry_delay/1 when conn_retry_delays is not empty" do
state = %State{
conn_retry_delays: [489, 1011, 1996, 4023]
}

{delay, %State{conn_retry_delays: delays}} = Realtime.Replication.get_retry_delay(state)

assert delay == 489
assert delays == [1011, 1996, 4023]
end

test "Realtime.Replication.reset_retry_delay/1" do
state = %State{
conn_retry_delays: [198, 403, 781]
}

%State{conn_retry_delays: delays} = Realtime.Replication.reset_retry_delay(state)

assert delays == []
end
end

0 comments on commit 782ab00

Please sign in to comment.