Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RTC-456] Add peerless room purge #150

Merged
merged 8 commits into from
Feb 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions lib/jellyfish/component/file.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,12 @@ defmodule Jellyfish.Component.File do
{:ok, framerate} <- validate_framerate(valid_opts.framerate),
{:ok, track_config} <-
get_track_config(path, framerate) do
endpoint_spec =
%FileEndpoint{
rtc_engine: engine,
file_path: path,
track_config: track_config,
payload_type: track_config.fmtp.pt
}
endpoint_spec = %FileEndpoint{
rtc_engine: engine,
file_path: path,
track_config: track_config,
payload_type: track_config.fmtp.pt
}

properties = valid_opts |> Map.from_struct()

Expand Down
60 changes: 51 additions & 9 deletions lib/jellyfish/room.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ defmodule Jellyfish.Room do
:engine_pid,
:network_options
]
defstruct @enforce_keys ++ [components: %{}, peers: %{}]
defstruct @enforce_keys ++ [components: %{}, peers: %{}, last_peer_left: 0]

@type id :: String.t()

Expand All @@ -46,14 +46,17 @@ defmodule Jellyfish.Room do
* `components` - map of components
* `peers` - map of peers
* `engine` - pid of engine
* `network_options` - network options
* `last_peer_left` - arbitrary timestamp with latest occurence of the room becoming peerless
"""
@type t :: %__MODULE__{
id: id(),
config: Config.t(),
components: %{Component.id() => Component.t()},
peers: %{Peer.id() => Peer.t()},
engine_pid: pid(),
network_options: map()
network_options: map(),
last_peer_left: integer()
}

defguardp endpoint_exists?(state, endpoint_id)
Expand Down Expand Up @@ -227,7 +230,9 @@ defmodule Jellyfish.Room do
def handle_call({:remove_peer, peer_id}, _from, state) do
{reply, state} =
if Map.has_key?(state.peers, peer_id) do
state = handle_remove_peer(peer_id, state, :peer_removed)
state =
handle_remove_peer(peer_id, state, :peer_removed)
|> maybe_schedule_peerless_purge()

{:ok, state}
else
Expand Down Expand Up @@ -434,6 +439,7 @@ defmodule Jellyfish.Room do
def handle_info(%EndpointRemoved{endpoint_id: endpoint_id}, state) do
{_endpoint, state} = pop_in(state, [:peers, endpoint_id])
Logger.info("Peer #{endpoint_id} removed")
state = maybe_schedule_peerless_purge(state)
{:noreply, state}
end

Expand Down Expand Up @@ -546,6 +552,19 @@ defmodule Jellyfish.Room do
{:noreply, state}
end

@impl true
def handle_info(:peerless_purge, state) do
if peerless_long_enough?(state) do
Logger.info(
"Removing room because it was peerless for #{state.config.peerless_purge_timeout} seconds"
)

{:stop, :normal, state}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stopping with normal won't cause other processes that are linked to the room process to exit. I am not sure if that's correct. I think I would go for {:shutdown, :peerless_purge}? See https://hexdocs.pm/elixir/1.16.0/Supervisor.html#module-exit-reasons-and-restarts

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But that's exactly how RoomService removes rooms -- here

We're manually removing all of the linked processes in the terminate callback, no?

else
{:noreply, state}
end
end

@impl true
def handle_info(info, state) do
Logger.warning("Received unexpected info: #{inspect(info)}")
Expand Down Expand Up @@ -600,14 +619,37 @@ defmodule Jellyfish.Room do
TURNManager.ensure_tcp_turn_launched(turn_options, port: tcp_turn_port)
end

%__MODULE__{
id: id,
config: config,
engine_pid: pid,
network_options: [turn_options: turn_options]
}
state =
%__MODULE__{
id: id,
config: config,
engine_pid: pid,
network_options: [turn_options: turn_options]
}
|> maybe_schedule_peerless_purge()

state
end

defp maybe_schedule_peerless_purge(%{config: %{peerless_purge_timeout: nil}} = state), do: state

defp maybe_schedule_peerless_purge(%{config: config, peers: peers} = state)
when map_size(peers) == 0 do
last_peer_left = Klotho.monotonic_time(:millisecond)
Klotho.send_after(config.peerless_purge_timeout * 1000, self(), :peerless_purge)

%{state | last_peer_left: last_peer_left}
end

defp maybe_schedule_peerless_purge(state), do: state

defp peerless_long_enough?(%{config: config, peers: peers, last_peer_left: last_peer_left})
when map_size(peers) == 0 do
Klotho.monotonic_time(:millisecond) >= last_peer_left + config.peerless_purge_timeout * 1000
end

defp peerless_long_enough?(_state), do: false

defp handle_remove_component(component_id, state, reason) do
{component, state} = pop_in(state, [:components, component_id])
:ok = Engine.remove_endpoint(state.engine_pid, component_id)
Expand Down
19 changes: 13 additions & 6 deletions lib/jellyfish/room/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,22 @@ defmodule Jellyfish.Room.Config do
@moduledoc """
Room configuration
"""
@enforce_keys [:room_id, :max_peers, :video_codec, :webhook_url]
@enforce_keys [:room_id, :max_peers, :video_codec, :webhook_url, :peerless_purge_timeout]

defstruct @enforce_keys

@type room_id :: String.t() | nil
@type max_peers :: non_neg_integer() | nil
@type video_codec :: :h264 | :vp8 | nil
@type webhook_url :: String.t()
@type peerless_purge_timeout :: pos_integer() | nil

@type t :: %__MODULE__{
room_id: room_id(),
max_peers: max_peers(),
video_codec: video_codec(),
webhook_url: URI.t()
webhook_url: URI.t(),
peerless_purge_timeout: peerless_purge_timeout()
}

@spec from_params(map()) :: {:ok, __MODULE__.t()} | {:error, atom()}
Expand All @@ -24,20 +26,21 @@ defmodule Jellyfish.Room.Config do
max_peers = Map.get(params, "maxPeers")
video_codec = Map.get(params, "videoCodec")
webhook_url = Map.get(params, "webhookUrl")
peerless_purge_timeout = Map.get(params, "peerlessPurgeTimeout")

with {:ok, room_id} <- parse_room_id(room_id),
:ok <- validate_max_peers(max_peers),
{:ok, video_codec} <- codec_to_atom(video_codec),
:ok <- validate_webhook_url(webhook_url) do
:ok <- validate_webhook_url(webhook_url),
:ok <- validate_purge_timeout(peerless_purge_timeout) do
{:ok,
%__MODULE__{
room_id: room_id,
max_peers: max_peers,
video_codec: video_codec,
webhook_url: webhook_url
webhook_url: webhook_url,
peerless_purge_timeout: peerless_purge_timeout
}}
else
error -> error
end
end

Expand Down Expand Up @@ -67,4 +70,8 @@ defmodule Jellyfish.Room.Config do
defp codec_to_atom("vp8"), do: {:ok, :vp8}
defp codec_to_atom(nil), do: {:ok, nil}
defp codec_to_atom(_codec), do: {:error, :invalid_video_codec}

defp validate_purge_timeout(nil), do: :ok
defp validate_purge_timeout(timeout) when is_integer(timeout) and timeout > 0, do: :ok
defp validate_purge_timeout(_timeout), do: {:error, :invalid_peerless_purge_timeout}
end
8 changes: 8 additions & 0 deletions lib/jellyfish_web/api_spec/room.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ defmodule JellyfishWeb.ApiSpec.Room do
type: :string,
example: "https://backend.address.com/jellyfish-notifications-endpoint",
nullable: true
},
peerlessPurgeTimeout: %Schema{
description:
"Duration (in seconds) after which the room will be removed if no peers are connected. If not provided, this feature is disabled.",
type: :integer,
minimum: 1,
example: 60,
nullable: true
}
}
})
Expand Down
6 changes: 6 additions & 0 deletions lib/jellyfish_web/controllers/room_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ defmodule JellyfishWeb.RoomController do
webhook_url = Map.get(params, "webhookUrl")
{:error, :bad_request, "Expected webhookUrl to be valid URL, got: #{webhook_url}"}

{:error, :invalid_peerless_purge_timeout} ->
timeout = Map.get(params, "peerlessPurgeTimeout")

{:error, :bad_request,
"Expected peerlessPurgeTimeout to be a positive integer, got: #{timeout}"}

{:error, :room_already_exists} ->
room_id = Map.get(params, "roomId")
{:error, :bad_request, "Cannot add room with id \"#{room_id}\" - room already exists"}
Expand Down
3 changes: 3 additions & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ defmodule Jellyfish.MixProject do
{:libcluster, "~> 3.3"},
{:httpoison, "~> 2.0"},

# Mocking timer in tests
{:klotho, "~> 0.1.0"},

# Test deps
{:websockex, "~> 0.4.3", only: [:test, :ci], runtime: false},
{:excoveralls, "~> 0.15.0", only: :test, runtime: false},
Expand Down
1 change: 1 addition & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
"httpoison": {:hex, :httpoison, "2.2.1", "87b7ed6d95db0389f7df02779644171d7319d319178f6680438167d7b69b1f3d", [:mix], [{:hackney, "~> 1.17", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm", "51364e6d2f429d80e14fe4b5f8e39719cacd03eb3f9a9286e61e216feac2d2df"},
"idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"},
"jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"},
"klotho": {:hex, :klotho, "0.1.2", "3b1f1a569703e0cdce1ba964f41711351a7b06846c38fcbd601faa407e712bf2", [:mix], [], "hexpm", "a6a387982753582e30a5246fe9561721c6b9a4dd27678296cf2cd44faa6f3733"},
"libcluster": {:hex, :libcluster, "3.3.3", "a4f17721a19004cfc4467268e17cff8b1f951befe428975dd4f6f7b84d927fe0", [:mix], [{:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "7c0a2275a0bb83c07acd17dab3c3bfb4897b145106750eeccc62d302e3bdfee5"},
"membrane_aac_fdk_plugin": {:hex, :membrane_aac_fdk_plugin, "0.18.5", "74a0fd9b121a9f18e038573931fa2952b95a977a4e982a844734129e977e0fb9", [:mix], [{:bunch, "~> 1.4", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.3", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_aac_format, "~> 0.8.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_common_c, "~> 0.16.0", [hex: :membrane_common_c, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:membrane_raw_audio_format, "~> 0.12.0", [hex: :membrane_raw_audio_format, repo: "hexpm", optional: false]}, {:unifex, "~> 1.1", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "560ea01c1fc707770bcdfb30d47be5f77be3e4d86a872bc1e34261a134bf6f98"},
"membrane_aac_format": {:hex, :membrane_aac_format, "0.8.0", "515631eabd6e584e0e9af2cea80471fee6246484dbbefc4726c1d93ece8e0838", [:mix], [{:bimap, "~> 1.1", [hex: :bimap, repo: "hexpm", optional: false]}], "hexpm", "a30176a94491033ed32be45e51d509fc70a5ee6e751f12fd6c0d60bd637013f6"},
Expand Down
6 changes: 6 additions & 0 deletions openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,12 @@ components:
minimum: 1
nullable: true
type: integer
peerlessPurgeTimeout:
description: Duration (in seconds) after which the room will be removed if no peers are connected. If not provided, this feature is disabled.
example: 60
minimum: 1
nullable: true
type: integer
roomId:
description: Custom id used for identifying room within Jellyfish. Must be unique across all rooms. If not provided, random UUID is generated.
nullable: true
Expand Down
74 changes: 74 additions & 0 deletions test/jellyfish/room_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
defmodule Jellyfish.RoomTest do
use ExUnit.Case, async: true

alias Jellyfish.{Peer, Room}

@purge_timeout_s 60
@purge_timeout_ms @purge_timeout_s * 1000
@message_timeout_ms 20

setup do
Klotho.Mock.reset()
Klotho.Mock.freeze()
end

describe "peerless purge" do
test "happens if peers never joined" do
{:ok, config} = Room.Config.from_params(%{"peerlessPurgeTimeout" => @purge_timeout_s})
{:ok, pid, _id} = Room.start(config)
Process.monitor(pid)

Klotho.Mock.warp_by(@purge_timeout_ms + 10)

assert_receive {:DOWN, _ref, :process, ^pid, :normal}, @message_timeout_ms
end

test "happens if peers joined, then left" do
{:ok, config} = Room.Config.from_params(%{"peerlessPurgeTimeout" => @purge_timeout_s})
{:ok, pid, id} = Room.start(config)
Process.monitor(pid)

{:ok, peer} = Room.add_peer(id, Peer.WebRTC)

Klotho.Mock.warp_by(@purge_timeout_ms + 10)
refute_receive {:DOWN, _ref, :process, ^pid, :normal}, @message_timeout_ms

:ok = Room.remove_peer(id, peer.id)

Klotho.Mock.warp_by(@purge_timeout_ms + 10)
assert_receive {:DOWN, _ref, :process, ^pid, :normal}, @message_timeout_ms
end

test "does not happen if peers rejoined quickly" do
{:ok, config} = Room.Config.from_params(%{"peerlessPurgeTimeout" => @purge_timeout_s})
{:ok, pid, id} = Room.start(config)
Process.monitor(pid)

{:ok, peer} = Room.add_peer(id, Peer.WebRTC)

Klotho.Mock.warp_by(@purge_timeout_ms + 10)
refute_receive {:DOWN, _ref, :process, ^pid, :normal}, @message_timeout_ms

:ok = Room.remove_peer(id, peer.id)

Klotho.Mock.warp_by(@purge_timeout_ms |> div(2))
refute_receive {:DOWN, _ref, :process, ^pid, :normal}, @message_timeout_ms

{:ok, _peer} = Room.add_peer(id, Peer.WebRTC)
Klotho.Mock.warp_by(@purge_timeout_ms + 10)
refute_receive {:DOWN, _ref, :process, ^pid, :normal}, @message_timeout_ms

:ok = GenServer.stop(pid)
end

test "does not happen when not configured" do
{:ok, config} = Room.Config.from_params(%{})
{:ok, pid, _id} = Room.start(config)

Klotho.Mock.warp_by(@purge_timeout_ms + 10)
refute_receive {:DOWN, _ref, :process, ^pid, :normal}, @message_timeout_ms

:ok = GenServer.stop(pid)
end
end
end
12 changes: 4 additions & 8 deletions test/jellyfish_web/controllers/component/file_component_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ defmodule JellyfishWeb.Component.FileComponentTest do
"filePath" => @video_source
}
}
} =
model_response(conn, :created, "ComponentDetailsResponse")
} = model_response(conn, :created, "ComponentDetailsResponse")

assert_component_created(conn, room_id, id, "file")

Expand Down Expand Up @@ -92,8 +91,7 @@ defmodule JellyfishWeb.Component.FileComponentTest do
"framerate" => 60
}
}
} =
model_response(conn, :created, "ComponentDetailsResponse")
} = model_response(conn, :created, "ComponentDetailsResponse")

assert_component_created(conn, room_id, id, "file")
end
Expand All @@ -115,8 +113,7 @@ defmodule JellyfishWeb.Component.FileComponentTest do
"filePath" => @audio_source
}
}
} =
model_response(conn, :created, "ComponentDetailsResponse")
} = model_response(conn, :created, "ComponentDetailsResponse")

assert_component_created(conn, room_id, id, "file")

Expand Down Expand Up @@ -164,8 +161,7 @@ defmodule JellyfishWeb.Component.FileComponentTest do
"filePath" => ^video_relative_path
}
}
} =
model_response(conn, :created, "ComponentDetailsResponse")
} = model_response(conn, :created, "ComponentDetailsResponse")

assert_component_created(conn, room_id, id, "file")
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ defmodule JellyfishWeb.Component.HlsComponentTest do
"type" => "hls",
"properties" => @hls_properties
}
} =
model_response(conn, :created, "ComponentDetailsResponse")
} = model_response(conn, :created, "ComponentDetailsResponse")

assert_component_created(conn, room_id, id, "hls")

Expand Down Expand Up @@ -93,8 +92,7 @@ defmodule JellyfishWeb.Component.HlsComponentTest do
"type" => "hls",
"properties" => @hls_properties
}
} =
model_response(conn, :created, "ComponentDetailsResponse")
} = model_response(conn, :created, "ComponentDetailsResponse")

parent = self()
ref = make_ref()
Expand Down
Loading