Skip to content

Commit

Permalink
feat: Add database connection retries (#82)
Browse files Browse the repository at this point in the history
Prevents a temporary database connection error from shutting down entire server.

When connection to database fails then the server will retry with exponential backoff and jitter, preventing server from shutting down.
  • Loading branch information
w3b6x9 authored Dec 14, 2020
1 parent b514a1a commit 2a954b0
Show file tree
Hide file tree
Showing 7 changed files with 294 additions and 23 deletions.
9 changes: 9 additions & 0 deletions server/config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ db_user = System.get_env("DB_USER", "postgres")
db_password = System.get_env("DB_PASSWORD", "postgres")
# HACK: There's probably a better way to set boolean from env
db_ssl = System.get_env("DB_SSL", "true") === "true"
# Initial delay defaults to half a second
db_retry_initial_delay = System.get_env("DB_RETRY_INITIAL_DELAY", "500")
# Maximum delay defaults to five minutes
db_retry_maximum_delay = System.get_env("DB_RETRY_MAXIMUM_DELAY", "300000")
# Jitter will randomly adjust each delay within 10% of its value
db_retry_jitter = System.get_env("DB_RETRY_JITTER", "10")
slot_name = System.get_env("SLOT_NAME") || :temporary
configuration_file = System.get_env("CONFIGURATION_FILE") || nil

Expand All @@ -30,6 +36,9 @@ config :realtime,
db_user: db_user,
db_password: db_password,
db_ssl: db_ssl,
db_retry_initial_delay: db_retry_initial_delay,
db_retry_maximum_delay: db_retry_maximum_delay,
db_retry_jitter: db_retry_jitter,
slot_name: slot_name,
configuration_file: configuration_file

Expand Down
37 changes: 16 additions & 21 deletions server/lib/adapters/postgres/epgsql_implementation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ defmodule Realtime.Adapters.Postgres.EpgsqlImplementation do
@behaviour Realtime.Adapters.Postgres.AdapterBehaviour
require Logger

alias Realtime.Replication.State

@impl true
def init(config) do
epgsql_config =
Expand All @@ -20,25 +18,22 @@ defmodule Realtime.Adapters.Postgres.EpgsqlImplementation do
|> Enum.map(fn pub -> ~s("#{pub}") end)
|> Enum.join(",")

case :epgsql.connect(epgsql_config) do
{:ok, epgsql_pid} ->
{:ok, slot_name} =
create_replication_slot(epgsql_pid, Keyword.get(config, :slot, :temporary))

:ok =
:epgsql.start_replication(
epgsql_pid,
slot_name,
self(),
[],
'#{xlog}/#{offset}',
'proto_version \'1\', publication_names \'#{publication_names}\''
)

{:ok, %State{config: config, connection: epgsql_pid}}

{:error, reason} ->
{:stop, reason}
with {:ok, epgsql_pid} <- :epgsql.connect(epgsql_config),
{:ok, slot_name} <-
create_replication_slot(epgsql_pid, Keyword.get(config, :slot, :temporary)),
:ok <-
:epgsql.start_replication(
epgsql_pid,
slot_name,
self(),
[],
'#{xlog}/#{offset}',
'proto_version \'1\', publication_names \'#{publication_names}\''
) do
{:ok, epgsql_pid}
else
reason ->
{:error, reason}
end
end

Expand Down
9 changes: 8 additions & 1 deletion server/lib/realtime/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ defmodule Realtime.Application do

configuration_file = Application.fetch_env!(:realtime, :configuration_file)

db_retry_initial_delay = Application.fetch_env!(:realtime, :db_retry_initial_delay)
db_retry_maximum_delay = Application.fetch_env!(:realtime, :db_retry_maximum_delay)
db_retry_jitter = Application.fetch_env!(:realtime, :db_retry_jitter)

# List all child processes to be supervised
children = [
# Start the endpoint when the application starts
Expand All @@ -40,7 +44,10 @@ defmodule Realtime.Application do
epgsql: epgsql_params,
slot: slot_name,
wal_position: {"0", "0"},
publications: ["supabase_realtime"]
publications: ["supabase_realtime"],
conn_retry_initial_delay: db_retry_initial_delay,
conn_retry_maximum_delay: db_retry_maximum_delay,
conn_retry_jitter: db_retry_jitter
},
{
Realtime.ConfigurationManager,
Expand Down
99 changes: 98 additions & 1 deletion server/lib/realtime/replication.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ defmodule Realtime.Replication do
defstruct(
config: [],
connection: nil,
conn_retry_delays: [],
subscribers: [],
transaction: nil,
relations: %{},
Expand Down Expand Up @@ -37,14 +38,36 @@ defmodule Realtime.Replication do
}

alias Realtime.SubscribersNotification
alias Retry.DelayStreams

def start_link(config) do
GenServer.start_link(__MODULE__, config)
end

@impl true
def init(config) do
adapter_impl(config).init(config)
config =
config
|> Keyword.update!(:conn_retry_initial_delay, &String.to_integer(&1))
|> Keyword.update!(:conn_retry_maximum_delay, &String.to_integer(&1))
|> Keyword.update!(:conn_retry_jitter, &(String.to_integer(&1) / 100))

{:ok, %State{config: config}, {:continue, :init_db_conn}}
end

@impl true
def handle_continue(:init_db_conn, %State{config: config} = state) do
# Database adapter's exit signal will be converted to {:EXIT, From, Reason}
# message when, for example, there's a database connection error.
Process.flag(:trap_exit, true)

case adapter_impl(config).init(config) do
{:ok, epgsql_pid} ->
{:noreply, %State{state | connection: epgsql_pid}}

{:error, reason} ->
{:stop, reason}
end
end

@impl true
Expand All @@ -56,12 +79,86 @@ defmodule Realtime.Replication do
{:noreply, process_message(decoded, state)}
end

@doc """
Receives {:EXIT, From, Reason} message created by Process.flag(:trap_exit, true)
when database adapter's process shuts down.
Database connection retries happen here.
"""
@impl true
def handle_info({:EXIT, _, _}, %State{config: config} = state) do
{retry_delay, new_state} = get_retry_delay(state)

:timer.sleep(retry_delay)

new_state =
case adapter_impl(config).init(config) do
{:ok, epgsql_pid} ->
new_state
|> reset_retry_delay()
|> Map.put(:connection, epgsql_pid)

_ ->
new_state
end

{:noreply, new_state}
end

@impl true
def handle_info(msg, state) do
IO.inspect(msg)
{:noreply, state}
end

def get_retry_delay(%State{conn_retry_delays: [delay | delays]} = state) do
{delay, %State{state | conn_retry_delays: delays}}
end

@doc """
Initial delay is 0 milliseconds for immediate connection attempt.
Future delays are generated and saved to state.
* Begin with initial_delay and increase by a factor of 2
* Each is randomly adjusted with jitter's value
* Capped at maximum_delay
Example
initial_delay: 500 # Half a second
maximum_delay: 300_000 # Five minutes
jitter: 0.1 # Within 10% of a delay's value
[486, 918, 1931, 4067, 7673, 15699, 31783, 64566, 125929, 251911, 300000]
"""
def get_retry_delay(
%State{
conn_retry_delays: [],
config: config
} = state
) do
initial_delay = Keyword.get(config, :conn_retry_initial_delay)
maximum_delay = Keyword.get(config, :conn_retry_maximum_delay)
jitter = Keyword.get(config, :conn_retry_jitter)

delays =
DelayStreams.exponential_backoff(initial_delay)
|> DelayStreams.randomize(jitter)
|> DelayStreams.expiry(maximum_delay)
|> Enum.to_list()

{0, %State{state | conn_retry_delays: delays}}
end

def reset_retry_delay(state) do
%State{state | conn_retry_delays: []}
end

defp process_message(%Begin{} = msg, state) do
%{
state
Expand Down
2 changes: 2 additions & 0 deletions server/mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ defmodule Realtime.MixProject do
{:jason, "~> 1.0"},
{:plug_cowboy, "~> 2.0"},
{:epgsql, "~> 4.2"},
{:retry, "~> 0.14"},
{:mock, "~> 0.3.0", only: :test}
]
end
end
3 changes: 3 additions & 0 deletions server/mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@
"httpoison": {:hex, :httpoison, "1.6.2", "ace7c8d3a361cebccbed19c283c349b3d26991eff73a1eaaa8abae2e3c8089b6", [:mix], [{:hackney, "~> 1.15 and >= 1.15.2", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm", "aa2c74bd271af34239a3948779612f87df2422c2fdcfdbcec28d9c105f0773fe"},
"idna": {:hex, :idna, "6.0.1", "1d038fb2e7668ce41fbf681d2c45902e52b3cb9e9c77b55334353b222c2ee50c", [:rebar3], [{:unicode_util_compat, "0.5.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "a02c8a1c4fd601215bb0b0324c8a6986749f807ce35f25449ec9e69758708122"},
"jason": {:hex, :jason, "1.1.2", "b03dedea67a99223a2eaf9f1264ce37154564de899fd3d8b9a21b1a6fd64afe7", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fdf843bca858203ae1de16da2ee206f53416bbda5dc8c9e78f43243de4bc3afe"},
"meck": {:hex, :meck, "0.8.13", "ffedb39f99b0b99703b8601c6f17c7f76313ee12de6b646e671e3188401f7866", [:rebar3], [], "hexpm", "d34f013c156db51ad57cc556891b9720e6a1c1df5fe2e15af999c84d6cebeb1a"},
"metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"},
"mime": {:hex, :mime, "1.3.1", "30ce04ab3175b6ad0bdce0035cba77bba68b813d523d1aac73d9781b4d193cf8", [:mix], [], "hexpm", "6cbe761d6a0ca5a31a0931bf4c63204bceb64538e664a8ecf784a9a6f3b875f1"},
"mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"},
"mock": {:hex, :mock, "0.3.6", "e810a91fabc7adf63ab5fdbec5d9d3b492413b8cda5131a2a8aa34b4185eb9b4", [:mix], [{:meck, "~> 0.8.13", [hex: :meck, repo: "hexpm", optional: false]}], "hexpm", "bcf1d0a6826fb5aee01bae3d74474669a3fa8b2df274d094af54a25266a1ebd2"},
"parse_trans": {:hex, :parse_trans, "3.3.0", "09765507a3c7590a784615cfd421d101aec25098d50b89d7aa1d66646bc571c1", [:rebar3], [], "hexpm", "17ef63abde837ad30680ea7f857dd9e7ced9476cdd7b0394432af4bfc241b960"},
"pgoutput_decoder": {:hex, :pgoutput_decoder, "0.1.0", "d4ffae6e58a563f2e6de8a0495d9f9afbe2f4ac75d6805419cd4a0d05f414c00", [:mix], [], "hexpm"},
"phoenix": {:hex, :phoenix, "1.4.10", "619e4a545505f562cd294df52294372d012823f4fd9d34a6657a8b242898c255", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 1.1", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:plug, "~> 1.8.1 or ~> 1.9", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 1.0 or ~> 2.0", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "256ad7a140efadc3f0290470369da5bd3de985ec7c706eba07c2641b228974be"},
Expand All @@ -22,6 +24,7 @@
"plug_cowboy": {:hex, :plug_cowboy, "2.1.0", "b75768153c3a8a9e8039d4b25bb9b14efbc58e9c4a6e6a270abff1cd30cbe320", [:mix], [{:cowboy, "~> 2.5", [hex: :cowboy, repo: "hexpm", optional: false]}, {:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "6cd8ddd1bd1fbfa54d3fc61d4719c2057dae67615395d58d40437a919a46f132"},
"plug_crypto": {:hex, :plug_crypto, "1.0.0", "18e49317d3fa343f24620ed22795ec29d4a5e602d52d1513ccea0b07d8ea7d4d", [:mix], [], "hexpm", "73c1682f0e414cfb5d9b95c8e8cd6ffcfdae699e3b05e1db744e58b7be857759"},
"ranch": {:hex, :ranch, "1.7.1", "6b1fab51b49196860b733a49c07604465a47bdb78aa10c1c16a3d199f7f8c881", [:rebar3], [], "hexpm", "451d8527787df716d99dc36162fca05934915db0b6141bbdac2ea8d3c7afc7d7"},
"retry": {:hex, :retry, "0.14.1", "722d1b0cf87096b71213f5801d99fface7ca76adc83fc9dbf3e1daee952aef10", [:mix], [], "hexpm", "b3a609f286f6fe4f6b2c15f32cd4a8a60427d78d05d7b68c2dd9110981111ae0"},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.6", "cf344f5692c82d2cd7554f5ec8fd961548d4fd09e7d22f5b62482e5aeaebd4b0", [:make, :mix, :rebar3], [], "hexpm", "bdb0d2471f453c88ff3908e7686f86f9be327d065cc1ec16fa4540197ea04680"},
"telemetry": {:hex, :telemetry, "0.4.0", "8339bee3fa8b91cb84d14c2935f8ecf399ccd87301ad6da6b71c09553834b2ab", [:rebar3], [], "hexpm", "e9e3cacfd37c1531c0ca70ca7c0c30ce2dbb02998a4f7719de180fe63f8d41e4"},
"unicode_util_compat": {:hex, :unicode_util_compat, "0.5.0", "8516502659002cec19e244ebd90d312183064be95025a319a6c7e89f4bccd65b", [:rebar3], [], "hexpm", "d48d002e15f5cc105a696cf2f1bbb3fc72b4b770a184d8420c8db20da2674b38"},
Expand Down
Loading

3 comments on commit 2a954b0

@barbinbrad
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@w3b6x9 this seems like a pretty historic commit. from what i can tell, at this point, they had just posted a job to maintain the repo for an "elixir expert", and then here you come with with this commit, and you've just been cranking them out ever since. did you have a lot of experience in elixir at this point?

@w3b6x9
Copy link
Member Author

@w3b6x9 w3b6x9 commented on 2a954b0 Dec 31, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@barbinbrad hello there! I started working with Elixir/Phoenix on a side project around February of 2020 and just kept using the same stack for a few other side projects last year. I learned about Supabase and the Realtime server repo early summer of 2020, and for the rest of the year, I would regularly check the development of Realtime. I was winding down another side project late last year and decided to jump in and finally contribute for a change of pace. A short while later I was fortunate enough to get hired by Supabase and I've been working with the team and contributing regularly to Realtime for the past year.

What brought you to Realtime? Have you had a chance to work with Elixir/Phoenix?

@barbinbrad
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hey @w3b6x9 thanks for sharing your story. it's really inspiring work. i'm preparing for a new job that's using elixir, and i don't have any experience, so i've been reading through the commits of the best elixir repos i can find. got any recommendations?

Please sign in to comment.