Skip to content

Commit

Permalink
Add an option to pass a socket to the sink (#33)
Browse files Browse the repository at this point in the history
* Allow to pass an open socket to the sink
  • Loading branch information
Noarkhh authored Aug 26, 2024
1 parent f2fd0e6 commit 6b72e34
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 29 deletions.
21 changes: 19 additions & 2 deletions lib/membrane_udp/common_socket_behaviour.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ defmodule Membrane.UDP.CommonSocketBehaviour do
) ::
{[Membrane.Element.Action.common_actions() | Membrane.Element.Action.setup()],
Membrane.Element.state()}
def handle_setup(ctx, %{local_socket: %Socket{} = local_socket} = state) do
case mockable(Socket).open(local_socket) do
def handle_setup(ctx, %{local_socket: %Socket{socket_handle: nil}} = state) do
case mockable(Socket).open(state.local_socket) do
{:ok, socket} ->
notification = {:connection_info, socket.ip_address, socket.port_no}

Expand All @@ -31,6 +31,23 @@ defmodule Membrane.UDP.CommonSocketBehaviour do
end
end

def handle_setup(_ctx, state) do
{:ok, {socket_address, socket_port}} = :inet.sockname(state.local_socket.socket_handle)

cond do
state.local_socket.ip_address not in [socket_address, :any] ->
raise "Local address passed in options not matching the one of the passed socket."

state.local_socket.port_no not in [socket_port, 0] ->
raise "Local port passed in options not matching the one of the passed socket."

true ->
:ok
end

{[], state}
end

defp close_socket(%Socket{} = local_socket) do
mockable(Socket).close(local_socket)
end
Expand Down
14 changes: 12 additions & 2 deletions lib/membrane_udp/sink.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ defmodule Membrane.UDP.Sink do
A UDP port number for the socket used to sent packets. If set to `0` (default)
the underlying OS will assign a free UDP port.
"""
],
local_socket: [
spec: :gen_udp.socket() | nil,
default: nil,
description: """
Already opened UDP socket, if provided it will be used instead of creating
and opening a new one.
"""
]

def_input_pad :input, accepted_format: _any
Expand All @@ -44,7 +52,8 @@ defmodule Membrane.UDP.Sink do
destination_address: dst_address,
destination_port_no: dst_port_no,
local_address: local_address,
local_port_no: local_port_no
local_port_no: local_port_no,
local_socket: local_socket
} = options

state = %{
Expand All @@ -54,7 +63,8 @@ defmodule Membrane.UDP.Sink do
},
local_socket: %Socket{
ip_address: local_address,
port_no: local_port_no
port_no: local_port_no,
socket_handle: local_socket
}
}

Expand Down
84 changes: 59 additions & 25 deletions test/membrane_udp/sink/pipeline_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -13,39 +13,73 @@ defmodule Membrane.UDP.SinkPipelineTest do
@values 1..100

defp setup_state(_ctx) do
open_local_socket = %Socket{
dst_socket = %Socket{
port_no: @destination_port_no,
ip_address: @local_address,
sock_opts: [recbuf: 1024 * 1024]
}

%{state: %{local_socket: open_local_socket}}
local_socket = %Socket{
port_no: @local_port_no,
ip_address: @local_address,
sock_opts: [recbuf: 1024 * 1024]
}

%{state: %{dst_socket: dst_socket, local_socket: local_socket}}
end

setup [:setup_state, :setup_socket_from_state]

@tag open_socket_from_state: [:local_socket]
test "100 messages passes through pipeline" do
data = @values |> Enum.map(&to_string(&1))

assert pipeline =
Pipeline.start_link_supervised!(
spec: [
child(:test_source, %Source{output: data})
|> child(:udp_sink, %Sink{
destination_address: @local_address,
destination_port_no: @destination_port_no,
local_address: @local_address,
local_port_no: @local_port_no
})
]
)

Enum.each(@values, fn elem ->
expected_value = to_string(elem)
assert_receive {:udp, _, @local_address, @local_port_no, ^expected_value}, 1000
end)

Pipeline.terminate(pipeline)
describe "100 messages pass through a pipeline when the sink" do
@tag open_socket_from_state: [:dst_socket]
test "opens its own socket" do
data = @values |> Enum.map(&to_string(&1))

assert pipeline =
Pipeline.start_link_supervised!(
spec: [
child(:test_source, %Source{output: data})
|> child(:udp_sink, %Sink{
destination_address: @local_address,
destination_port_no: @destination_port_no,
local_address: @local_address,
local_port_no: @local_port_no
})
]
)

Enum.each(@values, fn elem ->
expected_value = to_string(elem)
assert_receive {:udp, _, @local_address, @local_port_no, ^expected_value}, 1000
end)

Pipeline.terminate(pipeline)
end

@tag open_socket_from_state: [:local_socket, :dst_socket]
test "gets an open socket via options", %{state: %{local_socket: local_socket}} do
data = @values |> Enum.map(&to_string(&1))

assert pipeline =
Pipeline.start_link_supervised!(
spec: [
child(:test_source, %Source{output: data})
|> child(:udp_sink, %Sink{
destination_address: @local_address,
destination_port_no: @destination_port_no,
local_address: @local_address,
local_port_no: @local_port_no,
local_socket: local_socket.socket_handle
})
]
)

Enum.each(@values, fn elem ->
expected_value = to_string(elem)
assert_receive {:udp, _, @local_address, @local_port_no, ^expected_value}, 1000
end)

Pipeline.terminate(pipeline)
end
end
end

0 comments on commit 6b72e34

Please sign in to comment.