diff --git a/lib/membrane_udp/common_socket_behaviour.ex b/lib/membrane_udp/common_socket_behaviour.ex index 33ac1fd..3ec6e2c 100644 --- a/lib/membrane_udp/common_socket_behaviour.ex +++ b/lib/membrane_udp/common_socket_behaviour.ex @@ -13,8 +13,8 @@ defmodule Membrane.UDP.CommonSocketBehaviour do ) :: {[Membrane.Element.Action.common_actions() | Membrane.Element.Action.setup()], Membrane.Element.state()} - def handle_setup(ctx, %{local_socket: %Socket{} = local_socket} = state) do - case mockable(Socket).open(local_socket) do + def handle_setup(ctx, %{local_socket: %Socket{socket_handle: nil}} = state) do + case mockable(Socket).open(state.local_socket) do {:ok, socket} -> notification = {:connection_info, socket.ip_address, socket.port_no} @@ -31,6 +31,23 @@ defmodule Membrane.UDP.CommonSocketBehaviour do end end + def handle_setup(_ctx, state) do + {:ok, {socket_address, socket_port}} = :inet.sockname(state.local_socket.socket_handle) + + cond do + state.local_socket.ip_address not in [socket_address, :any] -> + raise "Local address passed in options not matching the one of the passed socket." + + state.local_socket.port_no not in [socket_port, 0] -> + raise "Local port passed in options not matching the one of the passed socket." + + true -> + :ok + end + + {[], state} + end + defp close_socket(%Socket{} = local_socket) do mockable(Socket).close(local_socket) end diff --git a/lib/membrane_udp/sink.ex b/lib/membrane_udp/sink.ex index 452f26f..db1cea8 100644 --- a/lib/membrane_udp/sink.ex +++ b/lib/membrane_udp/sink.ex @@ -32,6 +32,14 @@ defmodule Membrane.UDP.Sink do A UDP port number for the socket used to sent packets. If set to `0` (default) the underlying OS will assign a free UDP port. """ + ], + local_socket: [ + spec: :gen_udp.socket() | nil, + default: nil, + description: """ + Already opened UDP socket, if provided it will be used instead of creating + and opening a new one. + """ ] def_input_pad :input, accepted_format: _any @@ -44,7 +52,8 @@ defmodule Membrane.UDP.Sink do destination_address: dst_address, destination_port_no: dst_port_no, local_address: local_address, - local_port_no: local_port_no + local_port_no: local_port_no, + local_socket: local_socket } = options state = %{ @@ -54,7 +63,8 @@ defmodule Membrane.UDP.Sink do }, local_socket: %Socket{ ip_address: local_address, - port_no: local_port_no + port_no: local_port_no, + socket_handle: local_socket } } diff --git a/test/membrane_udp/sink/pipeline_test.exs b/test/membrane_udp/sink/pipeline_test.exs index b5b5cf2..835129d 100644 --- a/test/membrane_udp/sink/pipeline_test.exs +++ b/test/membrane_udp/sink/pipeline_test.exs @@ -13,39 +13,73 @@ defmodule Membrane.UDP.SinkPipelineTest do @values 1..100 defp setup_state(_ctx) do - open_local_socket = %Socket{ + dst_socket = %Socket{ port_no: @destination_port_no, ip_address: @local_address, sock_opts: [recbuf: 1024 * 1024] } - %{state: %{local_socket: open_local_socket}} + local_socket = %Socket{ + port_no: @local_port_no, + ip_address: @local_address, + sock_opts: [recbuf: 1024 * 1024] + } + + %{state: %{dst_socket: dst_socket, local_socket: local_socket}} end setup [:setup_state, :setup_socket_from_state] - @tag open_socket_from_state: [:local_socket] - test "100 messages passes through pipeline" do - data = @values |> Enum.map(&to_string(&1)) - - assert pipeline = - Pipeline.start_link_supervised!( - spec: [ - child(:test_source, %Source{output: data}) - |> child(:udp_sink, %Sink{ - destination_address: @local_address, - destination_port_no: @destination_port_no, - local_address: @local_address, - local_port_no: @local_port_no - }) - ] - ) - - Enum.each(@values, fn elem -> - expected_value = to_string(elem) - assert_receive {:udp, _, @local_address, @local_port_no, ^expected_value}, 1000 - end) - - Pipeline.terminate(pipeline) + describe "100 messages pass through a pipeline when the sink" do + @tag open_socket_from_state: [:dst_socket] + test "opens its own socket" do + data = @values |> Enum.map(&to_string(&1)) + + assert pipeline = + Pipeline.start_link_supervised!( + spec: [ + child(:test_source, %Source{output: data}) + |> child(:udp_sink, %Sink{ + destination_address: @local_address, + destination_port_no: @destination_port_no, + local_address: @local_address, + local_port_no: @local_port_no + }) + ] + ) + + Enum.each(@values, fn elem -> + expected_value = to_string(elem) + assert_receive {:udp, _, @local_address, @local_port_no, ^expected_value}, 1000 + end) + + Pipeline.terminate(pipeline) + end + + @tag open_socket_from_state: [:local_socket, :dst_socket] + test "gets an open socket via options", %{state: %{local_socket: local_socket}} do + data = @values |> Enum.map(&to_string(&1)) + + assert pipeline = + Pipeline.start_link_supervised!( + spec: [ + child(:test_source, %Source{output: data}) + |> child(:udp_sink, %Sink{ + destination_address: @local_address, + destination_port_no: @destination_port_no, + local_address: @local_address, + local_port_no: @local_port_no, + local_socket: local_socket.socket_handle + }) + ] + ) + + Enum.each(@values, fn elem -> + expected_value = to_string(elem) + assert_receive {:udp, _, @local_address, @local_port_no, ^expected_value}, 1000 + end) + + Pipeline.terminate(pipeline) + end end end