Skip to content

Commit

Permalink
fix: dynamic supervisor restart bug
Browse files Browse the repository at this point in the history
  • Loading branch information
oliveigah committed Nov 27, 2023
1 parent dc56bfe commit b9f8f73
Show file tree
Hide file tree
Showing 9 changed files with 80 additions and 76 deletions.
6 changes: 3 additions & 3 deletions lib/finch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -592,9 +592,9 @@ defmodule Finch do
See the `Finch.HTTP1.PoolMetrics` and `Finch.HTTP2.PoolMetrics` for more details.
`{:error, :not_found}` may return on 2 scenarios. There is no pool on the
given finch instance regarding the given url or the pool is configured with
`start_pool_metrics?` option false (the default).
`{:error, :not_found}` may return on 2 scenarios:
- There is no pool registered for the given pair finch instance and url
- The pool is configured with `start_pool_metrics?` option false (default)
## Example
Expand Down
41 changes: 19 additions & 22 deletions lib/finch/http1/pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,13 @@ defmodule Finch.HTTP1.Pool do
{shp, registry_name, pool_size, conn_opts, pool_max_idle_time, start_pool_metrics?,
pool_idx}
) do
{:ok, metric_ref} =
if start_pool_metrics?,
do: PoolMetrics.init(pool_idx, pool_size),
else: {:ok, nil}

{:ok, pid} =
NimblePool.start_link(
worker: {__MODULE__, {registry_name, shp, pool_idx, metric_ref, conn_opts}},
pool_size: pool_size,
lazy: true,
worker_idle_timeout: pool_idle_timeout(pool_max_idle_time)
)

{:ok, pid, metric_ref}
NimblePool.start_link(
worker:
{__MODULE__, {registry_name, shp, pool_idx, pool_size, start_pool_metrics?, conn_opts}},
pool_size: pool_size,
lazy: true,
worker_idle_timeout: pool_idle_timeout(pool_max_idle_time)
)
end

@impl Finch.Pool
Expand Down Expand Up @@ -146,22 +139,26 @@ defmodule Finch.HTTP1.Pool do

@impl Finch.Pool
def get_pool_status(finch_name, shp) do
case Finch.PoolManager.get_metrics_refs(finch_name, shp) do
case Finch.PoolManager.get_pool_count(finch_name, shp) do
nil ->
{:error, :not_found}

refs ->
resp =
refs
|> Enum.map(&PoolMetrics.get_pool_status/1)
|> Enum.map(fn {:ok, metrics} -> metrics end)
count ->
result = Enum.map(1..count, &PoolMetrics.get_pool_status(finch_name, shp, &1))

{:ok, resp}
if Enum.all?(result, &match?({:ok, _}, &1)),
do: {:ok, Enum.map(result, &elem(&1, 1))},
else: {:error, :not_found}
end
end

@impl NimblePool
def init_pool({registry, shp, pool_idx, metric_ref, opts}) do
def init_pool({registry, shp, pool_idx, pool_size, start_pool_metrics?, opts}) do
{:ok, metric_ref} =
if start_pool_metrics?,
do: PoolMetrics.init(registry, shp, pool_idx, pool_size),
else: {:ok, nil}

# Register our pool with our module name as the key. This allows the caller
# to determine the correct pool module to use to make the request
{:ok, _} = Registry.register(registry, shp, __MODULE__)
Expand Down
12 changes: 11 additions & 1 deletion lib/finch/http1/pool_metrics.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@ defmodule Finch.HTTP1.PoolMetrics do
in_use_connections: 3
]

def init(pool_idx, pool_size) do
def init(registry, shp, pool_idx, pool_size) do
ref = :atomics.new(length(@atomic_idx), [])
:atomics.add(ref, @atomic_idx[:pool_idx], pool_idx)
:atomics.add(ref, @atomic_idx[:pool_size], pool_size)

:persistent_term.put({__MODULE__, registry, shp, pool_idx}, ref)
{:ok, ref}
end

Expand All @@ -49,6 +51,12 @@ defmodule Finch.HTTP1.PoolMetrics do
end)
end

def get_pool_status(name, shp, pool_idx) do
{__MODULE__, name, shp, pool_idx}
|> :persistent_term.get(nil)
|> get_pool_status()
end

def get_pool_status(ref) when is_reference(ref) do
%{
pool_idx: pool_idx,
Expand All @@ -68,4 +76,6 @@ defmodule Finch.HTTP1.PoolMetrics do

{:ok, result}
end

def get_pool_status(nil), do: {:error, :not_found}
end
31 changes: 13 additions & 18 deletions lib/finch/http2/pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -77,17 +77,16 @@ defmodule Finch.HTTP2.Pool do

@impl Finch.Pool
def get_pool_status(finch_name, shp) do
case Finch.PoolManager.get_metrics_refs(finch_name, shp) do
case Finch.PoolManager.get_pool_count(finch_name, shp) do
nil ->
{:error, :not_found}

refs ->
resp =
refs
|> Enum.map(&PoolMetrics.get_pool_status/1)
|> Enum.map(fn {:ok, metrics} -> metrics end)
count ->
result = Enum.map(1..count, &PoolMetrics.get_pool_status(finch_name, shp, &1))

{:ok, resp}
if Enum.all?(result, &match?({:ok, _}, &1)),
do: {:ok, Enum.map(result, &elem(&1, 1))},
else: {:error, :not_found}
end
end

Expand Down Expand Up @@ -190,21 +189,17 @@ defmodule Finch.HTTP2.Pool do
end
end

def start_link({shp, finch_name, pool_config, start_pool_metrics?, pool_idx}) do
def start_link({_shp, _finch_name, _pool_config, _start_pool_metrics?, _pool_idx} = opts) do
:gen_statem.start_link(__MODULE__, opts, [])
end

@impl true
def init({{scheme, host, port} = shp, registry, pool_opts, start_pool_metrics?, pool_idx}) do
{:ok, metrics_ref} =
if start_pool_metrics?,
do: PoolMetrics.init(pool_idx),
do: PoolMetrics.init(registry, shp, pool_idx),
else: {:ok, nil}

new_opts = {shp, finch_name, pool_config, metrics_ref, pool_idx}

{:ok, pid} = :gen_statem.start_link(__MODULE__, new_opts, [])

{:ok, pid, metrics_ref}
end

@impl true
def init({{scheme, host, port} = shp, registry, pool_opts, metrics_ref, pool_idx}) do
{:ok, _} = Registry.register(registry, shp, __MODULE__)

data = %{
Expand Down
11 changes: 10 additions & 1 deletion lib/finch/http2/pool_metrics.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ defmodule Finch.HTTP2.PoolMetrics do
in_flight_requests: 2
]

def init(pool_idx) do
def init(finch_name, shp, pool_idx) do
ref = :atomics.new(length(@atomic_idx), [])
:atomics.put(ref, @atomic_idx[:pool_idx], pool_idx)

:persistent_term.put({__MODULE__, finch_name, shp, pool_idx}, ref)
{:ok, ref}
end

Expand All @@ -40,6 +41,12 @@ defmodule Finch.HTTP2.PoolMetrics do
end)
end

def get_pool_status(name, shp, pool_idx) do
{__MODULE__, name, shp, pool_idx}
|> :persistent_term.get(nil)
|> get_pool_status()
end

def get_pool_status(ref) when is_reference(ref) do
%{
pool_idx: pool_idx,
Expand All @@ -56,4 +63,6 @@ defmodule Finch.HTTP2.PoolMetrics do

{:ok, result}
end

def get_pool_status(nil), do: {:error, :not_found}
end
30 changes: 13 additions & 17 deletions lib/finch/pool_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -75,30 +75,26 @@ defmodule Finch.PoolManager do
defp do_start_pools(shp, config) do
pool_config = pool_config(config, shp)

init_results =
Enum.map(1..pool_config.count, fn pool_idx ->
pool_args = pool_args(shp, config, pool_config, pool_idx)
# Choose pool type here...
{:ok, pid, metric_ref} =
DynamicSupervisor.start_child(config.supervisor_name, {pool_config.mod, pool_args})

{pid, pool_config.mod, metric_ref}
end)

if pool_config.start_pool_metrics? do
put_metrics_refs(config, shp, Enum.map(init_results, &elem(&1, 2)))
put_pool_count(config, shp, pool_config.count)
end

{pid, pool_mod, _} = List.first(init_results)
Enum.map(1..pool_config.count, fn pool_idx ->
pool_args = pool_args(shp, config, pool_config, pool_idx)
# Choose pool type here...
{:ok, pid} =
DynamicSupervisor.start_child(config.supervisor_name, {pool_config.mod, pool_args})

{pid, pool_mod}
{pid, pool_config.mod}
end)
|> hd()
end

defp put_metrics_refs(%{registry_name: name}, shp, refs),
do: :persistent_term.put({__MODULE__, :metrics_refs, name, shp}, refs)
defp put_pool_count(%{registry_name: name}, shp, val),
do: :persistent_term.put({__MODULE__, :pool_count, name, shp}, val)

def get_metrics_refs(finch_name, shp),
do: :persistent_term.get({__MODULE__, :metrics_refs, finch_name, shp}, nil)
def get_pool_count(finch_name, shp),
do: :persistent_term.get({__MODULE__, :pool_count, finch_name, shp}, nil)

defp pool_config(%{pools: config, default_pool_config: default}, shp) do
config
Expand Down
2 changes: 1 addition & 1 deletion test/finch/http1/pool_metrics_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ defmodule Finch.HTTP1.PoolMetricsTest do
end)

wait_connection_checkin()
assert nil == PoolManager.get_metrics_refs(finch_name, shp)
assert nil == PoolManager.get_pool_count(finch_name, shp)
assert {:error, :not_found} = Finch.get_pool_status(finch_name, shp)
end

Expand Down
6 changes: 3 additions & 3 deletions test/finch/http2/pool_metrics_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ defmodule Finch.HTTP2.PoolMetricsTest do
ref
end)

Process.sleep(400)
Process.sleep(50)

assert {:ok,
[
Expand Down Expand Up @@ -115,7 +115,7 @@ defmodule Finch.HTTP2.PoolMetricsTest do
end)
end)

Process.sleep(400)
Process.sleep(50)

assert {:ok,
[
Expand Down Expand Up @@ -169,7 +169,7 @@ defmodule Finch.HTTP2.PoolMetricsTest do
ref
end)

Process.sleep(400)
Process.sleep(50)

assert {:ok,
[
Expand Down
17 changes: 7 additions & 10 deletions test/finch/http2/pool_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,13 @@ defmodule Finch.HTTP2.PoolTest do
end

def start_pool(port) do
{:ok, pid, _info} =
Pool.start_link({
{:https, "localhost", port},
:test,
[conn_opts: [transport_opts: [verify: :verify_none]]],
false,
1
})

{:ok, pid}
Pool.start_link({
{:https, "localhost", port},
:test,
[conn_opts: [transport_opts: [verify: :verify_none]]],
false,
1
})
end

describe "requests" do
Expand Down

0 comments on commit b9f8f73

Please sign in to comment.