Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UDP Endpoint #28

Merged
merged 7 commits into from
Dec 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 128 additions & 0 deletions lib/membrane_udp/endpoint.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
defmodule Membrane.UDP.Endpoint do
@moduledoc """
Element that sends buffers received on the input pad over a UDP socket and
reads packets from a UDP socket and sends their payloads through the output pad.
"""
use Membrane.Endpoint

import Mockery.Macro

alias Membrane.{Buffer, RemoteStream}
alias Membrane.UDP.{CommonSocketBehaviour, Socket}

def_options destination_address: [
spec: :inet.ip_address(),
description: "An IP Address that the packets will be sent to."
],
destination_port_no: [
spec: :inet.port_number(),
description: "A UDP port number of a target."
],
local_address: [
spec: :inet.socket_address(),
default: :any,
description: """
This address is used in two cases:
* An IP Address set for a UDP socket used to sent packets.
* An IP Address on which the socket will listen.
In both cases, it allows to choose which network interface to use if there's more than one.
"""
],
local_port_no: [
spec: :inet.port_number(),
default: 0,
description: """
A UDP port number used when opening a receiving socket and for sending packets.
"""
],
recv_buffer_size: [
spec: pos_integer(),
default: 1024 * 1024,
description: """
Size of the receive buffer. Packages of size greater than this buffer will be truncated
"""
]

def_input_pad :input, accepted_format: _any

def_output_pad :output, accepted_format: %RemoteStream{type: :packetized}, flow_control: :push

# Private API

@impl true
def handle_init(_context, %__MODULE__{} = opts) do
%__MODULE__{
destination_address: dst_address,
destination_port_no: dst_port_no,
local_address: local_address,
local_port_no: local_port_no
} = opts

state = %{
dst_socket: %Socket{
ip_address: dst_address,
port_no: dst_port_no
},
local_socket: %Socket{
ip_address: local_address,
port_no: local_port_no,
sock_opts: [recbuf: opts.recv_buffer_size]
}
}

{[], state}
end

@impl true
def handle_playing(_context, state) do
{[stream_format: {:output, %RemoteStream{type: :packetized}}], state}
end

@impl true
def handle_buffer(:input, %Buffer{payload: payload}, _context, state) do
%{dst_socket: dst_socket, local_socket: local_socket} = state

case mockable(Socket).send(dst_socket, local_socket, payload) do
:ok -> {[], state}
{:error, cause} -> raise "Error sending UDP packet, reason: #{inspect(cause)}"
end
end

@impl true
def handle_parent_notification(
{:udp, _socket_handle, _addr, _port_no, _payload} = meta,
ctx,
state
) do
handle_info(meta, ctx, state)
end

@impl true
def handle_info(
{:udp, _socket_handle, address, port_no, payload},
%{playback: :playing},
state
) do
metadata =
Map.new()
|> Map.put(:udp_source_address, address)
|> Map.put(:udp_source_port, port_no)
|> Map.put(:arrival_ts, Membrane.Time.vm_time())

actions = [buffer: {:output, %Buffer{payload: payload, metadata: metadata}}]

{actions, state}
end

@impl true
def handle_info(
{:udp, _socket_handle, _address, _port_no, _payload},
_ctx,
state
) do
{[], state}
end

@impl true
defdelegate handle_setup(context, state), to: CommonSocketBehaviour
end
9 changes: 3 additions & 6 deletions lib/membrane_udp/sink.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,7 @@ defmodule Membrane.UDP.Sink do
"""
]

def_input_pad :input,
accepted_format: _any,
flow_control: :manual,
demand_unit: :buffers
def_input_pad :input, accepted_format: _any

# Private API

Expand Down Expand Up @@ -66,15 +63,15 @@ defmodule Membrane.UDP.Sink do

@impl true
def handle_playing(_context, state) do
{[demand: :input], state}
{[], state}
end

@impl true
def handle_buffer(:input, %Buffer{payload: payload}, _context, state) do
%{dst_socket: dst_socket, local_socket: local_socket} = state

case mockable(Socket).send(dst_socket, local_socket, payload) do
:ok -> {[demand: :input], state}
:ok -> {[], state}
{:error, cause} -> raise "Error sending UDP packet, reason: #{inspect(cause)}"
end
end
Expand Down
10 changes: 5 additions & 5 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
%{
"bunch": {:hex, :bunch, "1.6.0", "4775f8cdf5e801c06beed3913b0bd53fceec9d63380cdcccbda6be125a6cfd54", [:mix], [], "hexpm", "ef4e9abf83f0299d599daed3764d19e8eac5d27a5237e5e4d5e2c129cfeb9a22"},
"bunch": {:hex, :bunch, "1.6.1", "5393d827a64d5f846092703441ea50e65bc09f37fd8e320878f13e63d410aec7", [:mix], [], "hexpm", "286cc3add551628b30605efbe2fca4e38cc1bea89bcd0a1a7226920b3364fe4a"},
"bunt": {:hex, :bunt, "0.2.1", "e2d4792f7bc0ced7583ab54922808919518d0e57ee162901a16a1b6664ef3b14", [:mix], [], "hexpm", "a330bfb4245239787b15005e66ae6845c9cd524a288f0d141c148b02603777a5"},
"coerce": {:hex, :coerce, "1.0.1", "211c27386315dc2894ac11bc1f413a0e38505d808153367bd5c6e75a4003d096", [:mix], [], "hexpm", "b44a691700f7a1a15b4b7e2ff1fa30bebd669929ac8aa43cffe9e2f8bf051cf1"},
"credo": {:hex, :credo, "1.7.1", "6e26bbcc9e22eefbff7e43188e69924e78818e2fe6282487d0703652bc20fd62", [:mix], [{:bunt, "~> 0.2.1", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2.8", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "e9871c6095a4c0381c89b6aa98bc6260a8ba6addccf7f6a53da8849c748a58a2"},
"dialyxir": {:hex, :dialyxir, "1.4.2", "764a6e8e7a354f0ba95d58418178d486065ead1f69ad89782817c296d0d746a5", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "516603d8067b2fd585319e4b13d3674ad4f314a5902ba8130cd97dc902ce6bbd"},
"earmark_parser": {:hex, :earmark_parser, "1.4.37", "2ad73550e27c8946648b06905a57e4d454e4d7229c2dafa72a0348c99d8be5f7", [:mix], [], "hexpm", "6b19783f2802f039806f375610faa22da130b8edc21209d0bff47918bb48360e"},
"earmark_parser": {:hex, :earmark_parser, "1.4.39", "424642f8335b05bb9eb611aa1564c148a8ee35c9c8a8bba6e129d51a3e3c6769", [:mix], [], "hexpm", "06553a88d1f1846da9ef066b87b57c6f605552cfbe40d20bd8d59cc6bde41944"},
"erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"},
"ex_doc": {:hex, :ex_doc, "0.30.9", "d691453495c47434c0f2052b08dd91cc32bc4e1a218f86884563448ee2502dd2", [:mix], [{:earmark_parser, "~> 1.4.31", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "d7aaaf21e95dc5cddabf89063327e96867d00013963eadf2c6ad135506a8bc10"},
"file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"},
"jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"},
"makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"},
"makeup": {:hex, :makeup, "1.1.1", "fa0bc768698053b2b3869fa8a62616501ff9d11a562f3ce39580d60860c3a55e", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "5dc62fbdd0de44de194898b6710692490be74baa02d9d108bc29f007783b0b48"},
"makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"},
"makeup_erlang": {:hex, :makeup_erlang, "0.1.2", "ad87296a092a46e03b7e9b0be7631ddcf64c790fa68a9ef5323b6cbb36affc72", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "f3f5a1ca93ce6e092d92b6d9c049bcda58a3b617a8d888f8e7231c85630e8108"},
"makeup_erlang": {:hex, :makeup_erlang, "0.1.3", "d684f4bac8690e70b06eb52dad65d26de2eefa44cd19d64a8095e1417df7c8fd", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "b78dc853d2e670ff6390b605d807263bf606da3c82be37f9d7f68635bd886fc9"},
"membrane_core": {:hex, :membrane_core, "1.0.0", "1b543aefd952283be1f2a215a1db213aa4d91222722ba03cd35280622f1905ee", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:qex, "~> 0.3", [hex: :qex, repo: "hexpm", optional: false]}, {:ratio, "~> 3.0", [hex: :ratio, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "352c90fd0a29942143c4bf7a727cc05c632e323f50a1a4e99321b1e8982f1533"},
"mockery": {:hex, :mockery, "2.3.1", "a02fd60b10ac9ed37a7a2ecf6786c1f1dd5c75d2b079a60594b089fba32dc087", [:mix], [], "hexpm", "1d0971d88ebf084e962da3f2cfee16f0ea8e04ff73a7710428500d4500b947fa"},
"nimble_parsec": {:hex, :nimble_parsec, "1.3.1", "2c54013ecf170e249e9291ed0a62e5832f70a476c61da16f6aac6dca0189f2af", [:mix], [], "hexpm", "2682e3c0b2eb58d90c6375fc0cc30bc7be06f365bf72608804fb9cffa5e1b167"},
"nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"},
"numbers": {:hex, :numbers, "5.2.4", "f123d5bb7f6acc366f8f445e10a32bd403c8469bdbce8ce049e1f0972b607080", [:mix], [{:coerce, "~> 1.0", [hex: :coerce, repo: "hexpm", optional: false]}, {:decimal, "~> 1.9 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "eeccf5c61d5f4922198395bf87a465b6f980b8b862dd22d28198c5e6fab38582"},
"qex": {:hex, :qex, "0.5.1", "0d82c0f008551d24fffb99d97f8299afcb8ea9cf99582b770bd004ed5af63fd6", [:mix], [], "hexpm", "935a39fdaf2445834b95951456559e9dc2063d0a055742c558a99987b38d6bab"},
"ratio": {:hex, :ratio, "3.0.2", "60a5976872a4dc3d873ecc57eed1738589e99d1094834b9c935b118231297cfb", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}, {:numbers, "~> 5.2.0", [hex: :numbers, repo: "hexpm", optional: false]}], "hexpm", "3a13ed5a30ad0bfd7e4a86bf86d93d2b5a06f5904417d38d3f3ea6406cdfc7bb"},
Expand Down
4 changes: 2 additions & 2 deletions test/membrane_udp/common_behaviour_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ defmodule Membrane.UDP.CommonBehaviourTest do

import Membrane.Testing.Assertions

alias Membrane.UDP.{CommonSocketBehaviour, Socket, SocketFactory}
alias Membrane.UDP.{CommonSocketBehaviour, Socket}

describe "CommonBehaviour" do
test "opens and close socket when transitioning through states" do
# socket up
socket = SocketFactory.local_socket(123)
socket = %Socket{port_no: 123, ip_address: {127, 0, 0, 1}}
guard = Membrane.Testing.MockResourceGuard.start_link_supervised!()

mock(Socket, [open: 1], fn socket ->
Expand Down
31 changes: 31 additions & 0 deletions test/membrane_udp/integration_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,37 @@ defmodule Membrane.UDP.IntegrationTest do
Pipeline.terminate(receiver)
end

test "send and receive using 1 pipeline with endpoint" do
payload = 1..@payload_frames |> Enum.map(&inspect/1)

pipeline =
Pipeline.start_link_supervised!(
spec: [
child(:endpoint, %UDP.Endpoint{
local_port_no: @target_port,
local_address: @localhostv4,
destination_port_no: @target_port,
destination_address: @localhostv4
})
|> child(:sink, %Testing.Sink{}),
child(:source, %Testing.Source{output: payload})
|> get_child(:endpoint)
]
)

assert_pipeline_notified(pipeline, :endpoint, {:connection_info, @localhostv4, @target_port})

assert_end_of_stream(pipeline, :endpoint)

1..@payload_frames
|> Enum.each(fn x ->
payload = inspect(x)
assert_sink_buffer(pipeline, :sink, %Buffer{payload: ^payload})
end)

Pipeline.terminate(pipeline)
end

test "NAT pierce datagram comes through" do
{:ok, server_sock} =
UDP.Socket.open(%UDP.Socket{port_no: @server_port, ip_address: @localhostv4})
Expand Down
20 changes: 11 additions & 9 deletions test/membrane_udp/sink/integration_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,29 @@ defmodule Membrane.UDP.SinkIntegrationTest do
import SocketSetup

alias Membrane.Buffer
alias Membrane.UDP.{Sink, SocketFactory}
alias Membrane.UDP.{Endpoint, Sink, Socket}

@destination_port_no 5001
@local_port_no 5000
@local_address SocketFactory.local_address()
@local_address {127, 0, 0, 1}

defp setup_state(_ctx) do
dst_socket = SocketFactory.local_socket(@destination_port_no)
local_socket = SocketFactory.local_socket(@local_port_no)
dst_socket = %Socket{port_no: @destination_port_no, ip_address: @local_address}
local_socket = %Socket{port_no: @local_port_no, ip_address: @local_address}

%{state: %{dst_socket: dst_socket, local_socket: local_socket}}
end

setup [:setup_state, :setup_socket_from_state]

@tag open_socket_from_state: [:dst_socket, :local_socket]
test "Sends udp packet", %{state: state} do
payload = "A lot of laughs"
for module <- [Endpoint, Sink] do
@tag open_socket_from_state: [:dst_socket, :local_socket]
test "Sends udp packet through #{inspect(module)}", %{state: state} do
payload = "A lot of laughs"

Sink.handle_buffer(:input, %Buffer{payload: payload}, nil, state)
unquote(module).handle_buffer(:input, %Buffer{payload: payload}, nil, state)

assert_receive {:udp, _, @local_address, @local_port_no, ^payload}
assert_receive {:udp, _, @local_address, @local_port_no, ^payload}
end
end
end
10 changes: 5 additions & 5 deletions test/membrane_udp/sink/pipeline_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@ defmodule Membrane.UDP.SinkPipelineTest do
import SocketSetup
import Membrane.ChildrenSpec

alias Membrane.UDP.{Sink, SocketFactory}
alias Membrane.UDP.{Sink, Socket}
alias Membrane.Testing.{Pipeline, Source}

@local_address SocketFactory.local_address()
@local_address {127, 0, 0, 1}
@local_port_no 5051
@destination_port_no 5015
@values 1..100

defp setup_state(_ctx) do
open_local_socket = SocketFactory.local_socket(@destination_port_no)
open_local_socket = %Socket{port_no: @destination_port_no, ip_address: @local_address}

%{state: %{local_socket: open_local_socket}}
end
Expand All @@ -29,9 +29,9 @@ defmodule Membrane.UDP.SinkPipelineTest do
spec: [
child(:test_source, %Source{output: data})
|> child(:udp_sink, %Sink{
destination_address: SocketFactory.local_address(),
destination_address: @local_address,
destination_port_no: @destination_port_no,
local_address: SocketFactory.local_address(),
local_address: @local_address,
local_port_no: @local_port_no
})
]
Expand Down
36 changes: 17 additions & 19 deletions test/membrane_udp/sink/unit_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,28 @@ defmodule Membrane.UDP.SinkUnitTest do
use Mockery

alias Membrane.Buffer
alias Membrane.UDP.{Sink, Socket, SocketFactory}
alias Membrane.UDP.{Endpoint, Sink, Socket}

describe "Sink element" do
test "handle_buffer/4 calls send and demands more data" do
mock(Socket, [send: 3], :ok)
payload_data = "binary data"
local_socket = SocketFactory.local_socket(1234)
dst_socket = SocketFactory.local_socket(4321)
@local_address {127, 0, 0, 1}

state = %{
local_socket: local_socket,
dst_socket: dst_socket
}
for module <- [Endpoint, Sink] do
describe "#{inspect(module)} element" do
test "handle_buffer/4 calls send and demands more data" do
mock(Socket, [send: 3], :ok)
payload_data = "binary data"
local_socket = %Socket{port_no: 1234, ip_address: @local_address}
dst_socket = %Socket{port_no: 4321, ip_address: @local_address}

assert Sink.handle_buffer(:input, %Buffer{payload: payload_data}, nil, state) ==
{[demand: :input], state}
state = %{
local_socket: local_socket,
dst_socket: dst_socket
}

assert_called(Socket, :send, [^dst_socket, ^local_socket, ^payload_data])
end

test "demands data when starting to play" do
assert {commands, nil} = Sink.handle_playing(nil, nil)
assert unquote(module).handle_buffer(:input, %Buffer{payload: payload_data}, nil, state) ==
{[], state}

assert Keyword.fetch(commands, :demand) == {:ok, :input}
assert_called(Socket, :send, [^dst_socket, ^local_socket, ^payload_data])
end
end
end
end
4 changes: 2 additions & 2 deletions test/membrane_udp/source/pipeline_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Membrane.UDP.SourcePipelineTest do
import Membrane.Testing.Assertions
import Membrane.ChildrenSpec

alias Membrane.UDP.{SocketFactory, Source}
alias Membrane.UDP.Source
alias Membrane.Testing.{Pipeline, Sink}

@local_address {127, 0, 0, 1}
Expand All @@ -20,7 +20,7 @@ defmodule Membrane.UDP.SourcePipelineTest do
Pipeline.start_link_supervised!(
spec: [
child(:udp_source, %Source{
local_address: SocketFactory.local_address(),
local_address: @local_address,
local_port_no: @destination_port_no
})
|> child(:sink, %Sink{})
Expand Down
Loading