Add option process_distribution: :active which rebalances processes on node joining / leaving. #164

Merged 31 commits on Nov 8, 2019
Commits
2acfaa9
Adding Horde.DynamicSupervisor.rebalance/2 method which can be used t…
Sep 20, 2019
2ef60a3
Adding tests for the rebalancer, modify the rebalancer such that it w…
Sep 23, 2019
3113091
Adding more to the test for rebalance/1, waits for the CRDT to update…
Sep 23, 2019
ef1a448
Fixing small dialyzer problem with the rebalancer. terminate_child an…
Sep 23, 2019
bd6add5
Oops, fixing one stupid compiler warning about an unused opts var. St…
Sep 23, 2019
d5ade54
Fix formatting according to mix format
Sep 23, 2019
647ee02
Merge branch 'master' of https://github.com/derekkraan/horde
Oct 12, 2019
fc845e6
Rewriting rebalance as redistribute/1, conforming to the design discu…
Oct 13, 2019
3c4fe4d
Almost fixing all problems related to graceful shutdown. Make sure re…
Oct 14, 2019
1c7421d
Changing the logic of needs_redistribution? a little bit such that I …
Oct 15, 2019
6cc2d0d
Adding changes which _seem to_ promote stability in the graceful shut…
Oct 15, 2019
8b16836
Making redistribution highly configurable/pluggable via the redistrib…
Oct 15, 2019
d563c88
Make sure that if the permitted options dont allow redistribute_on :d…
Oct 16, 2019
5e32d67
Ensure :redistribute status will definitely trigger a redistribution
Oct 16, 2019
c989b10
Making the failover test pass again, ensure that all state transition…
Oct 16, 2019
8142783
Adding termination reason to the processes supervisor such that I can…
Oct 16, 2019
3533708
Cleaning up and modularizing the redistribution test setup and helper…
Oct 16, 2019
156f2b1
Ensure that each test will re-set :redistribute_on to the default :al…
Oct 16, 2019
669368c
Adding lots more tests to verify redistribute configuration params ar…
Oct 16, 2019
856805b
Major cleaning up and simplification of the needs_redistribution? cod…
Oct 16, 2019
07bb025
rewriting the redistribute_processes with arity 1 such that Enum.redu…
Oct 17, 2019
4c16936
FINALLY fixing the bloody redistribution. Dios mio de mi vida this wa…
Oct 17, 2019
2a59977
Ensure that the manual rebalance/1 test waits for some (500) millis b…
Oct 17, 2019
f31ccc7
Do the manual redistribution flag differently - instead of it piggy-b…
Oct 17, 2019
b0ac807
Commit mix format formatting
Oct 17, 2019
96cb720
Making the changes discussed with @derekkraan on horde/pull/164 - rem…
Oct 23, 2019
2353355
Updating derekkraan/horde/pull/164 - conform to the very sound design…
Oct 29, 2019
692028a
Updating pull/164, resolving all change requests. Cleaning up areas o…
Nov 2, 2019
6e1e9ea
Fixing small compiler warnings in my test RebalanceTestServer class w…
Nov 2, 2019
1be71f6
Tweaking the implementation a little such that Process.exit is actual…
sikanrong Nov 2, 2019
6ff4f63
Removing unwanted comments that I accidentally left in. Changing :sen…
sikanrong Nov 2, 2019
27 changes: 0 additions & 27 deletions config/config.exs
@@ -1,30 +1,3 @@
# This file is responsible for configuring your application
# and its dependencies with the aid of the Mix.Config module.
use Mix.Config
-
- # This configuration is loaded before any dependency and is restricted
- # to this project. If another project depends on this project, this
- # file won't be loaded nor affect the parent project. For this reason,
- # if you want to provide default values for your application for
- # 3rd-party users, it should be done in your "mix.exs" file.
-
- # You can configure your application as:
- #
- # config :horde, key: :value
- #
- # and access this configuration in your application as:
- #
- # Application.get_env(:horde, :key)
- #
- # You can also configure a 3rd-party app:
- #
- # config :logger, level: :info
- #
-
- # It is also possible to import configuration files, relative to this
- # directory. For example, you can emulate configuration per environment
- # by uncommenting the line below and defining dev.exs, test.exs and such.
- # Configuration from the imported file will override the ones defined
- # here (which is why it is important to import them last).
- #
- # import_config "#{Mix.env}.exs"
10 changes: 8 additions & 2 deletions lib/horde/dynamic_supervisor.ex
@@ -63,6 +63,7 @@ defmodule Horde.DynamicSupervisor do
| {:shutdown, integer()}
| {:members, [Horde.Cluster.member()]}
| {:delta_crdt_options, [DeltaCrdt.crdt_option()]}
+ | {:process_redistribution, :active | :passive}

@callback init(options()) :: {:ok, options()}
@callback child_spec(options :: options()) :: Supervisor.child_spec()
@@ -117,6 +118,7 @@ defmodule Horde.DynamicSupervisor do
:max_restarts,
:strategy,
:distribution_strategy,
+ :process_redistribution,
:members,
:delta_crdt_options
]
@@ -145,6 +147,7 @@ defmodule Horde.DynamicSupervisor do
extra_arguments = Keyword.get(options, :extra_arguments, [])
members = Keyword.get(options, :members, [])
delta_crdt_options = Keyword.get(options, :delta_crdt_options, [])
+ process_redistribution = Keyword.get(options, :process_redistribution, :passive)

distribution_strategy =
Keyword.get(
@@ -161,7 +164,8 @@ defmodule Horde.DynamicSupervisor do
extra_arguments: extra_arguments,
distribution_strategy: distribution_strategy,
members: members,
- delta_crdt_options: delta_crdt_options(delta_crdt_options)
+ delta_crdt_options: delta_crdt_options(delta_crdt_options),
+ process_redistribution: process_redistribution
}

{:ok, flags}
@@ -192,6 +196,7 @@ defmodule Horde.DynamicSupervisor do
extra_arguments: flags.extra_arguments,
strategy: flags.strategy,
distribution_strategy: flags.distribution_strategy,
+ process_redistribution: flags.process_redistribution,
members: members(flags.members, name)
]},
{Horde.GracefulShutdownManager,
@@ -244,7 +249,8 @@ defmodule Horde.DynamicSupervisor do

Works like `DynamicSupervisor.terminate_child/2`.
"""
- @spec terminate_child(Supervisor.supervisor(), child_pid :: pid()) :: :ok | {:error, :not_found}
+ @spec terminate_child(Supervisor.supervisor(), child_pid :: pid()) ::
+ :ok | {:error, :not_found} | {:error, {:node_dead_or_shutting_down, String.t()}}
def terminate_child(supervisor, child_pid) when is_pid(child_pid),
do: call(supervisor, {:terminate_child, child_pid})
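
For orientation, a minimal sketch of how the new option might be passed when starting the supervisor, assuming the option list handled by `init/1` in this diff. `MyApp.DistributedSupervisor` is a hypothetical name used only for illustration. Note that the diff spells the option `:process_redistribution`, and that it defaults to `:passive` when omitted.

```elixir
# Sketch only: MyApp.DistributedSupervisor is a hypothetical name.
children = [
  {Horde.DynamicSupervisor,
   [
     name: MyApp.DistributedSupervisor,
     strategy: :one_for_one,
     # :active asks Horde to hand processes off when membership changes;
     # leaving the option out keeps the :passive default from this diff.
     process_redistribution: :active
   ]}
]

Supervisor.start_link(children, strategy: :one_for_one)
```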

167 changes: 109 additions & 58 deletions lib/horde/dynamic_supervisor_impl.ex
@@ -115,19 +115,8 @@ defmodule Horde.DynamicSupervisorImpl do
this_name = fully_qualified_name(state.name)

with child_id when not is_nil(child_id) <- Map.get(state.process_pid_to_id, child_pid),
- {^this_name, _child_spec, _child_pid} <- Map.get(state.processes_by_id, child_id) do
- reply =
- Horde.ProcessesSupervisor.terminate_child_by_id(
- supervisor_name(state.name),
- child_id
- )
-
- new_state =
- Map.put(state, :processes_by_id, Map.delete(state.processes_by_id, child_id))
- |> Map.put(:local_process_count, state.local_process_count - 1)
-
- :ok = DeltaCrdt.mutate(crdt_name(state.name), :remove, [{:process, child_id}], :infinity)
-
+ {^this_name, child, _child_pid} <- Map.get(state.processes_by_id, child_id),
+ {reply, new_state} <- terminate_child(child, state) do
{:reply, reply, new_state}
else
{other_node, _child_spec, _child_pid} ->
@@ -146,14 +135,9 @@ defmodule Horde.DynamicSupervisorImpl do
def handle_call({:start_child, child_spec} = msg, from, state) do
this_name = fully_qualified_name(state.name)

- child_spec = Map.put(child_spec, :id, :rand.uniform(@big_number))
-
- distribution_id = :erlang.phash2(Map.drop(child_spec, [:id]))
+ child_spec = randomize_child_id(child_spec)

- case state.distribution_strategy.choose_node(
- distribution_id,
- Map.values(members(state))
- ) do
+ case choose_node(child_spec, state) do
{:ok, %{name: ^this_name}} ->
{reply, new_state} = add_child(child_spec, state)
{:reply, reply, new_state}
@@ -240,6 +224,21 @@ defmodule Horde.DynamicSupervisorImpl do
end
end

+ def handle_cast({:relinquish_child_process, child_id}, state) do
+ # signal to the rest of the nodes that this process has been relinquished
+ # (to the Horde!) by its parent
+ {_, child, _} = Map.get(state.processes_by_id, child_id)
+
+ :ok =
+ DeltaCrdt.mutate(
+ crdt_name(state.name),
+ :add,
+ [{:process, child.id}, {nil, child}]
+ )
+
+ {:noreply, state}
+ end
+
# TODO think of a better name than "disown_child_process"
def handle_cast({:disown_child_process, child_id}, state) do
{{_, _, child_pid}, new_processes_by_id} = Map.pop(state.processes_by_id, child_id)
@@ -255,6 +254,10 @@ defmodule Horde.DynamicSupervisorImpl do
{:noreply, new_state}
end

+ defp randomize_child_id(child) do
+ Map.put(child, :id, :rand.uniform(@big_number))
+ end
+
defp proxy_to_node(node_name, message, reply_to, state) do
case Map.get(members(state), node_name) do
%{status: :alive} ->
@@ -320,7 +323,6 @@ defmodule Horde.DynamicSupervisorImpl do
end
end

- @doc false
def handle_info({:DOWN, ref, _type, _pid, _reason}, state) do
case Map.get(state.supervisor_ref_to_name, ref) do
nil ->
@@ -354,7 +356,7 @@ defmodule Horde.DynamicSupervisorImpl do
|> set_own_node_status()
|> handle_quorum_change()
|> set_crdt_neighbours()
- |> handle_dead_nodes()
+ |> handoff_processes()
else
new_state
end
@@ -376,20 +378,60 @@ defmodule Horde.DynamicSupervisorImpl do

def has_membership_change?([]), do: false

+ defp handoff_processes(state) do
+ this_node = fully_qualified_name(state.name)
+
+ Map.values(state.processes_by_id)
+ |> Enum.reduce(state, fn {current_node, child, _child_pid}, state ->
+ case choose_node(child, state) do
+ {:ok, %{name: chosen_node}} ->
+ current_member = Map.get(state.members_info, current_node)
+
+ cond do
+ this_node != current_node and this_node == chosen_node ->
+ # handle_dead_nodes
+ case current_member do
+ %{status: :dead} ->
+ {{:ok, _}, state} = add_child(child, state)
+ state
+
+ _ ->
+ state
+ end
+
+ this_node == current_node and chosen_node != this_node ->
+ case state.supervisor_options[:process_redistribution] do
+ :active ->
+ handoff_child(child, state)
+
+ :passive ->
+ state
+ end
+
+ true ->
+ state
+ end
+
+ {:error, :no_alive_nodes} ->
+ state
+ end
+ end)
+ end
+
defp update_processes(state, [diff | diffs]) do
update_process(state, diff)
|> update_processes(diffs)
end

defp update_processes(state, []), do: state

- defp update_process(state, {:add, {:process, child_id}, {nil, child_spec}}) do
+ defp update_process(state, {:add, {:process, _child_id}, {nil, child_spec}}) do
this_name = fully_qualified_name(state.name)

- case state.distribution_strategy.choose_node(child_id, Map.values(members(state))) do
+ case choose_node(child_spec, state) do
{:ok, %{name: ^this_name}} ->
- {_resp, state} = add_child(child_spec, state)
- state
+ {_resp, new_state} = add_child(child_spec, state)
+ new_state

_ ->
state
@@ -480,37 +522,6 @@ defmodule Horde.DynamicSupervisorImpl do
%Horde.DynamicSupervisor.Member{status: :uninitialized, name: member}
end

- defp handle_dead_nodes(state) do
- not_dead_members =
- Enum.flat_map(members(state), fn
- {_, %{status: :dead}} -> []
- {not_dead, _} -> [not_dead]
- end)
- |> MapSet.new()
-
- check_processes(state, Map.values(state.processes_by_id), not_dead_members)
- end
-
- defp check_processes(state, [{member, child, _child_pid} | procs], not_dead_members) do
- this_name = fully_qualified_name(state.name)
-
- with false <- MapSet.member?(not_dead_members, member),
- {:ok, %{name: ^this_name}} <-
- state.distribution_strategy.choose_node(
- child.id,
- Map.values(members(state))
- ),
- {_result, state} <- add_child(child, state) do
- state
- else
- _ ->
- state
- end
- |> check_processes(procs, not_dead_members)
- end
-
- defp check_processes(state, [], _), do: state
-
defp member_names(names) do
Enum.map(names, fn
{name, node} -> {name, node}
@@ -557,7 +568,6 @@ defmodule Horde.DynamicSupervisorImpl do
|> monitor_supervisors()
|> handle_quorum_change()
|> set_crdt_neighbours()
- |> handle_dead_nodes()
end

defp handle_quorum_change(state) do
@@ -649,6 +659,38 @@ defmodule Horde.DynamicSupervisorImpl do
|> Map.put(:local_process_count, new_local_process_count)
end

+ defp handoff_child(child, state) do
+ {_, _, child_pid} = Map.get(state.processes_by_id, child.id)
+
+ Horde.ProcessesSupervisor.send_exit_signal(
+ supervisor_name(state.name),
+ child_pid,
+ {:shutdown, :process_redistribution}
+ )
+
+ new_state = Map.put(state, :local_process_count, state.local_process_count - 1)
+
+ new_state
+ end
+
+ defp terminate_child(child, state) do
+ child_id = child.id
+
+ reply =
+ Horde.ProcessesSupervisor.terminate_child_by_id(
+ supervisor_name(state.name),
+ child_id
+ )
+
+ new_state =
+ Map.put(state, :processes_by_id, Map.delete(state.processes_by_id, child_id))
+ |> Map.put(:local_process_count, state.local_process_count - 1)
+
+ :ok = DeltaCrdt.mutate(crdt_name(state.name), :remove, [{:process, child_id}], :infinity)
+
+ {reply, new_state}
+ end
+
defp add_child(child, state) do
{[response], new_state} = add_children([child], state)
{response, new_state}
@@ -685,6 +727,15 @@ defmodule Horde.DynamicSupervisorImpl do
end)
end

+ defp choose_node(child_spec, state) do
+ distribution_id = :erlang.phash2(Map.drop(child_spec, [:id]))
+
+ state.distribution_strategy.choose_node(
+ distribution_id,
+ Map.values(members(state))
+ )
+ end
+
defp members(state) do
Map.take(state.members_info, Map.keys(state.members))
end
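
The two ideas this file introduces can be restated in isolation: hashing the child spec without its `:id`, and the per-process decision made inside `handoff_processes/1`. The module below is a simplified, hypothetical sketch and is not part of Horde. `HandoffSketch`, `decide/5`, and the returned atoms are names invented for illustration; the real code calls `add_child/2` or `handoff_child/2` instead of returning an atom.

```elixir
defmodule HandoffSketch do
  @moduledoc false

  # Mirrors choose_node/2 in the diff: :id is dropped before hashing, so two
  # specs that differ only in their randomized id get the same distribution id.
  def distribution_id(child_spec) do
    :erlang.phash2(Map.drop(child_spec, [:id]))
  end

  # What the supervisor on `this_node` would do for a single process.
  #   current_node   - node currently recorded as running the process
  #   chosen_node    - node picked by the distribution strategy
  #   current_status - member status of current_node (:alive, :dead, ...)
  #   mode           - the :process_redistribution setting
  def decide(this_node, current_node, chosen_node, current_status, mode) do
    cond do
      # Another node owns the process but we are the chosen node:
      # only take it over if the owner is dead.
      this_node != current_node and this_node == chosen_node ->
        if current_status == :dead, do: :take_over, else: :keep

      # We own the process but another node is chosen:
      # hand it off only in :active mode.
      this_node == current_node and chosen_node != this_node ->
        if mode == :active, do: :handoff, else: :keep

      true ->
        :keep
    end
  end
end

# MyWorker is a hypothetical module name; it only needs to be an atom here.
spec = %{id: 1, start: {MyWorker, :start_link, [[]]}}
same_hash? = HandoffSketch.distribution_id(spec) == HandoffSketch.distribution_id(%{spec | id: 2})
IO.inspect(same_hash?)
IO.inspect(HandoffSketch.decide(:a, :a, :b, :alive, :active))
IO.inspect(HandoffSketch.decide(:a, :a, :b, :alive, :passive))
IO.inspect(HandoffSketch.decide(:b, :a, :b, :dead, :passive))
```

Under these assumptions the owning node hands off only in `:active` mode, while taking over from a dead node happens in either mode, which matches the old `handle_dead_nodes/1` behaviour that this hunk replaces.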
5 changes: 3 additions & 2 deletions lib/horde/graceful_shutdown_manager.ex
@@ -30,9 +30,10 @@ defmodule Horde.GracefulShutdownManager do
end

def handle_cast({:shut_down, child_spec}, {processes_pid, true} = s) do
- GenServer.cast(
+ DeltaCrdt.mutate_async(
processes_pid,
- {:operation, {:add, [{:process, child_spec.id}, {nil, child_spec}]}}
+ :add,
+ [{:process, child_spec.id}, {nil, child_spec}]
)

{:noreply, s}
19 changes: 19 additions & 0 deletions lib/horde/processes_supervisor.ex
@@ -287,6 +287,10 @@ defmodule Horde.ProcessesSupervisor do
end
end

+ def send_exit_signal(supervisor, pid, reason) do
+ GenServer.call(supervisor, {:send_exit_signal, pid, reason})
+ end
+
def terminate_child_by_id(supervisor, child_id) do
call(supervisor, {:terminate_child_by_id, child_id})
end
@@ -664,6 +668,11 @@ defmodule Horde.ProcessesSupervisor do
defp validate_extra_arguments(list) when is_list(list), do: :ok
defp validate_extra_arguments(extra), do: {:error, {:invalid_extra_arguments, extra}}

+ def handle_call({:send_exit_signal, pid, reason}, _f, state) do
+ Process.exit(pid, reason)
+ {:reply, :ok, state}
+ end
+
def handle_call({:terminate_child_by_id, child_id}, from, state) do
handle_call({:terminate_child, state.child_id_to_pid[child_id]}, from, state)
end
@@ -991,6 +1000,11 @@ defmodule Horde.ProcessesSupervisor do
end
end

+ defp maybe_restart_child(_, {:shutdown, :process_redistribution}, pid, _child, state) do
+ relinquish_child_to_horde(state, pid)
+ {:ok, delete_child(pid, state)}
+ end
+
defp maybe_restart_child(:permanent, reason, pid, child, state) do
report_error(:child_terminated, reason, pid, child, state)
restart_child(pid, child, state)
@@ -1022,6 +1036,11 @@ defmodule Horde.ProcessesSupervisor do
{:ok, delete_child(pid, state)}
end

+ defp relinquish_child_to_horde(state, pid) do
+ {child_id, _, _, _, _, _} = Map.get(state.children, pid)
+ GenServer.cast(state.root_name, {:relinquish_child_process, child_id})
+ end
+
defp remove_child_from_horde(state, pid) do
{child_id, _, _, _, _, _} = Map.get(state.children, pid)
GenServer.cast(state.root_name, {:disown_child_process, child_id})
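
The handoff path relies on the exit reason surviving intact, so that the new `maybe_restart_child/5` clause can match on `{:shutdown, :process_redistribution}`. The snippet below is a standalone sketch of that mechanism using only `spawn/1`, `Process.monitor/1` and `Process.exit/2`. It uses a plain process that does not trap exits and involves no Horde code, so it only illustrates how the reason propagates.

```elixir
# A plain process that blocks forever and does not trap exits.
pid = spawn(fn -> Process.sleep(:infinity) end)
ref = Process.monitor(pid)

# Same call shape as the new handle_call({:send_exit_signal, ...}) clause.
Process.exit(pid, {:shutdown, :process_redistribution})

# The monitor reports the exact reason the process was terminated with.
reason =
  receive do
    {:DOWN, ^ref, :process, ^pid, reason} -> reason
  after
    1_000 -> :timeout
  end

IO.inspect(reason)
# prints {:shutdown, :process_redistribution}
```

In the diff, the supervisor sees this reason when the child exits, relinquishes the child via `relinquish_child_to_horde/2`, and deletes it locally instead of restarting it.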