Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UDP Endpoint #28

Merged
merged 7 commits into from
Dec 12, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 131 additions & 0 deletions lib/membrane_udp/endpoint.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
defmodule Membrane.UDP.Endpoint do
@moduledoc """
Element that sends buffers received on the input pad over a UDP socket and
reads packets from a UDP socket and sends their payloads through the output pad.
"""
use Membrane.Endpoint

import Mockery.Macro

alias Membrane.{Buffer, RemoteStream}
alias Membrane.UDP.{CommonSocketBehaviour, Socket}

def_options destination_address: [
spec: :inet.ip_address(),
description: "An IP Address that the packets will be sent to."
],
destination_port_no: [
spec: :inet.port_number(),
description: "A UDP port number of a target."
],
local_address: [
spec: :inet.socket_address(),
default: :any,
description: """
This address is used in two cases:
* An IP Address set for a UDP socket used to sent packets.
* An IP Address on which the socket will listen.
In both cases, it allows to choose which network interface to use if there's more than one.
"""
],
local_port_no: [
spec: :inet.port_number(),
default: 0,
description: """
A UDP port number used when opening a receiving socket and for sending packets.
"""
],
recv_buffer_size: [
spec: pos_integer(),
default: 1024 * 1024,
description: """
Size of the receive buffer. Packages of size greater than this buffer will be truncated
"""
]

def_input_pad :input,
accepted_format: _any,
flow_control: :manual,
demand_unit: :buffers
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
flow_control: :manual,
demand_unit: :buffers

Let's make it auto


def_output_pad :output, accepted_format: %RemoteStream{type: :packetized}, flow_control: :push

# Private API

@impl true
def handle_init(_context, %__MODULE__{} = opts) do
%__MODULE__{
destination_address: dst_address,
destination_port_no: dst_port_no,
local_address: local_address,
local_port_no: local_port_no
} = opts

state = %{
dst_socket: %Socket{
ip_address: dst_address,
port_no: dst_port_no
},
local_socket: %Socket{
ip_address: local_address,
port_no: local_port_no,
sock_opts: [recbuf: opts.recv_buffer_size]
}
}

{[], state}
end

@impl true
def handle_playing(_context, state) do
{[demand: :input, stream_format: {:output, %RemoteStream{type: :packetized}}], state}
end

@impl true
def handle_buffer(:input, %Buffer{payload: payload}, _context, state) do
%{dst_socket: dst_socket, local_socket: local_socket} = state

case mockable(Socket).send(dst_socket, local_socket, payload) do
:ok -> {[demand: :input], state}
{:error, cause} -> raise "Error sending UDP packet, reason: #{inspect(cause)}"
end
end

@impl true
def handle_parent_notification(
{:udp, _socket_handle, _addr, _port_no, _payload} = meta,
ctx,
state
) do
handle_info(meta, ctx, state)
end

@impl true
def handle_info(
{:udp, _socket_handle, address, port_no, payload},
%{playback: :playing},
state
) do
metadata =
Map.new()
|> Map.put(:udp_source_address, address)
|> Map.put(:udp_source_port, port_no)
|> Map.put(:arrival_ts, Membrane.Time.vm_time())

actions = [buffer: {:output, %Buffer{payload: payload, metadata: metadata}}]

{actions, state}
end

@impl true
def handle_info(
{:udp, _socket_handle, _address, _port_no, _payload},
_ctx,
state
) do
{[], state}
end

@impl true
defdelegate handle_setup(context, state), to: CommonSocketBehaviour
end
10 changes: 5 additions & 5 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
%{
"bunch": {:hex, :bunch, "1.6.0", "4775f8cdf5e801c06beed3913b0bd53fceec9d63380cdcccbda6be125a6cfd54", [:mix], [], "hexpm", "ef4e9abf83f0299d599daed3764d19e8eac5d27a5237e5e4d5e2c129cfeb9a22"},
"bunch": {:hex, :bunch, "1.6.1", "5393d827a64d5f846092703441ea50e65bc09f37fd8e320878f13e63d410aec7", [:mix], [], "hexpm", "286cc3add551628b30605efbe2fca4e38cc1bea89bcd0a1a7226920b3364fe4a"},
"bunt": {:hex, :bunt, "0.2.1", "e2d4792f7bc0ced7583ab54922808919518d0e57ee162901a16a1b6664ef3b14", [:mix], [], "hexpm", "a330bfb4245239787b15005e66ae6845c9cd524a288f0d141c148b02603777a5"},
"coerce": {:hex, :coerce, "1.0.1", "211c27386315dc2894ac11bc1f413a0e38505d808153367bd5c6e75a4003d096", [:mix], [], "hexpm", "b44a691700f7a1a15b4b7e2ff1fa30bebd669929ac8aa43cffe9e2f8bf051cf1"},
"credo": {:hex, :credo, "1.7.1", "6e26bbcc9e22eefbff7e43188e69924e78818e2fe6282487d0703652bc20fd62", [:mix], [{:bunt, "~> 0.2.1", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2.8", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "e9871c6095a4c0381c89b6aa98bc6260a8ba6addccf7f6a53da8849c748a58a2"},
"dialyxir": {:hex, :dialyxir, "1.4.2", "764a6e8e7a354f0ba95d58418178d486065ead1f69ad89782817c296d0d746a5", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "516603d8067b2fd585319e4b13d3674ad4f314a5902ba8130cd97dc902ce6bbd"},
"earmark_parser": {:hex, :earmark_parser, "1.4.37", "2ad73550e27c8946648b06905a57e4d454e4d7229c2dafa72a0348c99d8be5f7", [:mix], [], "hexpm", "6b19783f2802f039806f375610faa22da130b8edc21209d0bff47918bb48360e"},
"earmark_parser": {:hex, :earmark_parser, "1.4.39", "424642f8335b05bb9eb611aa1564c148a8ee35c9c8a8bba6e129d51a3e3c6769", [:mix], [], "hexpm", "06553a88d1f1846da9ef066b87b57c6f605552cfbe40d20bd8d59cc6bde41944"},
"erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"},
"ex_doc": {:hex, :ex_doc, "0.30.9", "d691453495c47434c0f2052b08dd91cc32bc4e1a218f86884563448ee2502dd2", [:mix], [{:earmark_parser, "~> 1.4.31", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "d7aaaf21e95dc5cddabf89063327e96867d00013963eadf2c6ad135506a8bc10"},
"file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"},
"jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"},
"makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"},
"makeup": {:hex, :makeup, "1.1.1", "fa0bc768698053b2b3869fa8a62616501ff9d11a562f3ce39580d60860c3a55e", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "5dc62fbdd0de44de194898b6710692490be74baa02d9d108bc29f007783b0b48"},
"makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"},
"makeup_erlang": {:hex, :makeup_erlang, "0.1.2", "ad87296a092a46e03b7e9b0be7631ddcf64c790fa68a9ef5323b6cbb36affc72", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "f3f5a1ca93ce6e092d92b6d9c049bcda58a3b617a8d888f8e7231c85630e8108"},
"makeup_erlang": {:hex, :makeup_erlang, "0.1.3", "d684f4bac8690e70b06eb52dad65d26de2eefa44cd19d64a8095e1417df7c8fd", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "b78dc853d2e670ff6390b605d807263bf606da3c82be37f9d7f68635bd886fc9"},
"membrane_core": {:hex, :membrane_core, "1.0.0", "1b543aefd952283be1f2a215a1db213aa4d91222722ba03cd35280622f1905ee", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:qex, "~> 0.3", [hex: :qex, repo: "hexpm", optional: false]}, {:ratio, "~> 3.0", [hex: :ratio, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "352c90fd0a29942143c4bf7a727cc05c632e323f50a1a4e99321b1e8982f1533"},
"mockery": {:hex, :mockery, "2.3.1", "a02fd60b10ac9ed37a7a2ecf6786c1f1dd5c75d2b079a60594b089fba32dc087", [:mix], [], "hexpm", "1d0971d88ebf084e962da3f2cfee16f0ea8e04ff73a7710428500d4500b947fa"},
"nimble_parsec": {:hex, :nimble_parsec, "1.3.1", "2c54013ecf170e249e9291ed0a62e5832f70a476c61da16f6aac6dca0189f2af", [:mix], [], "hexpm", "2682e3c0b2eb58d90c6375fc0cc30bc7be06f365bf72608804fb9cffa5e1b167"},
"nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"},
"numbers": {:hex, :numbers, "5.2.4", "f123d5bb7f6acc366f8f445e10a32bd403c8469bdbce8ce049e1f0972b607080", [:mix], [{:coerce, "~> 1.0", [hex: :coerce, repo: "hexpm", optional: false]}, {:decimal, "~> 1.9 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "eeccf5c61d5f4922198395bf87a465b6f980b8b862dd22d28198c5e6fab38582"},
"qex": {:hex, :qex, "0.5.1", "0d82c0f008551d24fffb99d97f8299afcb8ea9cf99582b770bd004ed5af63fd6", [:mix], [], "hexpm", "935a39fdaf2445834b95951456559e9dc2063d0a055742c558a99987b38d6bab"},
"ratio": {:hex, :ratio, "3.0.2", "60a5976872a4dc3d873ecc57eed1738589e99d1094834b9c935b118231297cfb", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}, {:numbers, "~> 5.2.0", [hex: :numbers, repo: "hexpm", optional: false]}], "hexpm", "3a13ed5a30ad0bfd7e4a86bf86d93d2b5a06f5904417d38d3f3ea6406cdfc7bb"},
Expand Down
31 changes: 31 additions & 0 deletions test/membrane_udp/integration_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,37 @@ defmodule Membrane.UDP.IntegrationTest do
Pipeline.terminate(receiver)
end

test "send and receive using 1 pipeline with endpoint" do
payload = 1..@payload_frames |> Enum.map(&inspect/1)

pipeline =
Pipeline.start_link_supervised!(
spec: [
child(:endpoint, %UDP.Endpoint{
local_port_no: @target_port,
local_address: @localhostv4,
destination_port_no: @target_port,
destination_address: @localhostv4
})
|> child(:sink, %Testing.Sink{}),
child(:source, %Testing.Source{output: payload})
|> get_child(:endpoint)
]
)

assert_pipeline_notified(pipeline, :endpoint, {:connection_info, @localhostv4, @target_port})

assert_end_of_stream(pipeline, :endpoint)

1..@payload_frames
|> Enum.each(fn x ->
payload = inspect(x)
assert_sink_buffer(pipeline, :sink, %Buffer{payload: ^payload})
end)

Pipeline.terminate(pipeline)
end

test "NAT pierce datagram comes through" do
{:ok, server_sock} =
UDP.Socket.open(%UDP.Socket{port_no: @server_port, ip_address: @localhostv4})
Expand Down
14 changes: 8 additions & 6 deletions test/membrane_udp/sink/integration_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Membrane.UDP.SinkIntegrationTest do
import SocketSetup

alias Membrane.Buffer
alias Membrane.UDP.{Sink, SocketFactory}
alias Membrane.UDP.{Endpoint, Sink, SocketFactory}

@destination_port_no 5001
@local_port_no 5000
Expand All @@ -19,12 +19,14 @@ defmodule Membrane.UDP.SinkIntegrationTest do

setup [:setup_state, :setup_socket_from_state]

@tag open_socket_from_state: [:dst_socket, :local_socket]
test "Sends udp packet", %{state: state} do
payload = "A lot of laughs"
for module <- [Endpoint, Sink] do
@tag open_socket_from_state: [:dst_socket, :local_socket]
test "Sends udp packet through #{inspect(module)}", %{state: state} do
payload = "A lot of laughs"

Sink.handle_buffer(:input, %Buffer{payload: payload}, nil, state)
unquote(module).handle_buffer(:input, %Buffer{payload: payload}, nil, state)

assert_receive {:udp, _, @local_address, @local_port_no, ^payload}
assert_receive {:udp, _, @local_address, @local_port_no, ^payload}
end
end
end
38 changes: 20 additions & 18 deletions test/membrane_udp/sink/unit_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,32 @@ defmodule Membrane.UDP.SinkUnitTest do
use Mockery

alias Membrane.Buffer
alias Membrane.UDP.{Sink, Socket, SocketFactory}
alias Membrane.UDP.{Endpoint, Sink, Socket, SocketFactory}

describe "Sink element" do
test "handle_buffer/4 calls send and demands more data" do
mock(Socket, [send: 3], :ok)
payload_data = "binary data"
local_socket = SocketFactory.local_socket(1234)
dst_socket = SocketFactory.local_socket(4321)
for module <- [Endpoint, Sink] do
describe "#{inspect(module)} element" do
test "handle_buffer/4 calls send and demands more data" do
mock(Socket, [send: 3], :ok)
payload_data = "binary data"
local_socket = SocketFactory.local_socket(1234)
dst_socket = SocketFactory.local_socket(4321)

state = %{
local_socket: local_socket,
dst_socket: dst_socket
}
state = %{
local_socket: local_socket,
dst_socket: dst_socket
}

assert Sink.handle_buffer(:input, %Buffer{payload: payload_data}, nil, state) ==
{[demand: :input], state}
assert unquote(module).handle_buffer(:input, %Buffer{payload: payload_data}, nil, state) ==
{[demand: :input], state}

assert_called(Socket, :send, [^dst_socket, ^local_socket, ^payload_data])
end
assert_called(Socket, :send, [^dst_socket, ^local_socket, ^payload_data])
end

test "demands data when starting to play" do
assert {commands, nil} = Sink.handle_playing(nil, nil)
test "demands data when starting to play" do
assert {commands, nil} = unquote(module).handle_playing(nil, nil)

assert Keyword.fetch(commands, :demand) == {:ok, :input}
assert Keyword.fetch(commands, :demand) == {:ok, :input}
end
end
end
end
40 changes: 21 additions & 19 deletions test/membrane_udp/source/unit_test.exs
Original file line number Diff line number Diff line change
@@ -1,29 +1,31 @@
defmodule Membrane.UDP.SourceTest do
use ExUnit.Case

alias Membrane.UDP.Source
alias Membrane.UDP.{Endpoint, Source}

test "parses udp message" do
example_binary_payload = "Hi there, I am binary"
sender_port = 6666
sender_address = {192, 168, 0, 1}
state = :unchanged
message = {:udp, 5000, sender_address, sender_port, example_binary_payload}
for module <- [Endpoint, Source] do
test "parses udp message #{inspect(module)} element" do
example_binary_payload = "Hi there, I am binary"
sender_port = 6666
sender_address = {192, 168, 0, 1}
state = :unchanged
message = {:udp, 5000, sender_address, sender_port, example_binary_payload}

assert {actions, ^state} =
Source.handle_parent_notification(message, %{playback: :playing}, state)
assert {actions, ^state} =
unquote(module).handle_parent_notification(message, %{playback: :playing}, state)

assert {:output, buffer} = Keyword.get(actions, :buffer)
assert {:output, buffer} = Keyword.get(actions, :buffer)

assert %Membrane.Buffer{
payload: ^example_binary_payload,
metadata: %{
udp_source_address: ^sender_address,
udp_source_port: ^sender_port,
arrival_ts: arrival_ts
}
} = buffer
assert %Membrane.Buffer{
payload: ^example_binary_payload,
metadata: %{
udp_source_address: ^sender_address,
udp_source_port: ^sender_port,
arrival_ts: arrival_ts
}
} = buffer

assert_in_delta(arrival_ts, Membrane.Time.vm_time(), 2 |> Membrane.Time.milliseconds())
assert_in_delta(arrival_ts, Membrane.Time.vm_time(), 2 |> Membrane.Time.milliseconds())
end
end
end
3 changes: 2 additions & 1 deletion test/support/factory/socket_factory.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ defmodule Membrane.UDP.SocketFactory do
def local_socket(port),
do: %Socket{
port_no: port,
ip_address: @local
ip_address: @local,
sock_opts: [recbuf: 1024 * 1024]
}

@spec local_address() :: :inet.socket_address()
Expand Down