Skip to content

Commit

Permalink
Add client offloading and broadcast!
Browse files Browse the repository at this point in the history
  • Loading branch information
chrismccord committed Jan 31, 2015
1 parent cf7c6ea commit 7ff363d
Show file tree
Hide file tree
Showing 7 changed files with 187 additions and 32 deletions.
47 changes: 42 additions & 5 deletions lib/phoenix/channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,15 @@ defmodule Phoenix.Channel do
After a client has successfully joined a channel, incoming events from the
client are routed through the channel's `handle_in/3` callbacks. Within these
callbacks, you can perform any action. Typically you'll either foward a
message out to all listeners with `Phoenix.Channel.broadcast/3`, or reply
message out to all listeners with `Phoenix.Channel.broadcast!/3`, or reply
directly to the socket with `Phoenix.Channel.reply/3`.
Incoming callbacks must return the `socket` to maintain ephemeral state.
Here's an example of receiving an incoming `"new:msg"` event from a one client,
and broadcasting the message to all topic subscribers for this socket.
def handle_in("new:msg", %{"uid" => uid, "body" => body}, socket) do
broadcast socket, "new:msg", %{uid: uid, body: body}
broadcast! socket, "new:msg", %{uid: uid, body: body}
{:ok, socket}
end
Expand All @@ -75,10 +75,10 @@ defmodule Phoenix.Channel do
subscribers' `handle_out/3` callback is triggered where the event can be
relayed as is, or customized on a socket by socket basis to append extra
information, or conditionally filter the message from being delivered.
*Note*: `broadcast/3` and `reply/3` both return `{:ok, socket}`.
*Note*: `broadcast/3`, `broadcast!/3` and `reply/3` both return `{:ok, socket}`.
def handle_in("new:msg", %{"uid" => uid, "body" => body}, socket) do
broadcast socket, "new:msg", %{uid: uid, body: body}
broadcast! socket, "new:msg", %{uid: uid, body: body}
end
# for every socket subscribing on this topic, append an `is_editable`
Expand Down Expand Up @@ -156,13 +156,22 @@ defmodule Phoenix.Channel do
def broadcast_from(socket = %Socket{}, event, msg) do
Phoenix.Channel.broadcast_from(@pubsub_server, socket, event, msg)
end
def broadcast_from!(socket = %Socket{}, event, msg) do
Phoenix.Channel.broadcast_from!(@pubsub_server, socket, event, msg)
end
def broadcast_from(from, topic, event, msg) when is_map(msg) do
Phoenix.Channel.broadcast_from(@pubsub_server, from, topic, event, msg)
end
def broadcast_from!(from, topic, event, msg) when is_map(msg) do
Phoenix.Channel.broadcast_from!(@pubsub_server, from, topic, event, msg)
end

def broadcast(topic_or_socket, event, msg) do
Phoenix.Channel.broadcast(@pubsub_server, topic_or_socket, event, msg)
end
def broadcast!(topic_or_socket, event, msg) do
Phoenix.Channel.broadcast!(@pubsub_server, topic_or_socket, event, msg)
end

defoverridable leave: 2, handle_out: 3
end
Expand All @@ -182,12 +191,23 @@ defmodule Phoenix.Channel do
def broadcast(server, topic, event, message) when is_binary(topic) do
broadcast_from server, :none, topic, event, message
end

def broadcast(server, socket = %Socket{}, event, message) do
broadcast_from server, :none, socket.topic, event, message
{:ok, socket}
end

@doc """
Same as `Phoenix.Channel.broadcast/4`, but
raises `Phoenix.PubSub.BroadcastError` if broadcast fails
"""
def broadcast!(server, topic, event, message) when is_binary(topic) do
broadcast_from! server, :none, topic, event, message
end
def broadcast!(server, socket = %Socket{}, event, message) do
broadcast_from! server, :none, socket.topic, event, message
{:ok, socket}
end

@doc """
Broadcast event from pid, serializable as JSON to channel
The broadcasting socket `from`, does not receive the published message.
Expand All @@ -212,6 +232,23 @@ defmodule Phoenix.Channel do
end
def broadcast_from(_, _, _, _, _), do: raise_invalid_message

@doc """
Same as `Phoenix.Channel.broadcast_from/4`, but
raises `Phoenix.PubSub.BroadcastError` if broadcast fails
"""
def broadcast_from!(pubsub_server, socket = %Socket{}, event, message) do
broadcast_from!(pubsub_server, socket.pid, socket.topic, event, message)
{:ok, socket}
end
def broadcast_from!(pubsub_server, from, topic, event, message) when is_map(message) do
PubSub.broadcast_from! pubsub_server, from, topic, {:socket_broadcast, %Message{
topic: topic,
event: event,
payload: message
}}
end
def broadcast_from!(_, _, _, _, _), do: raise_invalid_message

@doc """
Sends Dict, JSON serializable message to socket
"""
Expand Down
67 changes: 66 additions & 1 deletion lib/phoenix/pubsub.ex
Original file line number Diff line number Diff line change
@@ -1,10 +1,37 @@
defmodule Phoenix.PubSub do
import GenServer, only: [call: 2]

@moduledoc """
Serves as a Notification and PubSub layer for broad use-cases. Used internally
by Channels for pubsub broadcast.
## PubSub Adapter Contract
PubSub adapters need to only respond to a few process-based messages to
integrate with Phoenix. PubSub functions send the following messages:
* `subscribe` -
sends: `{:subscribe, pid, topic, link}`
respond with: `:ok | {:error, reason} {:perform, {m, f a}}`
* `unsubscribe` -
sends: `{:unsubscribe, pid, topic}`
respond with: `:ok | {:error, reason} {:perform, {m, f a}}`
* `subscribers` -
respond with: `:ok | {:error, reason} {:perform, {m, f a}}`
* `broadcast` -
sends `{:broadcast, :none, topic, message}`
respond with: `:ok | {:error, reason} {:perform, {m, f a}}`
### Offloading work to clients via MFA response
The `Phoenix.PubSub` API allows any of its functions to handle a
response from the adapter matching `{:perform, {m, f a}}`. The PubSub
client will recursively invoke all MFA responses until a result is
return. This is useful for offloading work to clients without blocking
in your PubSub adapter. See `Phoenix.PubSub.PG2` for an example usage.
## Example
iex> PubSub.subscribe(self, "user:123")
Expand All @@ -20,6 +47,13 @@ defmodule Phoenix.PubSub do
"""

defmodule BroadcastError do
defexception [:message]
def exception(msg) do
%BroadcastError{message: "Broadcast failed with #{inspect msg}"}
end
end


@doc """
Subscribes the pid to the pg2 group for the topic
Expand Down Expand Up @@ -55,15 +89,46 @@ defmodule Phoenix.PubSub do
def broadcast(server, topic, message),
do: call(server, {:broadcast, :none, topic, message})

@doc """
Broadcasts message on given topic
raises `Phoenix.PubSub.BroadcastError` if broadcast fails
"""
def broadcast!(server, topic, message, broadcaster \\ __MODULE__) do
case broadcaster.broadcast(server, topic, message) do
:ok -> :ok
{:error, reason} -> raise BroadcastError, message: reason
end
end

@doc """
Broadcasts message to all but sender on given topic
"""
def broadcast_from(server, from_pid, topic, message),
do: call(server, {:broadcast, from_pid, topic, message})

@doc """
Broadcasts message to all but sender on given topic
raises `Phoenix.PubSub.BroadcastError` if broadcast fails
"""
def broadcast_from!(server, from_pid, topic, message, broadcaster \\ __MODULE__) do
case broadcaster.broadcast_from(server, from_pid, topic, message) do
:ok -> :ok
{:error, reason} -> raise BroadcastError, message: reason
end
end

@doc false
# Returns list of all topics under local server, for debug and perf tuning
def list(server_name) do
GenServer.call(Module.concat(server_name, Local), :list)
end

defp call(server, msg) do
GenServer.call(server, msg) |> perform
end

defp perform({:perform, {mod, func, args}}) do
apply(mod, func, args) |> perform
end
defp perform(result), do: result
end
15 changes: 10 additions & 5 deletions lib/phoenix/pubsub/pg2_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,29 @@ defmodule Phoenix.PubSub.PG2Server do
end

def handle_call({:subscribe, pid, topic, link}, _from, state) do
{:reply, GenServer.call(state.local_name, {:subscribe, pid, topic, link}), state}
response = {:perform, {GenServer, :call, [state.local_name, {:subscribe, pid, topic, link}]}}
{:reply, response, state}
end

def handle_call({:unsubscribe, pid, topic}, _from, state) do
{:reply, GenServer.call(state.local_name, {:unsubscribe, pid, topic}), state}
response = {:perform, {GenServer, :call, [state.local_name, {:unsubscribe, pid, topic}]}}
{:reply, response, state}
end

def handle_call({:broadcast, from_pid, topic, msg}, _from, state) do
case :pg2.get_members(state.namespace) do
{:error, {:no_such_group, _}} ->
{:stop, :no_such_group, {:error, :no_such_group}, state}

pids when is_list(pids) ->
Enum.each(pids, &send(&1, {:forward_to_local, from_pid, topic, msg}))
{:reply, :ok, state}
end

{:reply, :ok, state}
end

def handle_call({:subscribers, topic}, _from, state) do
{:reply, GenServer.call(state.local_name, {:subscribers, topic}), state}
response = {:perform, {GenServer, :call, [state.local_name, {:subscribers, topic}]}}
{:reply, response, state}
end

def handle_call(:stop, _from, state) do
Expand Down
26 changes: 19 additions & 7 deletions lib/phoenix/pubsub/redis_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,17 @@ defmodule Phoenix.PubSub.RedisServer do
GenServer.start_link(__MODULE__, opts, name: Dict.fetch!(opts, :name))
end

@doc """
Broadcasts message to redis. To be only called from {:perform, {m, f, a}}
response to clients
"""
def broadcast(eredis_pid, namespace, redis_msg) do
case :eredis.q(eredis_pid, ["PUBLISH", namespace, redis_msg]) do
{:ok, _} -> :ok
{:error, reason} -> {:error, reason}
end
end

@doc """
Initializes the server.
Expand Down Expand Up @@ -49,23 +60,24 @@ defmodule Phoenix.PubSub.RedisServer do
end

def handle_call({:subscribe, pid, topic, link}, _from, state) do
{:reply, GenServer.call(state.local_name, {:subscribe, pid, topic, link}), state}
response = {:perform, {GenServer, :call, [state.local_name, {:subscribe, pid, topic, link}]}}
{:reply, response, state}
end

def handle_call({:unsubscribe, pid, topic}, _from, state) do
{:reply, GenServer.call(state.local_name, {:unsubscribe, pid, topic}), state}
response = {:perform, {GenServer, :call, [state.local_name, {:unsubscribe, pid, topic}]}}
{:reply, response, state}
end

def handle_call({:subscribers, topic}, _from, state) do
{:reply, GenServer.call(state.local_name, {:subscribers, topic}), state}
response = {:perform, {GenServer, :call, [state.local_name, {:subscribers, topic}]}}
{:reply, response, state}
end

def handle_call({:broadcast, from_pid, topic, msg}, _from, state) do
redis_msg = {@redis_msg_vsn, state.node_ref, from_pid, topic, msg}
case :eredis.q(state.eredis_pid, ["PUBLISH", state.namespace, redis_msg]) do
{:ok, _} -> {:reply, :ok, state}
{:error, reason} -> {:reply, {:error, reason}, state}
end
resp = {:perform, {__MODULE__, :broadcast, [state.eredis_pid, state.namespace, redis_msg]}}
{:reply, resp, state}
end

@doc """
Expand Down
24 changes: 19 additions & 5 deletions test/phoenix/channel_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -110,40 +110,54 @@ defmodule Phoenix.ChannelTest do
assert PubSub.subscribers(:my_app_pub, "top:subtop") == HashSet.new
end

test "#broadcast broadcasts global message on topic" do
test "#broadcast and #broadcast! broadcasts global message on topic" do
socket = Socket.put_topic(new_socket, "top:subtop")

assert Channel.broadcast(:my_app_pub, socket, "event", %{foo: "bar"})
assert Channel.broadcast!(:my_app_pub, socket, "event", %{foo: "bar"})
end

test "#broadcast raises friendly error when message arg isn't a Map" do

test "#broadcast and #broadcast! raises friendly error when message arg isn't a Map" do
message = "Message argument must be a map"
assert_raise RuntimeError, message, fn ->
Channel.broadcast(:my_app_pub, "topic:subtopic", "event", bar: "foo", foo: "bar")
end
assert_raise RuntimeError, message, fn ->
Channel.broadcast!(:my_app_pub, "topic:subtopic", "event", bar: "foo", foo: "bar")
end
end

test "#broadcast_from broadcasts message on topic, skipping publisher" do
test "#broadcast_from and #broadcast_from! broadcasts message, skipping publisher" do
socket = new_socket |> Socket.put_topic("top:subtop")
PubSub.subscribe(:my_app_pub, socket.pid, "top:subtop")

assert Channel.broadcast_from(:my_app_pub, socket, "event", %{payload: "hello"})
refute Enum.any?(Process.info(self)[:messages], &match?(%Message{}, &1))

assert Channel.broadcast_from!(:my_app_pub, socket, "event", %{payload: "hello"})
refute Enum.any?(Process.info(self)[:messages], &match?(%Message{}, &1))
end

test "#broadcast_from raises friendly error when message arg isn't a Map" do
test "#broadcast_from and #broadcast_from! raises error when msg isn't a Map" do
socket = Socket.put_topic(new_socket, "top:subtop")
message = "Message argument must be a map"
assert_raise RuntimeError, message, fn ->
Channel.broadcast_from(:my_app_pub, socket, "event", bar: "foo", foo: "bar")
end
assert_raise RuntimeError, message, fn ->
Channel.broadcast_from!(:my_app_pub, socket, "event", bar: "foo", foo: "bar")
end
end

test "#broadcast_from/4 raises friendly error when message arg isn't a Map" do
test "#broadcast_from/4 and broadcast_from!/4 raises error when msg isn't a Map" do
message = "Message argument must be a map"
assert_raise RuntimeError, message, fn ->
Channel.broadcast_from(:my_app_pub, self, "topic:subtopic", "event", bar: "foo")
end
assert_raise RuntimeError, message, fn ->
Channel.broadcast_from!(:my_app_pub, self, "topic:subtopic", "event", bar: "foo")
end
end

test "#reply sends response to socket" do
Expand Down
8 changes: 4 additions & 4 deletions test/phoenix/integration/channel_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ defmodule Phoenix.Integration.ChannelTest do
end

def handle_in("new:msg", message, socket) do
broadcast socket, "new:msg", message
broadcast! socket, "new:msg", message
end
end

Expand Down Expand Up @@ -149,8 +149,8 @@ defmodule Phoenix.Integration.ChannelTest do
assert resp.status == 204

# messages are buffered between polls
Phoenix.Channel.broadcast :my_app_pub, "rooms:lobby", "user:entered", %{name: "José"}
Phoenix.Channel.broadcast :my_app_pub, "rooms:lobby", "user:entered", %{name: "Sonny"}
Phoenix.Channel.broadcast! :my_app_pub, "rooms:lobby", "user:entered", %{name: "José"}
Phoenix.Channel.broadcast! :my_app_pub, "rooms:lobby", "user:entered", %{name: "Sonny"}
{resp, cookie} = poll(:get, "/ws/poll", cookie)
assert resp.status == 200
assert Enum.count(resp.body) == 2
Expand Down Expand Up @@ -190,7 +190,7 @@ defmodule Phoenix.Integration.ChannelTest do
"event" => "join",
"payload" => %{}}
assert resp.status == 200
Phoenix.Channel.broadcast :my_app_pub, "rooms:lobby", "new:msg", %{body: "Hello lobby"}
Phoenix.Channel.broadcast! :my_app_pub, "rooms:lobby", "new:msg", %{body: "Hello lobby"}
# poll
{resp, cookie} = poll(:get, "/ws/poll", cookie)
assert resp.status == 200
Expand Down
Loading

0 comments on commit 7ff363d

Please sign in to comment.