diff --git a/lib/horde/uniform_distribution.ex b/lib/horde/uniform_distribution.ex index 5cf69ef2..47d7635c 100644 --- a/lib/horde/uniform_distribution.ex +++ b/lib/horde/uniform_distribution.ex @@ -7,27 +7,32 @@ defmodule Horde.UniformDistribution do def choose_node(identifier, members) do members = - members - |> Enum.filter(fn - %{status: :alive} -> true - _ -> false - end) - |> Enum.sort_by(fn %{name: name} -> name end) + filter_members(members) + |> Map.new(fn member -> {member.name, member} end) case Enum.count(members) do 0 -> {:error, :no_alive_nodes} count -> - index = hash(term_to_string_identifier(identifier)) |> rem(count) - {:ok, Enum.at(members, index)} + chosen_member = + HashRing.new() + |> HashRing.add_nodes(Map.keys(members)) + |> HashRing.key_to_node(identifier) + + {:ok, Map.get(members, chosen_member)} end end + defp filter_members(members) do + Enum.filter(members, fn + %{status: :alive} -> true + _ -> false + end) + end + defp hash(identifier) when is_integer(identifier), do: identifier defp hash(identifier), do: :erlang.phash2(identifier) def has_quorum?(_members), do: true - - defp term_to_string_identifier(term), do: term |> :erlang.term_to_binary() |> Base.encode16() end diff --git a/mix.exs b/mix.exs index 2ad1760e..f16fd02c 100644 --- a/mix.exs +++ b/mix.exs @@ -26,6 +26,7 @@ defmodule Horde.MixProject do defp deps do [ {:delta_crdt, "~> 0.5.4"}, + {:libring, "~> 1.4"}, {:ex_doc, "~> 0.16", only: :dev, runtime: false}, {:benchee, "> 0.0.1", only: :dev, runtime: false}, {:stream_data, "~> 0.4", only: :test}, diff --git a/mix.lock b/mix.lock index b1e3ce8c..c7794b1f 100644 --- a/mix.lock +++ b/mix.lock @@ -10,6 +10,7 @@ "exprintf": {:hex, :exprintf, "0.2.1", "b7e895dfb00520cfb7fc1671303b63b37dc3897c59be7cbf1ae62f766a8a0314", [:mix], [], "hexpm"}, "exprof": {:hex, :exprof, "0.2.3", "8d4d657d73fc0c9ef1e30b2f9207b26ccbd2aec2baf1ca43f0b6d244c841c9f8", [:mix], [{:exprintf, "~> 0.2", [hex: :exprintf, repo: "hexpm", optional: false]}], "hexpm"}, "global_flags": {:hex, :global_flags, "1.0.0", "ee6b864979a1fb38d1fbc67838565644baf632212bce864adca21042df036433", [:rebar3], [], "hexpm"}, + "libring": {:hex, :libring, "1.4.0", "41246ba2f3fbc76b3971f6bce83119dfec1eee17e977a48d8a9cfaaf58c2a8d6", [:mix], [], "hexpm"}, "local_cluster": {:hex, :local_cluster, "1.0.4", "3cabf9b267cf276752ee6ceccdbe3a05a2125da1b93c5bb74b586ae4539ebd66", [:mix], [{:global_flags, "~> 1.0", [hex: :global_flags, repo: "hexpm", optional: false]}], "hexpm"}, "makeup": {:hex, :makeup, "0.5.5", "9e08dfc45280c5684d771ad58159f718a7b5788596099bdfb0284597d368a882", [:mix], [{:nimble_parsec, "~> 0.4", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm"}, "makeup_elixir": {:hex, :makeup_elixir, "0.10.0", "0f09c2ddf352887a956d84f8f7e702111122ca32fbbc84c2f0569b8b65cbf7fa", [:mix], [{:makeup, "~> 0.5.5", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm"}, diff --git a/test/uniform_distribution_test.exs b/test/uniform_distribution_test.exs index 5d03c0e0..a4227dd6 100644 --- a/test/uniform_distribution_test.exs +++ b/test/uniform_distribution_test.exs @@ -8,7 +8,7 @@ defmodule UniformDistributionTest do status <- StreamData.member_of([:alive, :dead, :shutting_down]), name <- binary(), pid <- atom(:alias) do - %{node_id: node_id, status: status, pid: pid, name: name} + %{node_id: node_id, status: status, pid: pid, name: "A#{name}"} end check all members <- list_of(member), diff --git a/test/uniform_quorum_distribution_test.exs b/test/uniform_quorum_distribution_test.exs index 86d6a2b2..05ba19b3 100644 --- a/test/uniform_quorum_distribution_test.exs +++ b/test/uniform_quorum_distribution_test.exs @@ -8,7 +8,7 @@ defmodule UniformQuorumDistributionTest do status <- StreamData.member_of([:alive, :dead, :shutting_down]), name <- binary(), pid <- atom(:alias) do - %{node_id: node_id, status: status, pid: pid, name: name} + %{node_id: node_id, status: status, pid: pid, name: "A#{name}"} end check all members <-