diff --git a/lib/phoenix/channel.ex b/lib/phoenix/channel.ex index 165eba5bbb..a106746e81 100644 --- a/lib/phoenix/channel.ex +++ b/lib/phoenix/channel.ex @@ -48,7 +48,7 @@ defmodule Phoenix.Channel do After a client has successfully joined a channel, incoming events from the client are routed through the channel's `handle_in/3` callbacks. Within these callbacks, you can perform any action. Typically you'll either foward a - message out to all listeners with `Phoenix.Channel.broadcast/3`, or reply + message out to all listeners with `Phoenix.Channel.broadcast!/3`, or reply directly to the socket with `Phoenix.Channel.reply/3`. Incoming callbacks must return the `socket` to maintain ephemeral state. @@ -56,7 +56,7 @@ defmodule Phoenix.Channel do and broadcasting the message to all topic subscribers for this socket. def handle_in("new:msg", %{"uid" => uid, "body" => body}, socket) do - broadcast socket, "new:msg", %{uid: uid, body: body} + broadcast! socket, "new:msg", %{uid: uid, body: body} {:ok, socket} end @@ -75,10 +75,10 @@ defmodule Phoenix.Channel do subscribers' `handle_out/3` callback is triggered where the event can be relayed as is, or customized on a socket by socket basis to append extra information, or conditionally filter the message from being delivered. - *Note*: `broadcast/3` and `reply/3` both return `{:ok, socket}`. + *Note*: `broadcast/3`, `broadcast!/3` and `reply/3` both return `{:ok, socket}`. def handle_in("new:msg", %{"uid" => uid, "body" => body}, socket) do - broadcast socket, "new:msg", %{uid: uid, body: body} + broadcast! socket, "new:msg", %{uid: uid, body: body} end # for every socket subscribing on this topic, append an `is_editable` @@ -156,13 +156,22 @@ defmodule Phoenix.Channel do def broadcast_from(socket = %Socket{}, event, msg) do Phoenix.Channel.broadcast_from(@pubsub_server, socket, event, msg) end + def broadcast_from!(socket = %Socket{}, event, msg) do + Phoenix.Channel.broadcast_from!(@pubsub_server, socket, event, msg) + end def broadcast_from(from, topic, event, msg) when is_map(msg) do Phoenix.Channel.broadcast_from(@pubsub_server, from, topic, event, msg) end + def broadcast_from!(from, topic, event, msg) when is_map(msg) do + Phoenix.Channel.broadcast_from!(@pubsub_server, from, topic, event, msg) + end def broadcast(topic_or_socket, event, msg) do Phoenix.Channel.broadcast(@pubsub_server, topic_or_socket, event, msg) end + def broadcast!(topic_or_socket, event, msg) do + Phoenix.Channel.broadcast!(@pubsub_server, topic_or_socket, event, msg) + end defoverridable leave: 2, handle_out: 3 end @@ -182,12 +191,23 @@ defmodule Phoenix.Channel do def broadcast(server, topic, event, message) when is_binary(topic) do broadcast_from server, :none, topic, event, message end - def broadcast(server, socket = %Socket{}, event, message) do broadcast_from server, :none, socket.topic, event, message {:ok, socket} end + @doc """ + Same as `Phoenix.Channel.broadcast/4`, but + raises `Phoenix.PubSub.BroadcastError` if broadcast fails + """ + def broadcast!(server, topic, event, message) when is_binary(topic) do + broadcast_from! server, :none, topic, event, message + end + def broadcast!(server, socket = %Socket{}, event, message) do + broadcast_from! server, :none, socket.topic, event, message + {:ok, socket} + end + @doc """ Broadcast event from pid, serializable as JSON to channel The broadcasting socket `from`, does not receive the published message. @@ -212,6 +232,23 @@ defmodule Phoenix.Channel do end def broadcast_from(_, _, _, _, _), do: raise_invalid_message + @doc """ + Same as `Phoenix.Channel.broadcast_from/4`, but + raises `Phoenix.PubSub.BroadcastError` if broadcast fails + """ + def broadcast_from!(pubsub_server, socket = %Socket{}, event, message) do + broadcast_from!(pubsub_server, socket.pid, socket.topic, event, message) + {:ok, socket} + end + def broadcast_from!(pubsub_server, from, topic, event, message) when is_map(message) do + PubSub.broadcast_from! pubsub_server, from, topic, {:socket_broadcast, %Message{ + topic: topic, + event: event, + payload: message + }} + end + def broadcast_from!(_, _, _, _, _), do: raise_invalid_message + @doc """ Sends Dict, JSON serializable message to socket """ diff --git a/lib/phoenix/pubsub.ex b/lib/phoenix/pubsub.ex index 52f838b67b..01b9ff4a3d 100644 --- a/lib/phoenix/pubsub.ex +++ b/lib/phoenix/pubsub.ex @@ -1,10 +1,37 @@ defmodule Phoenix.PubSub do - import GenServer, only: [call: 2] @moduledoc """ Serves as a Notification and PubSub layer for broad use-cases. Used internally by Channels for pubsub broadcast. + ## PubSub Adapter Contract + PubSub adapters need to only respond to a few process-based messages to + integrate with Phoenix. PubSub functions send the following messages: + + * `subscribe` - + sends: `{:subscribe, pid, topic, link}` + respond with: `:ok | {:error, reason} {:perform, {m, f a}}` + + * `unsubscribe` - + sends: `{:unsubscribe, pid, topic}` + respond with: `:ok | {:error, reason} {:perform, {m, f a}}` + + * `subscribers` - + respond with: `:ok | {:error, reason} {:perform, {m, f a}}` + + * `broadcast` - + sends `{:broadcast, :none, topic, message}` + respond with: `:ok | {:error, reason} {:perform, {m, f a}}` + + + ### Offloading work to clients via MFA response + The `Phoenix.PubSub` API allows any of its functions to handle a + response from the adapter matching `{:perform, {m, f a}}`. The PubSub + client will recursively invoke all MFA responses until a result is + return. This is useful for offloading work to clients without blocking + in your PubSub adapter. See `Phoenix.PubSub.PG2` for an example usage. + + ## Example iex> PubSub.subscribe(self, "user:123") @@ -20,6 +47,13 @@ defmodule Phoenix.PubSub do """ + defmodule BroadcastError do + defexception [:message] + def exception(msg) do + %BroadcastError{message: "Broadcast failed with #{inspect msg}"} + end + end + @doc """ Subscribes the pid to the pg2 group for the topic @@ -55,15 +89,46 @@ defmodule Phoenix.PubSub do def broadcast(server, topic, message), do: call(server, {:broadcast, :none, topic, message}) + @doc """ + Broadcasts message on given topic + raises `Phoenix.PubSub.BroadcastError` if broadcast fails + """ + def broadcast!(server, topic, message, broadcaster \\ __MODULE__) do + case broadcaster.broadcast(server, topic, message) do + :ok -> :ok + {:error, reason} -> raise BroadcastError, message: reason + end + end + @doc """ Broadcasts message to all but sender on given topic """ def broadcast_from(server, from_pid, topic, message), do: call(server, {:broadcast, from_pid, topic, message}) + @doc """ + Broadcasts message to all but sender on given topic + raises `Phoenix.PubSub.BroadcastError` if broadcast fails + """ + def broadcast_from!(server, from_pid, topic, message, broadcaster \\ __MODULE__) do + case broadcaster.broadcast_from(server, from_pid, topic, message) do + :ok -> :ok + {:error, reason} -> raise BroadcastError, message: reason + end + end + @doc false # Returns list of all topics under local server, for debug and perf tuning def list(server_name) do GenServer.call(Module.concat(server_name, Local), :list) end + + defp call(server, msg) do + GenServer.call(server, msg) |> perform + end + + defp perform({:perform, {mod, func, args}}) do + apply(mod, func, args) |> perform + end + defp perform(result), do: result end diff --git a/lib/phoenix/pubsub/pg2_server.ex b/lib/phoenix/pubsub/pg2_server.ex index 782927d0a6..fd5c602e61 100644 --- a/lib/phoenix/pubsub/pg2_server.ex +++ b/lib/phoenix/pubsub/pg2_server.ex @@ -23,24 +23,29 @@ defmodule Phoenix.PubSub.PG2Server do end def handle_call({:subscribe, pid, topic, link}, _from, state) do - {:reply, GenServer.call(state.local_name, {:subscribe, pid, topic, link}), state} + response = {:perform, {GenServer, :call, [state.local_name, {:subscribe, pid, topic, link}]}} + {:reply, response, state} end def handle_call({:unsubscribe, pid, topic}, _from, state) do - {:reply, GenServer.call(state.local_name, {:unsubscribe, pid, topic}), state} + response = {:perform, {GenServer, :call, [state.local_name, {:unsubscribe, pid, topic}]}} + {:reply, response, state} end def handle_call({:broadcast, from_pid, topic, msg}, _from, state) do case :pg2.get_members(state.namespace) do + {:error, {:no_such_group, _}} -> + {:stop, :no_such_group, {:error, :no_such_group}, state} + pids when is_list(pids) -> Enum.each(pids, &send(&1, {:forward_to_local, from_pid, topic, msg})) + {:reply, :ok, state} end - - {:reply, :ok, state} end def handle_call({:subscribers, topic}, _from, state) do - {:reply, GenServer.call(state.local_name, {:subscribers, topic}), state} + response = {:perform, {GenServer, :call, [state.local_name, {:subscribers, topic}]}} + {:reply, response, state} end def handle_call(:stop, _from, state) do diff --git a/lib/phoenix/pubsub/redis_server.ex b/lib/phoenix/pubsub/redis_server.ex index c20e3e50a4..da898cd67b 100644 --- a/lib/phoenix/pubsub/redis_server.ex +++ b/lib/phoenix/pubsub/redis_server.ex @@ -21,6 +21,17 @@ defmodule Phoenix.PubSub.RedisServer do GenServer.start_link(__MODULE__, opts, name: Dict.fetch!(opts, :name)) end + @doc """ + Broadcasts message to redis. To be only called from {:perform, {m, f, a}} + response to clients + """ + def broadcast(eredis_pid, namespace, redis_msg) do + case :eredis.q(eredis_pid, ["PUBLISH", namespace, redis_msg]) do + {:ok, _} -> :ok + {:error, reason} -> {:error, reason} + end + end + @doc """ Initializes the server. @@ -49,23 +60,24 @@ defmodule Phoenix.PubSub.RedisServer do end def handle_call({:subscribe, pid, topic, link}, _from, state) do - {:reply, GenServer.call(state.local_name, {:subscribe, pid, topic, link}), state} + response = {:perform, {GenServer, :call, [state.local_name, {:subscribe, pid, topic, link}]}} + {:reply, response, state} end def handle_call({:unsubscribe, pid, topic}, _from, state) do - {:reply, GenServer.call(state.local_name, {:unsubscribe, pid, topic}), state} + response = {:perform, {GenServer, :call, [state.local_name, {:unsubscribe, pid, topic}]}} + {:reply, response, state} end def handle_call({:subscribers, topic}, _from, state) do - {:reply, GenServer.call(state.local_name, {:subscribers, topic}), state} + response = {:perform, {GenServer, :call, [state.local_name, {:subscribers, topic}]}} + {:reply, response, state} end def handle_call({:broadcast, from_pid, topic, msg}, _from, state) do redis_msg = {@redis_msg_vsn, state.node_ref, from_pid, topic, msg} - case :eredis.q(state.eredis_pid, ["PUBLISH", state.namespace, redis_msg]) do - {:ok, _} -> {:reply, :ok, state} - {:error, reason} -> {:reply, {:error, reason}, state} - end + resp = {:perform, {__MODULE__, :broadcast, [state.eredis_pid, state.namespace, redis_msg]}} + {:reply, resp, state} end @doc """ diff --git a/test/phoenix/channel_test.exs b/test/phoenix/channel_test.exs index b0afbb26d2..e0ce5148d0 100644 --- a/test/phoenix/channel_test.exs +++ b/test/phoenix/channel_test.exs @@ -110,40 +110,54 @@ defmodule Phoenix.ChannelTest do assert PubSub.subscribers(:my_app_pub, "top:subtop") == HashSet.new end - test "#broadcast broadcasts global message on topic" do + test "#broadcast and #broadcast! broadcasts global message on topic" do socket = Socket.put_topic(new_socket, "top:subtop") assert Channel.broadcast(:my_app_pub, socket, "event", %{foo: "bar"}) + assert Channel.broadcast!(:my_app_pub, socket, "event", %{foo: "bar"}) end - test "#broadcast raises friendly error when message arg isn't a Map" do + + test "#broadcast and #broadcast! raises friendly error when message arg isn't a Map" do message = "Message argument must be a map" assert_raise RuntimeError, message, fn -> Channel.broadcast(:my_app_pub, "topic:subtopic", "event", bar: "foo", foo: "bar") end + assert_raise RuntimeError, message, fn -> + Channel.broadcast!(:my_app_pub, "topic:subtopic", "event", bar: "foo", foo: "bar") + end end - test "#broadcast_from broadcasts message on topic, skipping publisher" do + test "#broadcast_from and #broadcast_from! broadcasts message, skipping publisher" do socket = new_socket |> Socket.put_topic("top:subtop") PubSub.subscribe(:my_app_pub, socket.pid, "top:subtop") assert Channel.broadcast_from(:my_app_pub, socket, "event", %{payload: "hello"}) refute Enum.any?(Process.info(self)[:messages], &match?(%Message{}, &1)) + + assert Channel.broadcast_from!(:my_app_pub, socket, "event", %{payload: "hello"}) + refute Enum.any?(Process.info(self)[:messages], &match?(%Message{}, &1)) end - test "#broadcast_from raises friendly error when message arg isn't a Map" do + test "#broadcast_from and #broadcast_from! raises error when msg isn't a Map" do socket = Socket.put_topic(new_socket, "top:subtop") message = "Message argument must be a map" assert_raise RuntimeError, message, fn -> Channel.broadcast_from(:my_app_pub, socket, "event", bar: "foo", foo: "bar") end + assert_raise RuntimeError, message, fn -> + Channel.broadcast_from!(:my_app_pub, socket, "event", bar: "foo", foo: "bar") + end end - test "#broadcast_from/4 raises friendly error when message arg isn't a Map" do + test "#broadcast_from/4 and broadcast_from!/4 raises error when msg isn't a Map" do message = "Message argument must be a map" assert_raise RuntimeError, message, fn -> Channel.broadcast_from(:my_app_pub, self, "topic:subtopic", "event", bar: "foo") end + assert_raise RuntimeError, message, fn -> + Channel.broadcast_from!(:my_app_pub, self, "topic:subtopic", "event", bar: "foo") + end end test "#reply sends response to socket" do diff --git a/test/phoenix/integration/channel_test.exs b/test/phoenix/integration/channel_test.exs index 428cf85a93..02a9758bc4 100644 --- a/test/phoenix/integration/channel_test.exs +++ b/test/phoenix/integration/channel_test.exs @@ -36,7 +36,7 @@ defmodule Phoenix.Integration.ChannelTest do end def handle_in("new:msg", message, socket) do - broadcast socket, "new:msg", message + broadcast! socket, "new:msg", message end end @@ -149,8 +149,8 @@ defmodule Phoenix.Integration.ChannelTest do assert resp.status == 204 # messages are buffered between polls - Phoenix.Channel.broadcast :my_app_pub, "rooms:lobby", "user:entered", %{name: "José"} - Phoenix.Channel.broadcast :my_app_pub, "rooms:lobby", "user:entered", %{name: "Sonny"} + Phoenix.Channel.broadcast! :my_app_pub, "rooms:lobby", "user:entered", %{name: "José"} + Phoenix.Channel.broadcast! :my_app_pub, "rooms:lobby", "user:entered", %{name: "Sonny"} {resp, cookie} = poll(:get, "/ws/poll", cookie) assert resp.status == 200 assert Enum.count(resp.body) == 2 @@ -190,7 +190,7 @@ defmodule Phoenix.Integration.ChannelTest do "event" => "join", "payload" => %{}} assert resp.status == 200 - Phoenix.Channel.broadcast :my_app_pub, "rooms:lobby", "new:msg", %{body: "Hello lobby"} + Phoenix.Channel.broadcast! :my_app_pub, "rooms:lobby", "new:msg", %{body: "Hello lobby"} # poll {resp, cookie} = poll(:get, "/ws/poll", cookie) assert resp.status == 200 diff --git a/test/phoenix/pubsub/pubsub_test.exs b/test/phoenix/pubsub/pubsub_test.exs index bc62f2c81c..2d4db28289 100644 --- a/test/phoenix/pubsub/pubsub_test.exs +++ b/test/phoenix/pubsub/pubsub_test.exs @@ -12,6 +12,11 @@ defmodule Phoenix.PubSub.PubSubTest do spawn fn -> :timer.sleep(:infinity) end end + defmodule FailedBroadcaster do + def broadcast(_server, _topic, _msg), do: {:error, :boom} + def broadcast_from(_server, _from_pid, _topic, _msg), do: {:error, :boom} + end + for {adapter, name} <- @adapters do @adapter adapter @server name @@ -63,25 +68,42 @@ defmodule Phoenix.PubSub.PubSubTest do assert Process.alive?(non_linked_pid2) end - test "#{inspect @adapter} #broadcast publishes message to each subscriber" do + test "#{inspect @adapter} broadcast/3 and broadcast!/3 publishes message to each subscriber" do PubSub.subscribe(@server, self, "topic9") assert PubSub.subscribers(@server, "topic9") |> Enum.to_list == [self] - PubSub.broadcast(@server, "topic9", :ping) + :ok = PubSub.broadcast(@server, "topic9", :ping) + assert_receive :ping + :ok = PubSub.broadcast!(@server, "topic9", :ping) assert_receive :ping end - test "#{inspect @adapter} #broadcast does not publish message to other topic subscribers" do + test "#{inspect @adapter} broadcast!/3 and broadcast_from!/4 raise if broadcast fails" do + PubSub.subscribe(@server, self, "topic9") + assert PubSub.subscribers(@server, "topic9") |> Enum.to_list == [self] + assert_raise PubSub.BroadcastError, fn -> + PubSub.broadcast!(@server, "topic9", :ping, FailedBroadcaster) + end + assert_raise PubSub.BroadcastError, fn -> + PubSub.broadcast_from!(@server, self, "topic9", :ping, FailedBroadcaster) + end + refute_receive :ping + end + + test "#{inspect @adapter} broadcast/3 does not publish message to other topic subscribers" do pids = Enum.map 0..10, fn _ -> spawn_pid end pids |> Enum.each(&PubSub.subscribe(@server, &1, "topic10")) - PubSub.broadcast(@server, "topic10", :ping) + :ok = PubSub.broadcast(@server, "topic10", :ping) refute_receive :ping pids |> Enum.each(&Process.exit &1, :kill) end - test "#{inspect @adapter} #broadcast_from does not publish to broadcaster pid when provided" do + test "#{inspect @adapter} broadcast_from/4 and broadcast_from!/4 skips sender" do PubSub.subscribe(@server, self, "topic11") PubSub.broadcast_from(@server, self, "topic11", :ping) refute_receive :ping + + PubSub.broadcast_from!(@server, self, "topic11", :ping) + refute_receive :ping end test "#{inspect @adapter} processes automatically removed from topic when killed" do