Mix.install([
  {:jason, "~> 1.4"},
  {:kino, "~> 0.9", override: true},
  {:youtube, github: "brooklinjazz/youtube"},
  {:hidden_cell, github: "brooklinjazz/hidden_cell"}
])
Upon completing this lesson, a student should be able to answer the following questions.
- What is PubSub (the Publisher/Subscriber pattern) and how does it enable real-time features?
- How do we subscribe a LiveView process to a topic?
- How do we broadcast to a topic?
- How do we handle a broadcasted message in a LiveView process?
PubSub, or "Publish-Subscribe", is a messaging pattern that allows senders of messages (publishers) to send messages to multiple receivers (subscribers) without explicitly establishing a connection to each individual receiver. This allows for a decoupled communication model, where the publisher and subscriber do not need to be aware of each other or directly connected in order to communicate.
flowchart
PS[PubSub]
P[Publisher]
S1[Subscriber]
S2[Subscriber]
S3[Subscriber]
P --broadcast--> PS
PS --broadcast--> S1
PS --broadcast--> S2
PS --broadcast--> S3
In a PubSub system, a publisher sends a message to a topic, which acts as a logical channel for the message. Subscribers can then subscribe to that topic, and will receive a copy of the message when it is published. This allows multiple subscribers to receive the same message, and also allows publishers to send messages to multiple topics, which can then be received by multiple subscribers.
flowchart BT
S1[Subscriber]
S2[Subscriber]
S3[Subscriber]
T[Topic]
S1 --subscribe--> T
S2 --subscribe--> T
S3 --subscribe--> T
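To make the topic idea concrete, here is a minimal sketch of a local publish/subscribe mechanism built only on Elixir's standard library `Registry` with duplicate keys. This is not how Phoenix is wired in this lesson; it is a swapped-in illustration of the pattern. The names `Demo.Registry` and `Demo.TinyPubSub` are hypothetical and exist only for this example.

```elixir
# Assumption: a stdlib-only stand-in for a PubSub system, for illustration only.
{:ok, _registry} = Registry.start_link(keys: :duplicate, name: Demo.Registry)

defmodule Demo.TinyPubSub do
  # Register the calling process under the topic key.
  def subscribe(topic) do
    {:ok, _owner} = Registry.register(Demo.Registry, topic, nil)
    :ok
  end

  # Send the message to every process registered under the topic.
  def broadcast(topic, message) do
    Registry.dispatch(Demo.Registry, topic, fn entries ->
      for {pid, _value} <- entries, do: send(pid, message)
    end)
  end
end

parent = self()

# Three subscriber processes that forward whatever they receive to the parent.
for n <- 1..3 do
  spawn(fn ->
    Demo.TinyPubSub.subscribe("messages")
    send(parent, {:ready, n})

    receive do
      message -> send(parent, {:received, n, message})
    end
  end)
end

# Wait until every subscriber has registered before publishing.
for n <- 1..3 do
  receive do
    {:ready, ^n} -> :ok
  end
end

# The publisher only knows the topic name, not the subscribers.
Demo.TinyPubSub.broadcast("messages", {:new_message, "hello"})

received =
  for n <- 1..3 do
    receive do
      {:received, ^n, message} -> message
    after
      1_000 -> :timeout
    end
  end

IO.inspect(received, label: "each subscriber received")
```

The publisher never holds a reference to any subscriber pid, which is the decoupling described above.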
Phoenix provides a built-in PubSub system built on Elixir's process-based actor model, which allows clients to subscribe to topics and receive messages in real-time. Since LiveViews are GenServer processes, each Phoenix LiveView can subscribe to relevant topics and render information in real-time based on published events.
Our Phoenix PubSub service is started in application.ex as part of the application supervision tree.
def start(_type, _args) do
  children = [
    # Start the Ecto repository
    App.Repo,
    # Start the Telemetry supervisor
    AppWeb.Telemetry,
    # Start the PubSub system
    {Phoenix.PubSub, name: App.PubSub},
    # Start the Endpoint (http/https)
    AppWeb.Endpoint
    # Start a worker by calling: App.Worker.start_link(arg)
    # {App.Worker, arg}
  ]

  # See https://hexdocs.pm/elixir/Supervisor.html
  # for other strategies and supported options
  opts = [strategy: :one_for_one, name: App.Supervisor]
  Supervisor.start_link(children, opts)
end
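Outside of a Phoenix application, the same child spec can be started on its own. The following is a minimal sketch, assuming the `phoenix_pubsub` package is installed with `Mix.install/1` and that the code runs as a standalone script, separate from this notebook's setup cell. `Demo.PubSub` is a hypothetical name chosen for the example.

```elixir
Mix.install([{:phoenix_pubsub, "~> 2.1"}])

# Start a PubSub server under a supervisor, much like application.ex does.
# Demo.PubSub is a hypothetical name used only in this sketch.
{:ok, _supervisor} =
  Supervisor.start_link([{Phoenix.PubSub, name: Demo.PubSub}], strategy: :one_for_one)

# Subscribe the current process to a topic.
:ok = Phoenix.PubSub.subscribe(Demo.PubSub, "messages")

# Publish to the topic. Every subscriber gets a copy in its mailbox.
:ok = Phoenix.PubSub.broadcast(Demo.PubSub, "messages", {:new, "hello"})

received =
  receive do
    message -> message
  after
    1_000 -> :timeout
  end

IO.inspect(received, label: "received")
```

The name given to the child spec is the handle that subscribers and publishers pass to the `Phoenix.PubSub` functions.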
Over the next several lessons, we're going to build a PicChat application where users can create messages with uploaded pictures. This lesson will focus on adding a PubSub system that enables a real-time feed of messages.
Whenever a LiveView process triggers a relevant event such as creating, updating, or deleting a message, we'll broadcast a message through PubSub that causes other LiveView processes to update their chat feed.
flowchart
P[Publisher]
PS[(PubSub)]
S1[LiveView Subscriber]
S2[LiveView Subscriber]
S3[LiveView Subscriber]
P --broadcast event to ''messages'' topic--> PS
PS --broadcast event--> S1
PS --broadcast event--> S2
PS --broadcast event--> S3
There are three main steps to implement a simple PubSub system.
- Use Endpoint.subscribe/2, which calls Phoenix.PubSub.subscribe/3, to subscribe the relevant processes to a topic.
- Broadcast a message to all subscribers of a topic using a broadcast function such as Endpoint.broadcast/3 or Endpoint.broadcast_from/4 (which skips the process passed as its first argument, typically the caller).
- Handle the broadcasted message in the subscriber processes with a handle_info/2 callback.
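The three steps above can be sketched with a plain GenServer standing in for a LiveView. This is an illustration under assumptions, not the PicChat code: it calls `Phoenix.PubSub` directly rather than going through an Endpoint, the `Demo.Feed` module and `Demo.PubSub` name are hypothetical, and it is meant to run as a standalone script.

```elixir
Mix.install([{:phoenix_pubsub, "~> 2.1"}])

defmodule Demo.Feed do
  # Hypothetical subscriber process standing in for a LiveView.
  use GenServer

  def start_link(pubsub), do: GenServer.start_link(__MODULE__, pubsub)
  def messages(pid), do: GenServer.call(pid, :messages)

  @impl true
  def init(pubsub) do
    # Step 1: subscribe this process to the topic.
    :ok = Phoenix.PubSub.subscribe(pubsub, "messages")
    {:ok, []}
  end

  @impl true
  def handle_call(:messages, _from, state) do
    {:reply, Enum.reverse(state), state}
  end

  # Step 3: broadcasts arrive as ordinary messages in handle_info/2.
  @impl true
  def handle_info({:new, message}, state) do
    {:noreply, [message | state]}
  end
end

{:ok, _supervisor} =
  Supervisor.start_link([{Phoenix.PubSub, name: Demo.PubSub}], strategy: :one_for_one)

{:ok, feed_a} = Demo.Feed.start_link(Demo.PubSub)
{:ok, feed_b} = Demo.Feed.start_link(Demo.PubSub)

# Step 2: broadcast to every subscriber of the topic.
:ok = Phoenix.PubSub.broadcast(Demo.PubSub, "messages", {:new, "first"})
:ok = Phoenix.PubSub.broadcast(Demo.PubSub, "messages", {:new, "second"})

feeds = [Demo.Feed.messages(feed_a), Demo.Feed.messages(feed_b)]
IO.inspect(feeds, label: "feeds")
```

Both feeds end up with the same messages even though the publisher never addressed either process directly.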
We display the list of messages on the MessageLive.Index LiveView. We're going to subscribe every mounted MessageLive.Index LiveView to a "messages" topic.
# Index.ex
def mount(_params, _session, socket) do
  # Only subscribe once the socket is connected, not during the initial static render
  if connected?(socket) do
    PicChatWeb.Endpoint.subscribe("messages")
  end

  {:ok, stream(socket, :messages, Chat.list_messages())}
end
Now our subscribed MessageLive.Index LiveView processes will receive any messages broadcast to the "messages" topic.
We want to broadcast to the "messages" topic whenever an event occurs that is relevant to providing real-time updates. We'll broadcast a message anytime we create, update, or delete a Message record.
Broadcast a "new" or "edit" event on the "messages" topic whenever we create or update a message.
We use broadcast_from/4 instead of broadcast/3 so that the sending process does not receive its own broadcast. The sender already notifies the parent LiveView of the saved message directly with notify_parent/1, so that update does not need to go through the PubSub system.
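The difference between the two functions can be observed in a small experiment. This sketch assumes `Phoenix.PubSub.broadcast_from/4` called directly (the Endpoint functions used in this lesson wrap the same mechanism), a hypothetical `Demo.PubSub` server, and a standalone script run outside this notebook.

```elixir
Mix.install([{:phoenix_pubsub, "~> 2.1"}])

{:ok, _supervisor} =
  Supervisor.start_link([{Phoenix.PubSub, name: Demo.PubSub}], strategy: :one_for_one)

parent = self()

# A second subscriber that reports whatever it receives back to the parent.
spawn(fn ->
  :ok = Phoenix.PubSub.subscribe(Demo.PubSub, "messages")
  send(parent, :ready)

  receive do
    message -> send(parent, {:other_subscriber_got, message})
  end
end)

# The sender subscribes too, then waits for the other subscriber to be ready.
:ok = Phoenix.PubSub.subscribe(Demo.PubSub, "messages")

receive do
  :ready -> :ok
end

# broadcast_from/4 skips the pid given as the second argument.
:ok = Phoenix.PubSub.broadcast_from(Demo.PubSub, self(), "messages", {:edit, 1})

other =
  receive do
    {:other_subscriber_got, message} -> message
  after
    1_000 -> :timeout
  end

# The sender is subscribed, yet its own mailbox holds no copy of the broadcast.
own =
  receive do
    {:edit, _id} = message -> message
  after
    100 -> :nothing
  end

IO.inspect({other, own}, label: "other subscriber / sender")
```

Swapping `broadcast_from/4` for `broadcast/3` in this sketch would deliver the message to the sender as well.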
# Form_component.ex
defp save_message(socket, :edit, message_params) do
  case Chat.update_message(socket.assigns.message, message_params) do
    {:ok, message} ->
      notify_parent({:edit, message})
      PicChatWeb.Endpoint.broadcast_from(self(), "messages", "edit", message)

      {:noreply,
       socket
       |> put_flash(:info, "Message updated successfully")
       |> push_patch(to: socket.assigns.patch)}

    {:error, %Ecto.Changeset{} = changeset} ->
      {:noreply, assign_form(socket, changeset)}
  end
end

defp save_message(socket, :new, message_params) do
  case Chat.create_message(message_params) do
    {:ok, message} ->
      notify_parent({:new, message})
      PicChatWeb.Endpoint.broadcast_from(self(), "messages", "new", message)

      {:noreply,
       socket
       |> put_flash(:info, "Message created successfully")
       |> push_patch(to: socket.assigns.patch)}

    {:error, %Ecto.Changeset{} = changeset} ->
      {:noreply, assign_form(socket, changeset)}
  end
end
Broadcast a "delete" event on the "messages" topic when we delete a message.
# Index.ex
def handle_event("delete", %{"id" => id}, socket) do
  message = Chat.get_message!(id)

  if message.user_id == socket.assigns.current_user.id do
    {:ok, _} = Chat.delete_message(message)
    PicChatWeb.Endpoint.broadcast_from(self(), "messages", "delete", message)
    {:noreply, stream_delete(socket, :messages, message)}
  else
    {:noreply,
     Phoenix.LiveView.put_flash(
       socket,
       :error,
       "You are not authorized to delete this message."
     )}
  end
end
The handle_info/2 callback in subscribed LiveView processes will receive a Phoenix.Socket.Broadcast struct.
%Phoenix.Socket.Broadcast{
  topic: "messages",
  event: "new",
  payload: %PicChat.Chat.Message{
    __meta__: #Ecto.Schema.Metadata<:loaded, "messages">,
    id: 4,
    content: "some content",
    picture: nil,
    user_id: 1,
    user: #Ecto.Association.NotLoaded<association :user is not loaded>,
    inserted_at: ~N[2023-05-28 21:26:18],
    updated_at: ~N[2023-05-28 21:26:18]
  }
}
We can pattern match on this struct to create event handlers. Add handle_info/2 callback functions for creating, updating, and deleting a Message in the stream of messages.
# Index.ex
def handle_info(%Phoenix.Socket.Broadcast{topic: "messages", event: "new", payload: message}, socket) do
  {:noreply, stream_insert(socket, :messages, message, at: 0)}
end

def handle_info(%Phoenix.Socket.Broadcast{topic: "messages", event: "edit", payload: message}, socket) do
  {:noreply, stream_insert(socket, :messages, message)}
end

def handle_info(%Phoenix.Socket.Broadcast{topic: "messages", event: "delete", payload: message}, socket) do
  {:noreply, stream_delete(socket, :messages, message)}
end
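The clause-per-event dispatch used by these handlers can be tried without Phoenix. The sketch below is only an approximation: `Demo.Broadcast` is a hypothetical stand-in with the same three fields as Phoenix.Socket.Broadcast, and a plain list stands in for the LiveView stream, so the insert, update, and delete logic is written by hand instead of using stream_insert and stream_delete.

```elixir
defmodule Demo.Broadcast do
  # Hypothetical stand-in with the same fields as Phoenix.Socket.Broadcast.
  defstruct [:topic, :event, :payload]

  def new(event, payload) do
    %__MODULE__{topic: "messages", event: event, payload: payload}
  end
end

defmodule Demo.FeedState do
  alias Demo.Broadcast

  # Each clause matches one event and returns the updated list of messages.
  # "new": prepend, similar in spirit to stream_insert(..., at: 0).
  def apply_event(%Broadcast{topic: "messages", event: "new", payload: message}, messages) do
    [message | messages]
  end

  # "edit": replace the message that has the same id.
  def apply_event(%Broadcast{topic: "messages", event: "edit", payload: message}, messages) do
    Enum.map(messages, fn existing ->
      if existing.id == message.id, do: message, else: existing
    end)
  end

  # "delete": drop the message that has the same id.
  def apply_event(%Broadcast{topic: "messages", event: "delete", payload: message}, messages) do
    Enum.reject(messages, fn existing -> existing.id == message.id end)
  end
end

events = [
  Demo.Broadcast.new("new", %{id: 1, content: "first"}),
  Demo.Broadcast.new("new", %{id: 2, content: "second"}),
  Demo.Broadcast.new("edit", %{id: 1, content: "edited"}),
  Demo.Broadcast.new("delete", %{id: 2})
]

# Replay the events in order, starting from an empty feed.
feed = Enum.reduce(events, [], &Demo.FeedState.apply_event/2)
IO.inspect(feed, label: "feed")
```

Matching on the event name in the function head keeps each handler small, and an event with no matching clause fails loudly instead of being silently ignored.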
We can test PubSub interactions by mounting multiple LiveViews and triggering events.
Here are some tests we can add to message_live_test.exs to ensure the create, update, and delete PubSub functionality works as expected.
describe "PubSub" do
  test "creating a message updates subscribers", %{conn: conn} do
    user = user_fixture()
    conn = log_in_user(conn, user)
    {:ok, subscriber_live, _html} = live(conn, ~p"/messages")
    {:ok, publisher_live, _html} = live(conn, ~p"/messages/new")

    assert publisher_live
           |> form("#message-form", message: @create_attrs)
           |> render_submit()

    assert render(subscriber_live) =~ "some content"
  end

  test "updating a message updates subscribers", %{conn: conn} do
    user = user_fixture()
    conn = log_in_user(conn, user)
    message = message_fixture(user_id: user.id)
    {:ok, subscriber_live, _html} = live(conn, ~p"/messages")
    {:ok, publisher_live, _html} = live(conn, ~p"/messages/#{message}/edit")

    assert publisher_live
           |> form("#message-form", message: @update_attrs)
           |> render_submit()

    assert render(subscriber_live) =~ "some updated content"
  end

  test "deleting a message updates subscribers", %{conn: conn} do
    user = user_fixture()
    conn = log_in_user(conn, user)
    message = message_fixture(user_id: user.id)
    {:ok, subscriber_live, _html} = live(conn, ~p"/messages")
    {:ok, publisher_live, _html} = live(conn, ~p"/messages/#{message}/edit")

    assert publisher_live |> element("#messages-#{message.id} a", "Delete") |> render_click()

    refute render(subscriber_live) =~ "some content"
  end
end
Consider the following resource(s) to deepen your understanding of the topic.
- Elixir School: LiveView with PubSub
- Elixir School: LiveView with Channels
- HexDocs: Phoenix Channels
- HexDocs: Phoenix PubSub
- HexDocs: Phoenix Endpoint
DockYard Academy now recommends you use the latest Release rather than forking or cloning our repository.
Run git status to ensure there are no undesirable changes.
Then run the following in your command line from the curriculum folder to commit your progress.
$ git add .
$ git commit -m "finish PicChat: PubSub reading"
$ git push
We're proud to offer our open-source curriculum free of charge for anyone to learn from at their own pace.