Skip to content

Commit

Permalink
fix: revert "feat: Add database connection retries (#82)"
Browse files Browse the repository at this point in the history
This reverts commit 2a954b0.
  • Loading branch information
w3b6x9 committed Feb 10, 2021
1 parent 894f4bb commit 6185462
Show file tree
Hide file tree
Showing 7 changed files with 25 additions and 131 deletions.
10 changes: 0 additions & 10 deletions server/config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,6 @@ db_ssl = System.get_env("DB_SSL", "true") === "true"
slot_name = System.get_env("SLOT_NAME") || :temporary
configuration_file = System.get_env("CONFIGURATION_FILE") || nil

# Initial delay defaults to half a second
db_retry_initial_delay = System.get_env("DB_RETRY_INITIAL_DELAY", "500")
# Maximum delay defaults to five minutes
db_retry_maximum_delay = System.get_env("DB_RETRY_MAXIMUM_DELAY", "300000")
# Jitter will randomly adjust each delay within 10% of its value
db_retry_jitter = System.get_env("DB_RETRY_JITTER", "10")

# Channels are not secured by default in development and
# are secured by default in production.
secure_channels = System.get_env("SECURE_CHANNELS", "true") != "false"
Expand All @@ -53,9 +46,6 @@ config :realtime,
db_user: db_user,
db_password: db_password,
db_ssl: db_ssl,
db_retry_initial_delay: db_retry_initial_delay,
db_retry_maximum_delay: db_retry_maximum_delay,
db_retry_jitter: db_retry_jitter,
slot_name: slot_name,
configuration_file: configuration_file,
secure_channels: secure_channels,
Expand Down
37 changes: 21 additions & 16 deletions server/lib/adapters/postgres/epgsql_implementation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ defmodule Realtime.Adapters.Postgres.EpgsqlImplementation do
@behaviour Realtime.Adapters.Postgres.AdapterBehaviour
require Logger

alias Realtime.Replication.State

@impl true
def init(config) do
epgsql_config =
Expand All @@ -18,22 +20,25 @@ defmodule Realtime.Adapters.Postgres.EpgsqlImplementation do
|> Enum.map(fn pub -> ~s("#{pub}") end)
|> Enum.join(",")

with {:ok, epgsql_pid} <- :epgsql.connect(epgsql_config),
{:ok, slot_name} <-
create_replication_slot(epgsql_pid, Keyword.get(config, :slot, :temporary)),
:ok <-
:epgsql.start_replication(
epgsql_pid,
slot_name,
self(),
[],
'#{xlog}/#{offset}',
'proto_version \'1\', publication_names \'#{publication_names}\''
) do
{:ok, epgsql_pid}
else
reason ->
{:error, reason}
case :epgsql.connect(epgsql_config) do
{:ok, epgsql_pid} ->
{:ok, slot_name} =
create_replication_slot(epgsql_pid, Keyword.get(config, :slot, :temporary))

:ok =
:epgsql.start_replication(
epgsql_pid,
slot_name,
self(),
[],
'#{xlog}/#{offset}',
'proto_version \'1\', publication_names \'#{publication_names}\''
)

{:ok, %State{config: config, connection: epgsql_pid}}

{:error, reason} ->
{:stop, reason}
end
end

Expand Down
9 changes: 1 addition & 8 deletions server/lib/realtime/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,6 @@ defmodule Realtime.Application do

configuration_file = Application.fetch_env!(:realtime, :configuration_file)

db_retry_initial_delay = Application.fetch_env!(:realtime, :db_retry_initial_delay)
db_retry_maximum_delay = Application.fetch_env!(:realtime, :db_retry_maximum_delay)
db_retry_jitter = Application.fetch_env!(:realtime, :db_retry_jitter)

if Application.fetch_env!(:realtime, :secure_channels) do
if Application.fetch_env!(:realtime, :jwt_secret) == "" do
raise JwtSecretError, message: "JWT secret is missing"
Expand All @@ -56,10 +52,7 @@ defmodule Realtime.Application do
# Start the endpoint when the application starts
RealtimeWeb.Endpoint,
{
Realtime.Adapters.ConnRetry,
conn_retry_initial_delay: db_retry_initial_delay,
conn_retry_maximum_delay: db_retry_maximum_delay,
conn_retry_jitter: db_retry_jitter
Realtime.Adapters.ConnRetry
},
{
Realtime.ReplicationSupervisor,
Expand Down
27 changes: 3 additions & 24 deletions server/lib/realtime/replication.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,28 +38,14 @@ defmodule Realtime.Replication do
}

alias Realtime.SubscribersNotification
alias Realtime.Adapters.ConnRetry

def start_link(config) do
GenServer.start_link(__MODULE__, config)
end

@impl true
def init(config) do
{:ok, %State{config: config}, {:continue, :init_db_conn}}
end

@impl true
def handle_continue(:init_db_conn, %State{config: config} = state) do
:timer.sleep(Realtime.Adapters.ConnRetry.get_retry_delay())

case adapter_impl(config).init(config) do
{:ok, epgsql_pid} ->
{:noreply, %State{state | connection: epgsql_pid}}

{:error, reason} ->
{:stop, reason}
end
adapter_impl(config).init(config)
end

@impl true
Expand All @@ -72,7 +58,8 @@ defmodule Realtime.Replication do
Logger.debug("Received binary message: #{inspect(binary_msg, limit: :infinity)}")
Logger.debug("Decoded message: " <> inspect(decoded, limit: :infinity))

{:noreply, process_message(decoded, Map.put(state, :should_reset_retry, false))}
{:noreply, process_message(decoded, state)}

end

@impl true
Expand All @@ -81,14 +68,6 @@ defmodule Realtime.Replication do
{:noreply, state}
end

defp reset_retry_delays(false) do
:ok
end

defp reset_retry_delays(true) do
:ok = ConnRetry.reset_retry_delay()
end

defp process_message(%Begin{final_lsn: final_lsn, commit_timestamp: commit_timestamp}, state) do
%State{
state
Expand Down
1 change: 0 additions & 1 deletion server/mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ defmodule Realtime.MixProject do
{:joken, "~> 2.3.0"},
{:plug_cowboy, "~> 2.0"},
{:epgsql, "~> 4.2"},
{:retry, "~> 0.14"},
{:mock, "~> 0.3.0", only: :test}
]
end
Expand Down
2 changes: 0 additions & 2 deletions server/mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
"mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"},
"mock": {:hex, :mock, "0.3.6", "e810a91fabc7adf63ab5fdbec5d9d3b492413b8cda5131a2a8aa34b4185eb9b4", [:mix], [{:meck, "~> 0.8.13", [hex: :meck, repo: "hexpm", optional: false]}], "hexpm", "bcf1d0a6826fb5aee01bae3d74474669a3fa8b2df274d094af54a25266a1ebd2"},
"parse_trans": {:hex, :parse_trans, "3.3.0", "09765507a3c7590a784615cfd421d101aec25098d50b89d7aa1d66646bc571c1", [:rebar3], [], "hexpm", "17ef63abde837ad30680ea7f857dd9e7ced9476cdd7b0394432af4bfc241b960"},
"pgoutput_decoder": {:hex, :pgoutput_decoder, "0.1.0", "d4ffae6e58a563f2e6de8a0495d9f9afbe2f4ac75d6805419cd4a0d05f414c00", [:mix], [], "hexpm"},
"phoenix": {:hex, :phoenix, "1.4.10", "619e4a545505f562cd294df52294372d012823f4fd9d34a6657a8b242898c255", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 1.1", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:plug, "~> 1.8.1 or ~> 1.9", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 1.0 or ~> 2.0", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "256ad7a140efadc3f0290470369da5bd3de985ec7c706eba07c2641b228974be"},
"phoenix_html": {:hex, :phoenix_html, "2.13.3", "850e292ff6e204257f5f9c4c54a8cb1f6fbc16ed53d360c2b780a3d0ba333867", [:mix], [{:plug, "~> 1.5", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "8b01b3d6d39731ab18aa548d928b5796166d2500755f553725cfe967bafba7d9"},
"phoenix_live_reload": {:hex, :phoenix_live_reload, "1.2.1", "274a4b07c4adbdd7785d45a8b0bb57634d0b4f45b18d2c508b26c0344bd59b8f", [:mix], [{:file_system, "~> 0.2.1 or ~> 0.3", [hex: :file_system, repo: "hexpm", optional: false]}, {:phoenix, "~> 1.4", [hex: :phoenix, repo: "hexpm", optional: false]}], "hexpm", "41b4103a2fa282cfd747d377233baf213c648fdcc7928f432937676532490eee"},
Expand All @@ -26,7 +25,6 @@
"plug_cowboy": {:hex, :plug_cowboy, "2.1.0", "b75768153c3a8a9e8039d4b25bb9b14efbc58e9c4a6e6a270abff1cd30cbe320", [:mix], [{:cowboy, "~> 2.5", [hex: :cowboy, repo: "hexpm", optional: false]}, {:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "6cd8ddd1bd1fbfa54d3fc61d4719c2057dae67615395d58d40437a919a46f132"},
"plug_crypto": {:hex, :plug_crypto, "1.0.0", "18e49317d3fa343f24620ed22795ec29d4a5e602d52d1513ccea0b07d8ea7d4d", [:mix], [], "hexpm", "73c1682f0e414cfb5d9b95c8e8cd6ffcfdae699e3b05e1db744e58b7be857759"},
"ranch": {:hex, :ranch, "1.7.1", "6b1fab51b49196860b733a49c07604465a47bdb78aa10c1c16a3d199f7f8c881", [:rebar3], [], "hexpm", "451d8527787df716d99dc36162fca05934915db0b6141bbdac2ea8d3c7afc7d7"},
"retry": {:hex, :retry, "0.14.1", "722d1b0cf87096b71213f5801d99fface7ca76adc83fc9dbf3e1daee952aef10", [:mix], [], "hexpm", "b3a609f286f6fe4f6b2c15f32cd4a8a60427d78d05d7b68c2dd9110981111ae0"},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.6", "cf344f5692c82d2cd7554f5ec8fd961548d4fd09e7d22f5b62482e5aeaebd4b0", [:make, :mix, :rebar3], [], "hexpm", "bdb0d2471f453c88ff3908e7686f86f9be327d065cc1ec16fa4540197ea04680"},
"telemetry": {:hex, :telemetry, "0.4.0", "8339bee3fa8b91cb84d14c2935f8ecf399ccd87301ad6da6b71c09553834b2ab", [:rebar3], [], "hexpm", "e9e3cacfd37c1531c0ca70ca7c0c30ce2dbb02998a4f7719de180fe63f8d41e4"},
"unicode_util_compat": {:hex, :unicode_util_compat, "0.5.0", "8516502659002cec19e244ebd90d312183064be95025a319a6c7e89f4bccd65b", [:rebar3], [], "hexpm", "d48d002e15f5cc105a696cf2f1bbb3fc72b4b770a184d8420c8db20da2674b38"},
Expand Down
70 changes: 0 additions & 70 deletions server/test/realtime/replication_test.exs
Original file line number Diff line number Diff line change
@@ -1,78 +1,8 @@
defmodule Realtime.ReplicationTest do
use ExUnit.Case

import Mock

alias Realtime.Replication.State

doctest Realtime.Replication, import: true

@test_config [
epgsql: %{
database: "test",
host: "localhost",
password: "postgres",
port: 5432,
ssl: true,
username: "postgres"
},
slot: :temporary,
wal_position: {"0", "0"},
publications: ["pub_test"]
]

@test_state %State{
config: @test_config,
connection: nil,
subscribers: [],
transaction: nil,
relations: %{},
types: %{}
}

test "Realtime.Replication.init/1 returns correct state" do
assert {:ok, @test_state, {:continue, :init_db_conn}} =
Realtime.Replication.init(
epgsql: %{
database: "test",
host: "localhost",
password: "postgres",
port: 5432,
ssl: true,
username: "postgres"
},
slot: :temporary,
wal_position: {"0", "0"},
publications: ["pub_test"]
)
end

test "Realtime.Replication.handle_continue/2 :: :init_db_conn when adapter conn successful" do
with_mock Realtime.Adapters.Postgres.EpgsqlImplementation,
init: fn _ ->
{:ok, "epgsqpl_pid"}
end do
assert {:noreply,
%State{
config: @test_config,
connection: "epgsqpl_pid",
subscribers: [],
transaction: nil,
relations: %{},
types: %{}
}} = Realtime.Replication.handle_continue(:init_db_conn, @test_state)
end
end

test "Realtime.Replication.handle_continue/2 :: :init_db_conn when adapter conn fails" do
with_mock Realtime.Adapters.Postgres.EpgsqlImplementation,
init: fn _ ->
{:error, {:error, :econnrefused}}
end do
assert {:stop, {:error, :econnrefused}} = Realtime.Replication.handle_continue(:init_db_conn, @test_state)
end
end

test "Integration Test: 0.2.0" do
assert Realtime.Replication.handle_info(
{:epgsql, 0,
Expand Down

0 comments on commit 6185462

Please sign in to comment.