diff --git a/.gitignore b/.gitignore index 3931b28f..031ab898 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +.lexical/* compile_commands.json .gdb_history bundlex.sh diff --git a/config/config.exs b/config/config.exs index 4610a2da..086f6f3d 100644 --- a/config/config.exs +++ b/config/config.exs @@ -17,7 +17,7 @@ config :membrane_telemetry_metrics, enabled: true config :logger, :console, format: "$time $metadata[$level] $message\n", - metadata: [:request_id, :room_id] + metadata: [:request_id, :room_id, :peer_id] config :logger_json, :backend, metadata: [:request_id, :room_id], diff --git a/lib/jellyfish/peer.ex b/lib/jellyfish/peer.ex index 621fee58..78dcad36 100644 --- a/lib/jellyfish/peer.ex +++ b/lib/jellyfish/peer.ex @@ -13,7 +13,14 @@ defmodule Jellyfish.Peer do :type, :engine_endpoint ] - defstruct @enforce_keys ++ [status: :disconnected, socket_pid: nil, tracks: %{}, metadata: nil] + defstruct @enforce_keys ++ + [ + status: :disconnected, + socket_pid: nil, + tracks: %{}, + metadata: nil, + last_time_connected: nil + ] @type id :: String.t() @type peer :: WebRTC @@ -32,7 +39,8 @@ defmodule Jellyfish.Peer do socket_pid: pid() | nil, engine_endpoint: Membrane.ChildrenSpec.child_definition(), tracks: %{Track.id() => Track.t()}, - metadata: any() + metadata: any(), + last_time_connected: integer() | nil } @spec parse_type(String.t()) :: {:ok, peer()} | {:error, :invalid_type} diff --git a/lib/jellyfish/room.ex b/lib/jellyfish/room.ex index 44a8f3b2..fa25f80b 100644 --- a/lib/jellyfish/room.ex +++ b/lib/jellyfish/room.ex @@ -6,16 +6,15 @@ defmodule Jellyfish.Room do use Bunch.Access use GenServer + import Jellyfish.Room.State + require Logger alias Jellyfish.Component - alias Jellyfish.Component.{HLS, Recording, RTSP, SIP} - alias Jellyfish.Event + alias Jellyfish.Component.{HLS, Recording, SIP} alias Jellyfish.Peer - alias Jellyfish.Room.Config - alias Jellyfish.Track + alias Jellyfish.Room.{Config, State} - alias Membrane.ICE.TURNManager alias Membrane.RTC.Engine alias Membrane.RTC.Engine.Endpoint @@ -30,38 +29,8 @@ defmodule Jellyfish.Room do TrackRemoved } - @enforce_keys [ - :id, - :config, - :engine_pid, - :network_options - ] - defstruct @enforce_keys ++ [components: %{}, peers: %{}, last_peer_left: 0] - @type id :: String.t() - - @typedoc """ - This module contains: - * `id` - room id - * `config` - configuration of room. For example you can specify maximal number of peers - * `components` - map of components - * `peers` - map of peers - * `engine` - pid of engine - * `network_options` - network options - * `last_peer_left` - arbitrary timestamp with latest occurence of the room becoming peerless - """ - @type t :: %__MODULE__{ - id: id(), - config: Config.t(), - components: %{Component.id() => Component.t()}, - peers: %{Peer.id() => Peer.t()}, - engine_pid: pid(), - network_options: map(), - last_peer_left: integer() - } - - defguardp endpoint_exists?(state, endpoint_id) - when is_map_key(state.components, endpoint_id) or is_map_key(state.peers, endpoint_id) + @type t :: State.t() def registry_id(room_id), do: {:via, Registry, {Jellyfish.RoomRegistry, room_id}} @@ -75,7 +44,7 @@ defmodule Jellyfish.Room do end end - @spec get_state(id()) :: t() | nil + @spec get_state(id()) :: State.t() | nil def get_state(room_id) do registry_room_id = registry_id(room_id) @@ -158,7 +127,8 @@ defmodule Jellyfish.Room do @impl true def init([id, config]) do - state = new(id, config) + state = State.new(id, config) + Logger.metadata(room_id: id) Logger.info("Initialize room") @@ -171,52 +141,33 @@ defmodule Jellyfish.Room do end @impl true - def handle_call({:add_peer, peer_type, options}, _from, state) do - {reply, state} = - if Enum.count(state.peers) == state.config.max_peers do - {{:error, :reached_peers_limit}, state} - else - options = - Map.merge( - %{ - engine_pid: state.engine_pid, - network_options: state.network_options, - video_codec: state.config.video_codec, - room_id: state.id - }, - options - ) - - with {:ok, peer} <- Peer.new(peer_type, options) do - state = put_in(state, [:peers, peer.id], peer) |> maybe_schedule_peerless_purge() - - Logger.info("Added peer #{inspect(peer.id)}") - - {{:ok, peer}, state} - else - {:error, reason} -> - Logger.warning("Unable to add peer: #{inspect(reason)}") - {:error, state} - end - end + def handle_call({:add_peer, peer_type, override_options}, _from, state) do + with false <- State.reached_peers_limit?(state), + options <- State.generate_peer_options(state, override_options), + {:ok, peer} <- Peer.new(peer_type, options) do + state = State.put_peer(state, peer) - {:reply, reply, state} + Logger.info("Added peer #{inspect(peer.id)}") + + {:reply, {:ok, peer}, state} + else + true -> + {:reply, {:error, :reached_peers_limit}, state} + + {:error, reason} -> + Logger.warning("Unable to add peer: #{inspect(reason)}") + {:reply, :error, state} + end end @impl true def handle_call({:set_peer_connected, peer_id}, {socket_pid, _tag}, state) do {reply, state} = - case Map.fetch(state.peers, peer_id) do + case State.fetch_peer(state, peer_id) do {:ok, %{status: :disconnected} = peer} -> Process.monitor(socket_pid) - peer = %{peer | status: :connected, socket_pid: socket_pid} - state = put_in(state, [:peers, peer_id], peer) - - :ok = Engine.add_endpoint(state.engine_pid, peer.engine_endpoint, id: peer_id) - - Logger.info("Peer #{inspect(peer_id)} connected") - :telemetry.execute([:jellyfish, :room], %{peer_connects: 1}, %{room_id: state.id}) + state = State.connect_peer(state, peer, socket_pid) {:ok, state} @@ -233,7 +184,7 @@ defmodule Jellyfish.Room do @impl true def handle_call({:get_peer_connection_status, peer_id}, _from, state) do reply = - case Map.fetch(state.peers, peer_id) do + case State.fetch_peer(state, peer_id) do {:ok, peer} -> {:ok, peer.status} :error -> {:error, :peer_not_found} end @@ -244,10 +195,8 @@ defmodule Jellyfish.Room do @impl true def handle_call({:remove_peer, peer_id}, _from, state) do {reply, state} = - if Map.has_key?(state.peers, peer_id) do - state = - handle_remove_peer(peer_id, state, :peer_removed) - |> maybe_schedule_peerless_purge() + if peer_exists?(state, peer_id) do + state = State.remove_peer(state, peer_id, :peer_removed) {:ok, state} else @@ -259,19 +208,21 @@ defmodule Jellyfish.Room do @impl true def handle_call({:add_component, component_type, options}, _from, state) do + engine_pid = State.engine_pid(state) + options = Map.merge( - %{engine_pid: state.engine_pid, room_id: state.id}, + %{engine_pid: engine_pid, room_id: State.id(state)}, options ) with :ok <- check_component_allowed(component_type, state), {:ok, component} <- Component.new(component_type, options) do - state = put_in(state, [:components, component.id], component) + state = State.put_component(state, component) component_type.after_init(state, component, options) - :ok = Engine.add_endpoint(state.engine_pid, component.engine_endpoint, id: component.id) + :ok = Engine.add_endpoint(engine_pid, component.engine_endpoint, id: component.id) Logger.info("Added component #{inspect(component.id)}") @@ -335,8 +286,8 @@ defmodule Jellyfish.Room do @impl true def handle_call({:remove_component, component_id}, _from, state) do {reply, state} = - if Map.has_key?(state.components, component_id) do - state = handle_remove_component(component_id, state, :component_removed) + if component_exists?(state, component_id) do + state = State.remove_component(state, component_id, :component_removed) {:ok, state} else {{:error, :component_not_found}, state} @@ -347,15 +298,17 @@ defmodule Jellyfish.Room do @impl true def handle_call({:subscribe, component_id, origins}, _from, state) do - component = get_component_by_id(state, component_id) + component = State.get_component_by_id(state, component_id) + + engine_pid = State.engine_pid(state) reply = case validate_subscription_mode(component) do :ok when component.type == HLS -> - Endpoint.HLS.subscribe(state.engine_pid, component.id, origins) + Endpoint.HLS.subscribe(engine_pid, component.id, origins) :ok when component.type == Recording -> - Endpoint.Recording.subscribe(state.engine_pid, component.id, origins) + Endpoint.Recording.subscribe(engine_pid, component.id, origins) :ok when component.type not in [HLS, Recording] -> {:error, :invalid_component_type} @@ -369,15 +322,22 @@ defmodule Jellyfish.Room do @impl true def handle_call(:get_num_forwarded_tracks, _from, state) do - forwarded_tracks = Engine.get_num_forwarded_tracks(state.engine_pid) + forwarded_tracks = + state + |> State.engine_pid() + |> Engine.get_num_forwarded_tracks() + {:reply, forwarded_tracks, state} end @impl true def handle_call({:dial, component_id, phone_number}, _from, state) do - case Map.fetch(state.components, component_id) do + case State.fetch_component(state, component_id) do {:ok, component} when component.type == SIP -> - Endpoint.SIP.dial(state.engine_pid, component_id, phone_number) + state + |> State.engine_pid() + |> Endpoint.SIP.dial(component_id, phone_number) + {:reply, :ok, state} {:ok, _component} -> @@ -390,9 +350,12 @@ defmodule Jellyfish.Room do @impl true def handle_call({:end_call, component_id}, _from, state) do - case Map.fetch(state.components, component_id) do + case State.fetch_component(state, component_id) do {:ok, component} when component.type == SIP -> - Endpoint.SIP.end_call(state.engine_pid, component_id) + state + |> State.engine_pid() + |> Endpoint.SIP.end_call(component_id) + {:reply, :ok, state} :error -> @@ -405,13 +368,16 @@ defmodule Jellyfish.Room do @impl true def handle_cast({:media_event, peer_id, event}, state) do - Engine.message_endpoint(state.engine_pid, peer_id, {:media_event, event}) + state + |> State.engine_pid() + |> Engine.message_endpoint(peer_id, {:media_event, event}) + {:noreply, state} end @impl true def handle_info(%EndpointMessage{endpoint_id: to, message: {:media_event, data}}, state) do - with {:ok, peer} <- Map.fetch(state.peers, to), + with {:ok, peer} <- State.fetch_peer(state, to), socket_pid when is_pid(socket_pid) <- Map.get(peer, :socket_pid) do send(socket_pid, {:media_event, data}) else @@ -434,10 +400,10 @@ defmodule Jellyfish.Room do Logger.error("RTC Engine endpoint #{inspect(endpoint_id)} crashed: #{inspect(reason)}") state = - if Map.has_key?(state.peers, endpoint_id) do - handle_remove_peer(endpoint_id, state, {:peer_crashed, parse_crash_reason(reason)}) + if peer_exists?(state, endpoint_id) do + State.remove_peer(state, endpoint_id, {:peer_crashed, parse_crash_reason(reason)}) else - handle_remove_component(endpoint_id, state, :component_crashed) + State.remove_component(state, endpoint_id, :component_crashed) end {:noreply, state} @@ -445,29 +411,7 @@ defmodule Jellyfish.Room do @impl true def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do - state = - case Enum.find(state.peers, fn {_id, peer} -> peer.socket_pid == pid end) do - nil -> - state - - {peer_id, peer} -> - :ok = Engine.remove_endpoint(state.engine_pid, peer_id) - Event.broadcast_server_notification({:peer_disconnected, state.id, peer_id}) - :telemetry.execute([:jellyfish, :room], %{peer_disconnects: 1}, %{room_id: state.id}) - - peer.tracks - |> Map.values() - |> Enum.each( - &Event.broadcast_server_notification( - {:track_removed, state.id, {:peer_id, peer_id}, &1} - ) - ) - - peer = %{peer | status: :disconnected, socket_pid: nil, tracks: %{}} - - put_in(state, [:peers, peer_id], peer) - |> maybe_schedule_peerless_purge() - end + state = State.disconnect_peer(state, pid) {:noreply, state} end @@ -477,15 +421,7 @@ defmodule Jellyfish.Room do @impl true def handle_info({:playlist_playable, :video, _playlist_id}, state) do - endpoint_id = - Enum.find_value(state.components, fn {id, %{type: type}} -> - if type == HLS, do: id - end) - - Event.broadcast_server_notification({:hls_playable, state.id, endpoint_id}) - - state = - update_in(state, [:components, endpoint_id, :properties], &Map.put(&1, :playable, true)) + state = State.set_hls_playable(state) {:noreply, state} end @@ -507,15 +443,15 @@ defmodule Jellyfish.Room do @impl true def handle_info(%EndpointRemoved{endpoint_id: endpoint_id}, state) - when is_map_key(state.peers, endpoint_id) do + when peer_exists?(state, endpoint_id) do # The peer has been either removed, crashed or disconnected # The changes in state are applied in appropriate callbacks {:noreply, state} end def handle_info(%EndpointRemoved{endpoint_id: endpoint_id}, state) - when is_map_key(state.components, endpoint_id) do - state = handle_remove_component(endpoint_id, state, :component_finished) + when component_exists?(state, endpoint_id) do + state = State.remove_component(state, endpoint_id, :component_finished) {:noreply, state} end @@ -533,11 +469,10 @@ defmodule Jellyfish.Room do %EndpointMetadataUpdated{endpoint_id: endpoint_id, endpoint_metadata: metadata}, state ) - when is_map_key(state.peers, endpoint_id) do + when peer_exists?(state, endpoint_id) do Logger.info("Peer #{endpoint_id} metadata updated: #{inspect(metadata)}") - Event.broadcast_server_notification({:peer_metadata_updated, state.id, endpoint_id, metadata}) - state = put_in(state, [:peers, endpoint_id, :metadata], metadata) + state = State.update_peer_metadata(state, endpoint_id, metadata) {:noreply, state} end @@ -549,19 +484,7 @@ defmodule Jellyfish.Room do @impl true def handle_info(%TrackAdded{endpoint_id: endpoint_id} = track_info, state) when endpoint_exists?(state, endpoint_id) do - endpoint_id_type = get_endpoint_id_type(state, endpoint_id) - - Logger.info("Track #{track_info.track_id} added, #{endpoint_id_type}: #{endpoint_id}") - - Event.broadcast_server_notification( - {:track_added, state.id, {endpoint_id_type, endpoint_id}, track_info} - ) - - endpoint_group = get_endpoint_group(state, track_info.endpoint_id) - access_path = [endpoint_group, track_info.endpoint_id, :tracks, track_info.track_id] - - track = Track.from_track_message(track_info) - state = put_in(state, access_path, track) + state = State.put_track(state, track_info) {:noreply, state} end @@ -575,25 +498,7 @@ defmodule Jellyfish.Room do @impl true def handle_info(%TrackMetadataUpdated{endpoint_id: endpoint_id} = track_info, state) when endpoint_exists?(state, endpoint_id) do - endpoint_group = get_endpoint_group(state, endpoint_id) - access_path = [endpoint_group, endpoint_id, :tracks, track_info.track_id] - - state = - update_in(state, access_path, fn - %Track{} = track -> - endpoint_id_type = get_endpoint_id_type(state, endpoint_id) - updated_track = %Track{track | metadata: track_info.track_metadata} - - Logger.info( - "Track #{updated_track.id}, #{endpoint_id_type}: #{endpoint_id} - metadata updated: #{inspect(updated_track.metadata)}" - ) - - Event.broadcast_server_notification( - {:track_metadata_updated, state.id, {endpoint_id_type, endpoint_id}, updated_track} - ) - - updated_track - end) + state = State.update_track(state, track_info) {:noreply, state} end @@ -607,17 +512,7 @@ defmodule Jellyfish.Room do @impl true def handle_info(%TrackRemoved{endpoint_id: endpoint_id} = track_info, state) when endpoint_exists?(state, endpoint_id) do - endpoint_group = get_endpoint_group(state, endpoint_id) - access_path = [endpoint_group, endpoint_id, :tracks, track_info.track_id] - - {track, state} = pop_in(state, access_path) - - endpoint_id_type = get_endpoint_id_type(state, endpoint_id) - Logger.info("Track removed: #{track.id}, #{endpoint_id_type}: #{endpoint_id}") - - Event.broadcast_server_notification( - {:track_removed, state.id, {endpoint_id_type, endpoint_id}, track} - ) + state = State.remove_track(state, track_info) {:noreply, state} end @@ -630,211 +525,51 @@ defmodule Jellyfish.Room do @impl true def handle_info(:peerless_purge, state) do - if peerless_long_enough?(state) do + if State.peerless_long_enough?(state) do Logger.info( - "Removing room because it was peerless for #{state.config.peerless_purge_timeout} seconds" + "Removing room because it was peerless for #{State.peerless_purge_timeout(state)} seconds" ) {:stop, :normal, state} else + Logger.debug("Ignore peerless purge message") + {:noreply, state} end end @impl true - def handle_info(info, state) do - Logger.warning("Received unexpected info: #{inspect(info)}") - {:noreply, state} - end - - @impl true - def terminate(_reason, %{engine_pid: engine_pid} = state) do - Engine.terminate(engine_pid, asynchronous?: true, timeout: 10_000) - - state.peers - |> Map.values() - |> Enum.each(&handle_remove_peer(&1.id, state, :room_stopped)) - - state.components - |> Map.values() - |> Enum.each(&handle_remove_component(&1.id, state, :room_stopped)) - - :ok - end - - defp new(id, config) do - rtc_engine_options = [ - id: id - ] - - {:ok, pid} = Engine.start_link(rtc_engine_options, []) - Engine.register(pid, self()) - - webrtc_config = Application.fetch_env!(:jellyfish, :webrtc_config) - - turn_options = - if webrtc_config[:webrtc_used?] do - turn_ip = webrtc_config[:turn_listen_ip] - turn_mock_ip = webrtc_config[:turn_ip] - - [ - ip: turn_ip, - mock_ip: turn_mock_ip, - ports_range: webrtc_config[:turn_port_range] - ] - else - [] - end - - tcp_turn_port = webrtc_config[:turn_tcp_port] - - if webrtc_config[:webrtc_used?] and tcp_turn_port != nil do - TURNManager.ensure_tcp_turn_launched(turn_options, port: tcp_turn_port) - end - - state = - %__MODULE__{ - id: id, - config: config, - engine_pid: pid, - network_options: [turn_options: turn_options] - } - |> maybe_schedule_peerless_purge() - - state - end - - defp maybe_schedule_peerless_purge(%{config: %{peerless_purge_timeout: nil}} = state), do: state - - defp maybe_schedule_peerless_purge(%{config: config, peers: peers} = state) do - if all_peers_disconnected?(peers) do - last_peer_left = Klotho.monotonic_time(:millisecond) - - Klotho.send_after(config.peerless_purge_timeout * 1000, self(), :peerless_purge) - - %{state | last_peer_left: last_peer_left} - else - state - end - end - - defp peerless_long_enough?(%{config: config, peers: peers, last_peer_left: last_peer_left}) do - if all_peers_disconnected?(peers) do - Klotho.monotonic_time(:millisecond) >= last_peer_left + config.peerless_purge_timeout * 1000 - else - false - end - end - - defp all_peers_disconnected?(peers) do - peers |> Map.values() |> Enum.all?(&(&1.status == :disconnected)) - end - - defp handle_remove_component(component_id, state, reason) do - {component, state} = pop_in(state, [:components, component_id]) - :ok = Engine.remove_endpoint(state.engine_pid, component_id) - - component.tracks - |> Map.values() - |> Enum.each( - &Event.broadcast_server_notification( - {:track_removed, state.id, {:component_id, component_id}, &1} + def handle_info({:peer_purge, peer_id}, state) do + with {:ok, peer} <- State.fetch_peer(state, peer_id), + true <- State.peer_disconnected_long_enough?(state, peer) do + Logger.info( + "Removing peer because it was disconnected for #{State.peerless_purge_timeout(state)} seconds" ) - ) - Logger.info("Removed component #{inspect(component_id)}: #{inspect(reason)}") + state = State.remove_peer(state, peer_id, :timeout) - component.type.on_remove(state, component) - - if reason == :component_crashed, - do: Event.broadcast_server_notification({:component_crashed, state.id, component_id}) - - state - end - - defp handle_remove_peer(peer_id, state, reason) do - {peer, state} = pop_in(state, [:peers, peer_id]) - :ok = Engine.remove_endpoint(state.engine_pid, peer_id) - - if is_pid(peer.socket_pid), - do: send(peer.socket_pid, {:stop_connection, reason}) - - peer.tracks - |> Map.values() - |> Enum.each( - &Event.broadcast_server_notification({:track_removed, state.id, {:peer_id, peer_id}, &1}) - ) - - Logger.info("Removed peer #{inspect(peer_id)} from room #{inspect(state.id)}") - - if peer.status == :connected and reason == :peer_removed do - Event.broadcast_server_notification({:peer_disconnected, state.id, peer_id}) - :telemetry.execute([:jellyfish, :room], %{peer_disconnects: 1}, %{room_id: state.id}) - end + {:noreply, state} + else + _other -> + Logger.debug("Ignore peer purge message for peer: #{peer_id}") - with {:peer_crashed, crash_reason} <- reason do - Event.broadcast_server_notification({:peer_crashed, state.id, peer_id, crash_reason}) - :telemetry.execute([:jellyfish, :room], %{peer_crashes: 1}, %{room_id: state.id}) + {:noreply, state} end - - state end - defp get_component_by_id(%{components: components}, component_id), - do: - Enum.find_value(components, fn {id, component} -> - if id == component_id, do: component - end) - - defp check_component_allowed(type, %{ - config: %{video_codec: video_codec}, - components: components - }) - when type in [HLS, Recording] do - cond do - video_codec != :h264 -> - {:error, :incompatible_codec} - - component_already_present?(type, components) -> - {:error, :reached_components_limit} - - true -> - :ok - end - end - - defp check_component_allowed(RTSP, %{config: %{video_codec: video_codec}}) do - # Right now, RTSP component can only publish H264, so there's no point adding it - # to a room which allows another video codec, e.g. VP8 - if video_codec == :h264, - do: :ok, - else: {:error, :incompatible_codec} + @impl true + def handle_info(info, state) do + Logger.warning("Received unexpected info: #{inspect(info)}") + {:noreply, state} end - defp check_component_allowed(_component_type, _state), do: :ok - - defp component_already_present?(type, components), - do: components |> Map.values() |> Enum.any?(&(&1.type == type)) - - defp validate_subscription_mode(nil), do: {:error, :component_not_exists} - - defp validate_subscription_mode(%{properties: %{subscribe_mode: :auto}}), - do: {:error, :invalid_subscribe_mode} - - defp validate_subscription_mode(%{properties: %{subscribe_mode: :manual}}), do: :ok - defp validate_subscription_mode(_not_properties), do: {:error, :invalid_component_type} - - defp get_endpoint_group(state, endpoint_id) when is_map_key(state.components, endpoint_id), - do: :components + @impl true + def terminate(_reason, %{engine_pid: engine_pid} = state) do + Engine.terminate(engine_pid, asynchronous?: true, timeout: 10_000) - defp get_endpoint_group(state, endpoint_id) when is_map_key(state.peers, endpoint_id), - do: :peers + State.remove_all_endpoints(state) - defp get_endpoint_id_type(state, endpoint_id) do - case get_endpoint_group(state, endpoint_id) do - :peers -> :peer_id - :components -> :component_id - end + :ok end defp parse_crash_reason( diff --git a/lib/jellyfish/room/config.ex b/lib/jellyfish/room/config.ex index d833163c..c8cb5053 100644 --- a/lib/jellyfish/room/config.ex +++ b/lib/jellyfish/room/config.ex @@ -2,7 +2,14 @@ defmodule Jellyfish.Room.Config do @moduledoc """ Room configuration """ - @enforce_keys [:room_id, :max_peers, :video_codec, :webhook_url, :peerless_purge_timeout] + @enforce_keys [ + :room_id, + :max_peers, + :video_codec, + :webhook_url, + :peerless_purge_timeout, + :peer_disconnected_timeout + ] defstruct @enforce_keys @@ -10,14 +17,15 @@ defmodule Jellyfish.Room.Config do @type max_peers :: non_neg_integer() | nil @type video_codec :: :h264 | :vp8 | nil @type webhook_url :: String.t() - @type peerless_purge_timeout :: pos_integer() | nil + @type purge_timeout :: pos_integer() | nil @type t :: %__MODULE__{ room_id: room_id(), max_peers: max_peers(), video_codec: video_codec(), webhook_url: URI.t(), - peerless_purge_timeout: peerless_purge_timeout() + peerless_purge_timeout: purge_timeout(), + peer_disconnected_timeout: purge_timeout() } @spec from_params(map()) :: {:ok, __MODULE__.t()} | {:error, atom()} @@ -27,19 +35,22 @@ defmodule Jellyfish.Room.Config do video_codec = Map.get(params, "videoCodec") webhook_url = Map.get(params, "webhookUrl") peerless_purge_timeout = Map.get(params, "peerlessPurgeTimeout") + peer_disconnected_timeout = Map.get(params, "peerDisconnectedTimeout") with {:ok, room_id} <- parse_room_id(room_id), :ok <- validate_max_peers(max_peers), {:ok, video_codec} <- codec_to_atom(video_codec), :ok <- validate_webhook_url(webhook_url), - :ok <- validate_purge_timeout(peerless_purge_timeout) do + :ok <- validate_purge_timeout(peerless_purge_timeout, "peerlessPurgeTimeout"), + :ok <- validate_purge_timeout(peer_disconnected_timeout, "peerDisconnectedTimeout") do {:ok, %__MODULE__{ room_id: room_id, max_peers: max_peers, video_codec: video_codec, webhook_url: webhook_url, - peerless_purge_timeout: peerless_purge_timeout + peerless_purge_timeout: peerless_purge_timeout, + peer_disconnected_timeout: peer_disconnected_timeout }} end end @@ -79,7 +90,7 @@ defmodule Jellyfish.Room.Config do defp codec_to_atom(nil), do: {:ok, nil} defp codec_to_atom(_codec), do: {:error, :invalid_video_codec} - defp validate_purge_timeout(nil), do: :ok - defp validate_purge_timeout(timeout) when is_integer(timeout) and timeout > 0, do: :ok - defp validate_purge_timeout(_timeout), do: {:error, :invalid_peerless_purge_timeout} + defp validate_purge_timeout(nil, _param), do: :ok + defp validate_purge_timeout(timeout, _param) when is_integer(timeout) and timeout > 0, do: :ok + defp validate_purge_timeout(_timeout, param), do: {:error, :invalid_purge_timeout, param} end diff --git a/lib/jellyfish/room/state.ex b/lib/jellyfish/room/state.ex new file mode 100644 index 00000000..1a6c96a4 --- /dev/null +++ b/lib/jellyfish/room/state.ex @@ -0,0 +1,459 @@ +defmodule Jellyfish.Room.State do + @moduledoc false + + use Bunch.Access + + require Logger + + alias Jellyfish.{Component, Event, Peer, Room, Track} + alias Jellyfish.Component.{HLS, Recording, RTSP} + alias Jellyfish.Room.Config + alias Membrane.ICE.TURNManager + alias Membrane.RTC.Engine + + alias Membrane.RTC.Engine.Message.{ + TrackAdded, + TrackMetadataUpdated, + TrackRemoved + } + + @enforce_keys [ + :id, + :config, + :engine_pid, + :network_options + ] + defstruct @enforce_keys ++ [components: %{}, peers: %{}, last_peer_left: 0] + + @type reason_t :: any() + @type endpoint_id :: Component.id() | Peer.id() + + @typedoc """ + This module contains: + * `id` - room id + * `config` - configuration of room. For example you can specify maximal number of peers + * `components` - map of components + * `peers` - map of peers + * `engine` - pid of engine + * `network_options` - network options + * `last_peer_left` - arbitrary timestamp with latest occurence of the room becoming peerless + """ + @type t :: %__MODULE__{ + id: Room.id(), + config: Config.t(), + components: %{Component.id() => Component.t()}, + peers: %{Peer.id() => Peer.t()}, + engine_pid: pid(), + network_options: map(), + last_peer_left: integer() + } + + defguard peer_exists?(state, endpoint_id) when is_map_key(state.peers, endpoint_id) + + defguard component_exists?(state, endpoint_id) when is_map_key(state.components, endpoint_id) + + defguard endpoint_exists?(state, endpoint_id) + when peer_exists?(state, endpoint_id) or component_exists?(state, endpoint_id) + + @spec new(id :: Room.id(), config :: Config.t()) :: t() + def new(id, config) do + rtc_engine_options = [ + id: id + ] + + {:ok, pid} = Engine.start_link(rtc_engine_options, []) + Engine.register(pid, self()) + + webrtc_config = Application.fetch_env!(:jellyfish, :webrtc_config) + + turn_options = + if webrtc_config[:webrtc_used?] do + turn_ip = webrtc_config[:turn_listen_ip] + turn_mock_ip = webrtc_config[:turn_ip] + + [ + ip: turn_ip, + mock_ip: turn_mock_ip, + ports_range: webrtc_config[:turn_port_range] + ] + else + [] + end + + tcp_turn_port = webrtc_config[:turn_tcp_port] + + if webrtc_config[:webrtc_used?] and tcp_turn_port != nil do + TURNManager.ensure_tcp_turn_launched(turn_options, port: tcp_turn_port) + end + + state = + %__MODULE__{ + id: id, + config: config, + engine_pid: pid, + network_options: [turn_options: turn_options] + } + |> maybe_schedule_peerless_purge() + + state + end + + @spec id(state :: t()) :: Room.id() + def id(state), do: state.id + + @spec engine_pid(state :: t()) :: pid() + def engine_pid(state), do: state.engine_pid + + @spec peerless_purge_timeout(state :: t()) :: Config.purge_timeout() + def peerless_purge_timeout(state), do: state.config.peerless_purge_timeout + + @spec peerless_long_enough?(state :: t()) :: boolean() + def peerless_long_enough?(%{config: config, peers: peers, last_peer_left: last_peer_left}) do + if all_peers_disconnected?(peers) do + Klotho.monotonic_time(:millisecond) >= last_peer_left + config.peerless_purge_timeout * 1000 + else + false + end + end + + @spec peer_disconnected_long_enough?(state :: t(), peer :: Peer.t()) :: boolean() + def peer_disconnected_long_enough?(_state, peer) when peer.status != :disconnected, do: false + + def peer_disconnected_long_enough?(state, peer) do + remove_timestamp = peer.last_time_connected + state.config.peer_disconnected_timeout * 1000 + + now = Klotho.monotonic_time(:millisecond) + + now >= remove_timestamp + end + + @spec put_peer(state :: t(), peer :: Peer.t()) :: t() + def put_peer(state, peer) do + state + |> put_in([:peers, peer.id], peer) + |> maybe_schedule_peer_purge(peer) + |> maybe_schedule_peerless_purge() + end + + @spec put_component(state :: t(), component :: Component.t()) :: t() + def put_component(state, component) do + put_in(state, [:components, component.id], component) + end + + @spec put_track(state :: t(), track_info :: TrackAdded.t()) :: t() + def put_track(state, %TrackAdded{endpoint_id: endpoint_id} = track_info) do + track = Track.from_track_message(track_info) + endpoint_id_type = get_endpoint_id_type(state, endpoint_id) + + Logger.info("Track #{track.id} added, #{endpoint_id_type}: #{endpoint_id}") + + Event.broadcast_server_notification( + {:track_added, state.id, {endpoint_id_type, endpoint_id}, track_info} + ) + + track = Track.from_track_message(track_info) + + endpoint_group = get_endpoint_group(state, endpoint_id) + access_path = [endpoint_group, endpoint_id, :tracks, track.id] + + put_in(state, access_path, track) + end + + @spec update_track(state :: t(), track_info :: TrackMetadataUpdated.t()) :: t() + def update_track(state, %TrackMetadataUpdated{endpoint_id: endpoint_id} = track_info) do + endpoint_group = get_endpoint_group(state, endpoint_id) + access_path = [endpoint_group, endpoint_id, :tracks, track_info.track_id] + + update_in(state, access_path, fn + %Track{} = track -> + endpoint_id_type = get_endpoint_id_type(state, endpoint_id) + updated_track = %Track{track | metadata: track_info.track_metadata} + + Logger.info( + "Track #{updated_track.id}, #{endpoint_id_type}: #{endpoint_id} - metadata updated: #{inspect(updated_track.metadata)}" + ) + + Event.broadcast_server_notification( + {:track_metadata_updated, state.id, {endpoint_id_type, endpoint_id}, updated_track} + ) + + updated_track + end) + end + + @spec remove_track(state :: t(), track_info :: TrackRemoved.t()) :: t() + def remove_track(state, %TrackRemoved{endpoint_id: endpoint_id} = track_info) do + endpoint_group = get_endpoint_group(state, endpoint_id) + access_path = [endpoint_group, endpoint_id, :tracks, track_info.track_id] + + {track, state} = pop_in(state, access_path) + + endpoint_id_type = get_endpoint_id_type(state, endpoint_id) + Logger.info("Track removed: #{track.id}, #{endpoint_id_type}: #{endpoint_id}") + + Event.broadcast_server_notification( + {:track_removed, state.id, {endpoint_id_type, endpoint_id}, track} + ) + + state + end + + @spec fetch_peer(state :: t(), peer_id :: Peer.id()) :: {:ok, Peer.t()} | :error + def fetch_peer(state, peer_id), do: Map.fetch(state.peers, peer_id) + + @spec fetch_component(state :: t(), component_id :: Component.id()) :: + {:ok, Component.t()} | :error + def fetch_component(state, component_id), do: Map.fetch(state.components, component_id) + + @spec update_peer_metadata(state :: t(), peer_id :: Peer.id(), metadata :: any()) :: t() + def update_peer_metadata(state, peer_id, metadata) do + Event.broadcast_server_notification({:peer_metadata_updated, state.id, peer_id, metadata}) + + put_in(state, [:peers, peer_id, :metadata], metadata) + end + + @spec connect_peer(state :: t(), peer :: Peer.t(), socket_pid :: pid()) :: t() + def connect_peer(state, peer, socket_pid) do + peer = %{peer | status: :connected, socket_pid: socket_pid} + + state = put_peer(state, peer) + + :ok = Engine.add_endpoint(state.engine_pid, peer.engine_endpoint, id: peer.id) + + Logger.info("Peer #{inspect(peer.id)} connected") + + :telemetry.execute([:jellyfish, :room], %{peer_connects: 1}, %{room_id: state.id}) + + state + end + + @spec disconnect_peer(state :: t(), peer_ws_pid :: pid()) :: t() + def disconnect_peer(state, peer_ws_pid) do + case find_peer_with_pid(state, peer_ws_pid) do + nil -> + state + + {peer_id, peer} -> + :ok = Engine.remove_endpoint(state.engine_pid, peer_id) + + Event.broadcast_server_notification({:peer_disconnected, state.id, peer_id}) + :telemetry.execute([:jellyfish, :room], %{peer_disconnects: 1}, %{room_id: state.id}) + + peer.tracks + |> Map.values() + |> Enum.each( + &Event.broadcast_server_notification( + {:track_removed, state.id, {:peer_id, peer_id}, &1} + ) + ) + + peer = %{peer | status: :disconnected, socket_pid: nil, tracks: %{}} + + put_peer(state, peer) + end + end + + @spec remove_peer(state :: t(), peer_id :: Peer.id(), reason :: any()) :: t() + def remove_peer(state, peer_id, :timeout) do + {_peer, state} = pop_in(state, [:peers, peer_id]) + + maybe_schedule_peerless_purge(state) + end + + def remove_peer(state, peer_id, reason) do + {peer, state} = pop_in(state, [:peers, peer_id]) + :ok = Engine.remove_endpoint(state.engine_pid, peer_id) + + if is_pid(peer.socket_pid), + do: send(peer.socket_pid, {:stop_connection, reason}) + + peer.tracks + |> Map.values() + |> Enum.each( + &Event.broadcast_server_notification({:track_removed, state.id, {:peer_id, peer_id}, &1}) + ) + + Logger.info("Removed peer #{inspect(peer_id)} from room #{inspect(state.id)}") + + if peer.status == :connected and reason == :peer_removed do + Event.broadcast_server_notification({:peer_disconnected, state.id, peer_id}) + :telemetry.execute([:jellyfish, :room], %{peer_disconnects: 1}, %{room_id: state.id}) + end + + with {:peer_crashed, crash_reason} <- reason do + Event.broadcast_server_notification({:peer_crashed, state.id, peer_id, crash_reason}) + :telemetry.execute([:jellyfish, :room], %{peer_crashes: 1}, %{room_id: state.id}) + end + + maybe_schedule_peerless_purge(state) + end + + @spec remove_component(state :: t(), component_id :: Component.id(), reason :: reason_t()) :: + t() + def remove_component(state, component_id, reason) do + {component, state} = pop_in(state, [:components, component_id]) + :ok = Engine.remove_endpoint(state.engine_pid, component_id) + + component.tracks + |> Map.values() + |> Enum.each( + &Event.broadcast_server_notification( + {:track_removed, state.id, {:component_id, component_id}, &1} + ) + ) + + Logger.info("Removed component #{inspect(component_id)}: #{inspect(reason)}") + + component.type.on_remove(state, component) + + if reason == :component_crashed, + do: Event.broadcast_server_notification({:component_crashed, state.id, component_id}) + + state + end + + @spec remove_all_endpoints(state :: t()) :: :ok + def remove_all_endpoints(state) do + state.peers + |> Map.values() + |> Enum.each(&remove_peer(state, &1.id, :room_stopped)) + + state.components + |> Map.values() + |> Enum.each(&remove_component(state, &1.id, :room_stopped)) + end + + @spec find_peer_with_pid(state :: t(), pid :: pid()) :: {Peer.id(), Peer.t()} | nil + defp find_peer_with_pid(state, pid), + do: Enum.find(state.peers, fn {_id, peer} -> peer.socket_pid == pid end) + + @spec get_component_by_id(state :: t(), component_id :: Component.id()) :: Component.t() | nil + def get_component_by_id(state, component_id) do + Enum.find_value(state.components, fn {id, component} -> + if id == component_id, do: component + end) + end + + @spec set_hls_playable(state :: t()) :: t() + def set_hls_playable(state) do + endpoint_id = find_hls_component_id(state) + + Event.broadcast_server_notification({:hls_playable, state.id, endpoint_id}) + + update_in(state, [:components, endpoint_id, :properties], &Map.put(&1, :playable, true)) + end + + @spec find_hls_component_id(state :: t()) :: Component.t() | nil + def find_hls_component_id(state), + do: + Enum.find_value(state.components, fn {id, %{type: type}} -> + if type == HLS, do: id + end) + + @spec reached_peers_limit?(state :: t()) :: boolean() + def reached_peers_limit?(state), do: Enum.count(state.peers) == state.config.max_peers + + @spec generate_peer_options(state :: t(), override_options :: map()) :: map() + def generate_peer_options(state, override_options) do + Map.merge( + %{ + engine_pid: state.engine_pid, + network_options: state.network_options, + video_codec: state.config.video_codec, + room_id: state.id + }, + override_options + ) + end + + def check_component_allowed(type, %{ + config: %{video_codec: video_codec}, + components: components + }) + when type in [HLS, Recording] do + cond do + video_codec != :h264 -> + {:error, :incompatible_codec} + + component_already_present?(type, components) -> + {:error, :reached_components_limit} + + true -> + :ok + end + end + + def check_component_allowed(RTSP, %{config: %{video_codec: video_codec}}) do + # Right now, RTSP component can only publish H264, so there's no point adding it + # to a room which allows another video codec, e.g. VP8 + if video_codec == :h264, + do: :ok, + else: {:error, :incompatible_codec} + end + + def check_component_allowed(_component_type, _state), do: :ok + + @spec get_endpoint_id_type(state :: t(), endpoint_id :: endpoint_id()) :: + :peer_id | :component_id + def get_endpoint_id_type(state, endpoint_id) do + case get_endpoint_group(state, endpoint_id) do + :peers -> :peer_id + :components -> :component_id + end + end + + @spec get_endpoint_group(state :: t(), endpoint_id :: endpoint_id()) :: + :components | :peers + def get_endpoint_group(state, endpoint_id) when component_exists?(state, endpoint_id), + do: :components + + def get_endpoint_group(state, endpoint_id) when peer_exists?(state, endpoint_id), + do: :peers + + @spec validate_subscription_mode(component :: Component.t() | nil) :: :ok | {:error, any()} + def validate_subscription_mode(nil), do: {:error, :component_not_exists} + + def validate_subscription_mode(%{properties: %{subscribe_mode: :auto}}), + do: {:error, :invalid_subscribe_mode} + + def validate_subscription_mode(%{properties: %{subscribe_mode: :manual}}), do: :ok + def validate_subscription_mode(_not_properties), do: {:error, :invalid_component_type} + + defp component_already_present?(type, components), + do: components |> Map.values() |> Enum.any?(&(&1.type == type)) + + defp maybe_schedule_peerless_purge(%{config: %{peerless_purge_timeout: nil}} = state), do: state + + defp maybe_schedule_peerless_purge(%{config: config, peers: peers} = state) do + if all_peers_disconnected?(peers) do + last_peer_left = Klotho.monotonic_time(:millisecond) + + Klotho.send_after(config.peerless_purge_timeout * 1000, self(), :peerless_purge) + + %{state | last_peer_left: last_peer_left} + else + state + end + end + + defp maybe_schedule_peer_purge(%{config: %{peer_disconnected_timeout: nil}} = state, _peer), + do: state + + defp maybe_schedule_peer_purge(%{config: config} = state, peer) do + case fetch_peer(state, peer.id) do + {:ok, peer} when peer.status == :disconnected -> + last_time_connected = Klotho.monotonic_time(:millisecond) + + Klotho.send_after(config.peer_disconnected_timeout * 1000, self(), {:peer_purge, peer.id}) + + put_in(state, [:peers, peer.id, :last_time_connected], last_time_connected) + + _other -> + state + end + end + + defp all_peers_disconnected?(peers) do + peers |> Map.values() |> Enum.all?(&(&1.status == :disconnected)) + end +end diff --git a/lib/jellyfish/room_service.ex b/lib/jellyfish/room_service.ex index 350e1c35..c8577c8c 100644 --- a/lib/jellyfish/room_service.ex +++ b/lib/jellyfish/room_service.ex @@ -205,6 +205,7 @@ defmodule Jellyfish.RoomService do Logger.debug("Room #{room_id} is down with reason: normal") Phoenix.PubSub.broadcast(Jellyfish.PubSub, room_id, :room_stopped) + Event.broadcast_server_notification({:room_deleted, room_id}) clear_room_metrics(room_id) {:noreply, state} @@ -256,8 +257,6 @@ defmodule Jellyfish.RoomService do try do :ok = GenServer.stop(room, :normal) Logger.info("Deleted room #{inspect(room_id)}") - - Event.broadcast_server_notification({:room_deleted, room_id}) catch :exit, {:noproc, {GenServer, :stop, [^room, :normal, :infinity]}} -> Logger.warning("Room process with id #{inspect(room_id)} doesn't exist") diff --git a/lib/jellyfish_web/api_spec/room.ex b/lib/jellyfish_web/api_spec/room.ex index 73027832..c9ab507f 100644 --- a/lib/jellyfish_web/api_spec/room.ex +++ b/lib/jellyfish_web/api_spec/room.ex @@ -46,6 +46,14 @@ defmodule JellyfishWeb.ApiSpec.Room do minimum: 1, example: 60, nullable: true + }, + peerDisconnectedTimeout: %Schema{ + description: + "Duration (in seconds) after which the peer will be removed if it is disconnected. If not provided, this feature is disabled.", + type: :integer, + minimum: 1, + example: 60, + nullable: true } } }) diff --git a/lib/jellyfish_web/controllers/room_controller.ex b/lib/jellyfish_web/controllers/room_controller.ex index 9fe170e4..00e9e419 100644 --- a/lib/jellyfish_web/controllers/room_controller.ex +++ b/lib/jellyfish_web/controllers/room_controller.ex @@ -95,11 +95,10 @@ defmodule JellyfishWeb.RoomController do webhook_url = Map.get(params, "webhookUrl") {:error, :bad_request, "Expected webhookUrl to be valid URL, got: #{webhook_url}"} - {:error, :invalid_peerless_purge_timeout} -> - timeout = Map.get(params, "peerlessPurgeTimeout") + {:error, :invalid_purge_timeout, param} -> + timeout = Map.get(params, param) - {:error, :bad_request, - "Expected peerlessPurgeTimeout to be a positive integer, got: #{timeout}"} + {:error, :bad_request, "Expected #{param} to be a positive integer, got: #{timeout}"} {:error, :room_already_exists} -> room_id = Map.get(params, "roomId") diff --git a/lib/jellyfish_web/peer_socket.ex b/lib/jellyfish_web/peer_socket.ex index 54f0e9ee..1f1d605d 100644 --- a/lib/jellyfish_web/peer_socket.ex +++ b/lib/jellyfish_web/peer_socket.ex @@ -49,6 +49,7 @@ defmodule JellyfishWeb.PeerSocket do }) Event.broadcast_server_notification({:peer_connected, room_id, peer_id}) + Logger.metadata(room_id: room_id, peer_id: peer_id) {:reply, :ok, {:binary, encoded_message}, state} else @@ -115,32 +116,38 @@ defmodule JellyfishWeb.PeerSocket do @impl true def handle_info({:stop_connection, :peer_removed}, state) do + Logger.info("Peer socket stopped because peer removed") {:stop, :closed, {1000, "Peer removed"}, state} end @impl true def handle_info({:stop_connection, :room_stopped}, state) do + Logger.info("Peer socket stopped because room stopped") {:stop, :closed, {1000, "Room stopped"}, state} end @impl true def handle_info({:stop_connection, {:peer_crashed, crash_reason}}, state) when crash_reason != nil do + Logger.warning("Peer socket stopped because peer crashed with reason: #{crash_reason}") {:stop, :closed, {1011, crash_reason}, state} end @impl true def handle_info({:stop_connection, {:peer_crashed, _reason}}, state) do + Logger.warning("Peer socket stopped because peer crashed with unknown reason") {:stop, :closed, {1011, "Internal server error"}, state} end @impl true def handle_info(:room_crashed, state) do + Logger.warning("Peer socket stopped because room crashed") {:stop, :closed, {1011, "Internal server error"}, state} end @impl true - def terminate(_reason, _state) do + def terminate(reason, _state) do + Logger.info("Peer socket terminates with reason #{reason}") :ok end diff --git a/openapi.yaml b/openapi.yaml index 92296f89..42dbbcaa 100644 --- a/openapi.yaml +++ b/openapi.yaml @@ -787,6 +787,12 @@ components: minimum: 1 nullable: true type: integer + peerDisconnectedTimeout: + description: Duration (in seconds) after which the peer will be removed if it is disconnected. If not provided, this feature is disabled. + example: 60 + minimum: 1 + nullable: true + type: integer peerlessPurgeTimeout: description: Duration (in seconds) after which the room will be removed if no peers are connected. If not provided, this feature is disabled. example: 60 diff --git a/test/jellyfish/config_reader_test.exs b/test/jellyfish/config_reader_test.exs index 2e2093ef..4f71e3c4 100644 --- a/test/jellyfish/config_reader_test.exs +++ b/test/jellyfish/config_reader_test.exs @@ -181,7 +181,8 @@ defmodule Jellyfish.ConfigReaderTest do "JF_DIST_MODE", "JF_DIST_COOKIE", "JF_DIST_NODE_NAME", - "JF_DIST_NODES" + "JF_DIST_NODES", + "JF_DIST_POLLING_INTERVAL" ] do {:ok, hostname} = :inet.gethostname() @@ -237,7 +238,8 @@ defmodule Jellyfish.ConfigReaderTest do "JF_DIST_COOKIE", "JF_DIST_NODE_NAME", "JF_DIST_NODES", - "JF_DIST_STRATEGY_NAME" + "JF_DIST_STRATEGY_NAME", + "JF_DIST_POLLING_INTERVAL" ] do assert ConfigReader.read_dist_config() == [ enabled: false, diff --git a/test/jellyfish_web/controllers/room_controller_test.exs b/test/jellyfish_web/controllers/room_controller_test.exs index 42013320..55bc2503 100644 --- a/test/jellyfish_web/controllers/room_controller_test.exs +++ b/test/jellyfish_web/controllers/room_controller_test.exs @@ -175,6 +175,11 @@ defmodule JellyfishWeb.RoomControllerTest do assert json_response(conn, :bad_request)["errors"] == "Expected peerlessPurgeTimeout to be a positive integer, got: nan" + conn = post(conn, ~p"/room", peerDisconnectedTimeout: "nan") + + assert json_response(conn, :bad_request)["errors"] == + "Expected peerDisconnectedTimeout to be a positive integer, got: nan" + conn = post(conn, ~p"/room", roomId: "test/path") assert json_response(conn, :bad_request)["errors"] == @@ -306,6 +311,119 @@ defmodule JellyfishWeb.RoomControllerTest do end end + describe "peer disconnect purge" do + setup %{conn: conn} do + conn = post(conn, ~p"/room", peerDisconnectedTimeout: @purge_timeout_s) + assert %{"id" => id} = json_response(conn, :created)["data"]["room"] + %{conn: conn, id: id} + end + + test "happens if peer added, but not joined", %{conn: conn, id: id} do + conn = post(conn, ~p"/room/#{id}/peer", type: "webrtc") + + assert %{"token" => _token, "peer" => %{"id" => _peer_id}} = + json_response(conn, :created)["data"] + + Klotho.Mock.warp_by(@purge_timeout_ms + 10) + conn = get(conn, ~p"/room/#{id}") + assert json_response(conn, :ok)["data"]["peers"] |> Enum.empty?() + end + + test "happens if peer joined, then disconnected", %{conn: conn, id: id} do + conn = post(conn, ~p"/room/#{id}/peer", type: "webrtc") + assert %{"token" => token} = json_response(conn, :created)["data"] + + ws = connect_peer(token) + + Klotho.Mock.warp_by(@purge_timeout_ms + 10) + conn = get(conn, ~p"/room/#{id}") + assert response(conn, :ok) + + GenServer.stop(ws) + Process.sleep(10) + + conn = get(conn, ~p"/room/#{id}") + + assert %{"status" => "disconnected"} = + json_response(conn, :ok)["data"]["peers"] |> List.first() + + Klotho.Mock.warp_by(@purge_timeout_ms + 10) + + conn = get(conn, ~p"/room/#{id}") + assert json_response(conn, :ok)["data"]["peers"] |> Enum.empty?() + end + + test "does not happen if peers rejoined quickly", %{conn: conn, id: id} do + conn = post(conn, ~p"/room/#{id}/peer", type: "webrtc") + assert %{"token" => token} = json_response(conn, :created)["data"] + + ws = connect_peer(token) + + Klotho.Mock.warp_by(@purge_timeout_ms + 10) + conn = get(conn, ~p"/room/#{id}") + assert response(conn, :ok) + + GenServer.stop(ws) + Process.sleep(10) + conn = get(conn, ~p"/room/#{id}") + + assert %{"status" => "disconnected"} = + json_response(conn, :ok)["data"]["peers"] |> List.first() + + Klotho.Mock.warp_by(@purge_timeout_ms |> div(2)) + conn = get(conn, ~p"/room/#{id}") + assert response(conn, :ok) + assert peers = json_response(conn, :ok)["data"]["peers"] + assert not Enum.empty?(peers) + + connect_peer(token) + Klotho.Mock.warp_by(@purge_timeout_ms + 10) + conn = get(conn, ~p"/room/#{id}") + assert response(conn, :ok) + end + + test "does not happen when not configured", %{conn: conn} do + conn = post(conn, ~p"/room") + assert %{"id" => id} = json_response(conn, :created)["data"]["room"] + + conn = post(conn, ~p"/room/#{id}/peer", type: "webrtc") + + assert %{"token" => _token, "peer" => %{"id" => _peer_id}} = + json_response(conn, :created)["data"] + + Klotho.Mock.warp_by(@purge_timeout_ms + 10) + conn = get(conn, ~p"/room/#{id}") + peers = json_response(conn, :ok)["data"]["peers"] + + assert not Enum.empty?(peers) + end + + test "does work with peerlessPurgeTimeout", %{conn: conn} do + conn = + post(conn, ~p"/room", + peerlessPurgeTimeout: @purge_timeout_s * 2, + peerDisconnectedTimeout: @purge_timeout_s + ) + + assert %{"id" => id} = json_response(conn, :created)["data"]["room"] + + conn = post(conn, ~p"/room/#{id}/peer", type: "webrtc") + + assert %{"token" => _token, "peer" => %{"id" => _peer_id}} = + json_response(conn, :created)["data"] + + Klotho.Mock.warp_by(@purge_timeout_ms + 10) + conn = get(conn, ~p"/room/#{id}") + peers = json_response(conn, :ok)["data"]["peers"] + + assert Enum.empty?(peers) + + Klotho.Mock.warp_by(@purge_timeout_ms * 2 + 10) + conn = get(conn, ~p"/room") + assert Enum.all?(json_response(conn, :ok)["data"], &(&1["id"] != id)) + end + end + describe "delete room" do setup [:create_room] diff --git a/test/jellyfish_web/integration/server_notification_test.exs b/test/jellyfish_web/integration/server_notification_test.exs index f83810ed..6fc28cbe 100644 --- a/test/jellyfish_web/integration/server_notification_test.exs +++ b/test/jellyfish_web/integration/server_notification_test.exs @@ -63,6 +63,9 @@ defmodule JellyfishWeb.Integration.ServerNotificationTest do password: "yourpassword" } + @purge_timeout_s 1 + @purge_timeout_ms @purge_timeout_s * 1000 + Application.put_env( :jellyfish, Endpoint, @@ -109,6 +112,9 @@ defmodule JellyfishWeb.Integration.ServerNotificationTest do server_api_token = Application.fetch_env!(:jellyfish, :server_api_token) conn = put_req_header(conn, "authorization", "Bearer " <> server_api_token) + Klotho.Mock.reset() + Klotho.Mock.freeze() + on_exit(fn -> conn = get(conn, ~p"/room") rooms = json_response(conn, :ok)["data"] @@ -193,6 +199,37 @@ defmodule JellyfishWeb.Integration.ServerNotificationTest do assert_receive {:webhook_notification, %RoomDeleted{room_id: ^room_id}}, 1_000 end + test "sends a message when room gets created and deleted by peerless purge", %{conn: conn} do + server_api_token = Application.fetch_env!(:jellyfish, :server_api_token) + ws = create_and_authenticate() + + subscribe(ws, :server_notification) + + conn = put_req_header(conn, "authorization", "Bearer " <> server_api_token) + + conn = + post(conn, ~p"/room", + maxPeers: 1, + webhookUrl: @webhook_url, + peerlessPurgeTimeout: @purge_timeout_s + ) + + assert %{"id" => room_id} = json_response(conn, :created)["data"]["room"] + + assert_receive %RoomCreated{room_id: ^room_id} + assert_receive {:webhook_notification, %RoomCreated{room_id: ^room_id}}, 1_000 + + {peer_id, _token, _conn} = add_peer(conn, room_id) + + conn = delete(conn, ~p"/room/#{room_id}/peer/#{peer_id}") + assert response(conn, :no_content) + + Klotho.Mock.warp_by(@purge_timeout_ms + 25) + + assert_receive %RoomDeleted{room_id: ^room_id}, 1_000 + assert_receive {:webhook_notification, %RoomDeleted{room_id: ^room_id}}, 1_000 + end + describe "WebRTC Peer" do test "sends a message when peer connects and room is deleted", %{conn: conn} do {room_id, peer_id, conn, _ws} = subscribe_on_notifications_and_connect_peer(conn) @@ -359,6 +396,51 @@ defmodule JellyfishWeb.Integration.ServerNotificationTest do assert Enum.empty?(tracks) end + + test "sends a message when peer gets created and deleted by disconnected purge", %{conn: conn} do + server_api_token = Application.fetch_env!(:jellyfish, :server_api_token) + ws = create_and_authenticate() + + subscribe(ws, :server_notification) + + conn = put_req_header(conn, "authorization", "Bearer " <> server_api_token) + + conn = + post(conn, ~p"/room", + maxPeers: 1, + webhookUrl: @webhook_url, + peerlessPurgeTimeout: @purge_timeout_s, + peerDisconnectedTimeout: @purge_timeout_s + ) + + assert %{"id" => room_id} = json_response(conn, :created)["data"]["room"] + + assert_receive %RoomCreated{room_id: ^room_id} + assert_receive {:webhook_notification, %RoomCreated{room_id: ^room_id}}, 1_000 + + {peer_id, token, _conn} = add_peer(conn, room_id) + {:ok, peer_ws} = WS.start("ws://127.0.0.1:#{@port}/socket/peer/websocket", :peer) + WS.send_auth_request(peer_ws, token) + + assert_receive %PeerConnected{peer_id: ^peer_id, room_id: ^room_id} + + assert_receive {:webhook_notification, + %PeerConnected{peer_id: ^peer_id, room_id: ^room_id}}, + 1000 + + :ok = GenServer.stop(peer_ws) + + assert_receive %PeerDisconnected{room_id: ^room_id, peer_id: ^peer_id} + + assert_receive {:webhook_notification, + %PeerDisconnected{room_id: ^room_id, peer_id: ^peer_id}}, + 1_000 + + Klotho.Mock.warp_by(@purge_timeout_ms * 3) + + assert_receive %RoomDeleted{room_id: ^room_id}, 1_000 + assert_receive {:webhook_notification, %RoomDeleted{room_id: ^room_id}}, 1_000 + end end test "sends message when File adds or removes tracks", %{conn: conn} do