Skip to content

Commit

Permalink
Refactor diamond detection messages
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom committed Dec 4, 2024
1 parent 1f94101 commit 090488a
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 60 deletions.
43 changes: 2 additions & 41 deletions lib/membrane/core/element.ex
Original file line number Diff line number Diff line change
Expand Up @@ -282,47 +282,8 @@ defmodule Membrane.Core.Element do
{:noreply, state}
end

defp do_handle_info(Message.new(:start_diamond_detection), state) do
:ok = DiamondDetectionController.start_diamond_detection(state)
{:noreply, state}
end

defp do_handle_info(
Message.new(:diamond_detection, [
input_pad_ref,
diamond_detection_ref,
diamond_detection_path
]),
state
) do
state =
DiamondDetectionController.continue_diamond_detection(
input_pad_ref,
diamond_detection_ref,
diamond_detection_path,
state
)

{:noreply, state}
end

defp do_handle_info(Message.new(:delete_diamond_detection_ref, diamond_detection_ref), state) do
state = DiamondDetectionController.delete_diamond_detection_ref(diamond_detection_ref, state)
{:noreply, state}
end

defp do_handle_info(Message.new(:start_diamond_detection_trigger, trigger_ref), state) do
state = DiamondDetectionController.start_diamond_detection_trigger(trigger_ref, state)
{:noreply, state}
end

defp do_handle_info(Message.new(:diamond_detection_trigger, trigger_ref), state) do
state = DiamondDetectionController.handle_diamond_detection_trigger(trigger_ref, state)
{:noreply, state}
end

defp do_handle_info(Message.new(:delete_diamond_detection_trigger_ref, trigger_ref), state) do
state = DiamondDetectionController.delete_diamond_detection_trigger_ref(trigger_ref, state)
defp do_handle_info( Message.new(:diamond_detection, message), state ) do
state = DiamondDetectionController.handle_diamond_detection_message(message, state)
{:noreply, state}
end

Expand Down
76 changes: 58 additions & 18 deletions lib/membrane/core/element/diamond_detection_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,45 @@ defmodule Membrane.Core.Element.DiamondDetectionController do

@component_path_prefix "__membrane_component_path_64_byte_prefix________________________"

@type diamond_detection_message() :: %{
:type =>
:start
| :start_trigger
| :diamond_detection
| :trigger
| :delete_ref
| :delete_trigger_ref,
optional(:ref) => reference(),
optional(:path) => PathInGraph.t(),
optional(:pad_ref) => Pad.ref()
}

@spec handle_diamond_detection_message(diamond_detection_message(), State.t()) :: State.t()
def handle_diamond_detection_message(%{type: type} = message, state) do
case type do
:start ->
:ok = start_diamond_detection(state)
state

:start_trigger ->
start_diamond_detection_trigger(message.ref, state)

:diamond_detection ->
continue_diamond_detection(message.pad_ref, message.ref, message.path, state)

:trigger ->
handle_diamond_detection_trigger(message.ref, state)

:delete_ref ->
delete_diamond_detection_ref(message.ref, state)

:delete_trigger_ref ->
delete_diamond_detection_trigger_ref(message.ref, state)
end
end

@spec start_diamond_detection(State.t()) :: :ok
def start_diamond_detection(state) do
defp start_diamond_detection(state) do
diamond_detection_path = [
%PathInGraph.Vertex{pid: self(), component_path: get_component_path()}
]
Expand All @@ -24,7 +61,7 @@ defmodule Membrane.Core.Element.DiamondDetectionController do

@spec continue_diamond_detection(Pad.ref(), reference(), PathInGraph.t(), State.t()) ::
State.t()
def continue_diamond_detection(
defp continue_diamond_detection(
input_pad_ref,
diamond_detection_ref,
diamond_detecton_path,
Expand All @@ -43,7 +80,7 @@ defmodule Membrane.Core.Element.DiamondDetectionController do
:ok = forward_diamond_detection(diamond_detection_ref, diamond_detecton_path, state)

:ok =
Message.new(:delete_diamond_detection_ref, diamond_detection_ref)
%{type: :delete_ref, ref: diamond_detection_ref}
|> send_after_to_self()

state
Expand Down Expand Up @@ -76,7 +113,7 @@ defmodule Membrane.Core.Element.DiamondDetectionController do
end

@spec delete_diamond_detection_ref(reference(), State.t()) :: State.t()
def delete_diamond_detection_ref(diamond_detection_ref, state) do
defp delete_diamond_detection_ref(diamond_detection_ref, state) do
{_path, %State{} = state} =
state
|> pop_in([:diamond_detection, :ref_to_path, diamond_detection_ref])
Expand All @@ -95,11 +132,14 @@ defmodule Membrane.Core.Element.DiamondDetectionController do
current_entry = %{current_entry | output_pad_ref: pad_ref}
diamond_detection_path = [current_entry | diamond_detection_path_tail]

Message.send(
pad_data.pid,
:diamond_detection,
[pad_data.other_ref, diamond_detection_ref, diamond_detection_path]
)
message = %{
type: :diamond_detection,
pad_ref: pad_data.other_ref,
ref: diamond_detection_ref,
path: diamond_detection_path
}

Message.send(pad_data.pid, :diamond_detection, message)
end
end)
end
Expand All @@ -108,7 +148,8 @@ defmodule Membrane.Core.Element.DiamondDetectionController do
state.pads_data
|> Enum.each(fn {_pad_ref, %PadData{} = pad_data} ->
if pad_data.direction == :input and pad_data.flow_control != :push do
Message.send(pad_data.pid, :diamond_detection_trigger, trigger_ref)
message = %{type: :trigger, ref: trigger_ref}
Message.send(pad_data.pid, :diamond_detection, message)
end
end)
end
Expand All @@ -125,7 +166,7 @@ defmodule Membrane.Core.Element.DiamondDetectionController do
end

@spec start_diamond_detection_trigger(reference(), State.t()) :: State.t()
def start_diamond_detection_trigger(spec_ref, state) do
defp start_diamond_detection_trigger(spec_ref, state) do
if map_size(state.pads_data) < 2 or
MapSet.member?(state.diamond_detection.trigger_refs, spec_ref) do
state
Expand All @@ -135,7 +176,7 @@ defmodule Membrane.Core.Element.DiamondDetectionController do
end

@spec handle_diamond_detection_trigger(reference(), State.t()) :: State.t()
def handle_diamond_detection_trigger(trigger_ref, %State{} = state) do
defp handle_diamond_detection_trigger(trigger_ref, %State{} = state) do
if state.type == :endpoint or
MapSet.member?(state.diamond_detection.trigger_refs, trigger_ref),
do: state,
Expand All @@ -151,7 +192,7 @@ defmodule Membrane.Core.Element.DiamondDetectionController do
)

:ok =
Message.new(:delete_diamond_detection_trigger_ref, trigger_ref)
%{type: :delete_trigger_ref, ref: trigger_ref}
|> send_after_to_self()

:ok = forward_diamond_detection_trigger(trigger_ref, state)
Expand All @@ -166,16 +207,14 @@ defmodule Membrane.Core.Element.DiamondDetectionController do
end

defp postpone_diamond_detection(%State{} = state) do
:ok =
Message.new(:start_diamond_detection)
|> send_after_to_self(1)
:ok = %{type: :start} |> send_after_to_self(1)

state
|> put_in([:diamond_detection, :postponed?], true)
end

@spec delete_diamond_detection_trigger_ref(reference(), State.t()) :: State.t()
def delete_diamond_detection_trigger_ref(trigger_ref, state) do
defp delete_diamond_detection_trigger_ref(trigger_ref, state) do
state
|> update_in(
[:diamond_detection, :trigger_refs],
Expand All @@ -190,8 +229,9 @@ defmodule Membrane.Core.Element.DiamondDetectionController do
|> Enum.count(fn {_pad_ref, pad_data} -> output_pull_pad?(pad_data, auto_pull_mode?) end)
end

defp send_after_to_self(message, seconds \\ 10) do
defp send_after_to_self(%{type: _type} = message, seconds \\ 10) do
send_after_time = Membrane.Time.seconds(seconds) |> Membrane.Time.as_milliseconds(:round)
message = Message.new(:diamond_detection, message)
self() |> Process.send_after(message, send_after_time)
:ok
end
Expand Down
3 changes: 2 additions & 1 deletion lib/membrane/core/parent/diamond_detection_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ defmodule Membrane.Core.Parent.DiamondDetectionController do
@spec start_diamond_detection_trigger(Child.name(), reference(), Parent.state()) :: :ok
def start_diamond_detection_trigger(child_name, trigger_ref, state) do
with %{component_type: :element, pid: pid} <- state.children[child_name] do
Message.send(pid, :start_diamond_detection_trigger, trigger_ref)
message = %{type: :start_trigger, ref: trigger_ref}
Message.send(pid, :diamond_detection, message)
end

:ok
Expand Down

0 comments on commit 090488a

Please sign in to comment.