Skip to content

Commit

Permalink
Adapt UDP plugin to TCP (#1)
Browse files Browse the repository at this point in the history
* Copy funtionality of UDP plugin (WIP)

* Adapt source and common behaviour to TCP

* Start working on tests

* Add tests

* make tests pass

* Change stream format, adjust tests

* Adjust integration test

* Make readme TODO

* Update tests

* Refactor connection_side

* Refactor socket test
  • Loading branch information
Noarkhh authored Jan 16, 2024
1 parent 8099538 commit 87966aa
Show file tree
Hide file tree
Showing 12 changed files with 670 additions and 61 deletions.
30 changes: 9 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,35 +1,23 @@
# Membrane Template Plugin
# Membrane TCP plugin

[![Hex.pm](https://img.shields.io/hexpm/v/membrane_template_plugin.svg)](https://hex.pm/packages/membrane_template_plugin)
[![API Docs](https://img.shields.io/badge/api-docs-yellow.svg?style=flat)](https://hexdocs.pm/membrane_template_plugin)
[![CircleCI](https://circleci.com/gh/membraneframework/membrane_template_plugin.svg?style=svg)](https://circleci.com/gh/membraneframework/membrane_template_plugin)
[![Hex.pm](https://img.shields.io/hexpm/v/membrane_tcp_plugin.svg)](https://hex.pm/packages/membrane_tcp_plugin)
[![API Docs](https://img.shields.io/badge/api-docs-yellow.svg?style=flat)](https://hexdocs.pm/membrane_tcp_plugin/)
[![CircleCI](https://circleci.com/gh/membraneframework/membrane_tcp_plugin.svg?style=svg)](https://circleci.com/gh/membraneframework/membrane_tcp_plugin)

This repository contains a template for new plugins.

Check out different branches for other flavors of this template.

It's a part of the [Membrane Framework](https://membrane.stream).
This package provides TCP Source and Sink, that read and write to TCP sockets.

## Installation

The package can be installed by adding `membrane_template_plugin` to your list of dependencies in `mix.exs`:
Add the following line to your `deps` in `mix.exs`. Run `mix deps.get`.

```elixir
def deps do
[
{:membrane_template_plugin, "~> 0.1.0"}
]
end
{:membrane_tcp_plugin, "~> 0.1.0"}
```

## Usage

TODO

## Copyright and License

Copyright 2020, [Software Mansion](https://swmansion.com/?utm_source=git&utm_medium=readme&utm_campaign=membrane_template_plugin)
Copyright 2023, [Software Mansion](https://swmansion.com/?utm_source=git&utm_medium=readme&utm_campaign=membrane)

[![Software Mansion](https://logo.swmansion.com/logo?color=white&variant=desktop&width=200&tag=membrane-github)](https://swmansion.com/?utm_source=git&utm_medium=readme&utm_campaign=membrane_template_plugin)
[![Software Mansion](https://logo.swmansion.com/logo?color=white&variant=desktop&width=200&tag=membrane-github)](https://swmansion.com/?utm_source=git&utm_medium=readme&utm_campaign=membrane)

Licensed under the [Apache License, Version 2.0](LICENSE)
62 changes: 62 additions & 0 deletions lib/membrane_tcp/common_socket_behaviour.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
defmodule Membrane.TCP.CommonSocketBehaviour do
@moduledoc false

alias Membrane.Element
alias Membrane.Element.Base
alias Membrane.Element.CallbackContext
alias Membrane.TCP.Socket

@spec handle_setup(context :: CallbackContext.t(), state :: Element.state()) ::
Base.callback_return()
def handle_setup(
ctx,
%{connection_side: :client, local_socket: local_socket, server_socket: server_socket} =
state
) do
local_socket_connection_result =
if local_socket.state == :connected do
{:ok, local_socket}
else
Socket.connect(local_socket, server_socket)
end

handle_local_socket_connection_result(local_socket_connection_result, ctx, state)
end

def handle_setup(ctx, %{connection_side: :server, local_socket: local_socket} = state) do
local_socket_connection_result =
if local_socket.state == :connected do
{:ok, local_socket}
else
with {:ok, listening_socket} <- Socket.listen(local_socket),
do: Socket.accept(listening_socket)
end

handle_local_socket_connection_result(local_socket_connection_result, ctx, state)
end

@spec handle_local_socket_connection_result(
{:ok, Socket.t()} | {:error, term()},
Membrane.Element.CallbackContext.t(),
Membrane.Element.state()
) :: Membrane.Element.Base.callback_return() | no_return()
defp handle_local_socket_connection_result({:ok, connected_socket}, ctx, state) do
notification = {:connection_info, connected_socket.ip_address, connected_socket.port_no}

Membrane.ResourceGuard.register(
ctx.resource_guard,
fn -> close_socket(connected_socket) end,
tag: :tcp_guard
)

{[notify_parent: notification], %{state | local_socket: connected_socket}}
end

defp handle_local_socket_connection_result({:error, reason}, _ctx, _state) do
raise "Error connecting TCP socket, reason: #{inspect(reason)}"
end

defp close_socket(%Socket{} = local_socket) do
Socket.close(local_socket)
end
end
90 changes: 90 additions & 0 deletions lib/membrane_tcp/sink.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
defmodule Membrane.TCP.Sink do
@moduledoc """
Element that sends buffers received on the input pad over a TCP socket.
"""
use Membrane.Sink

alias Membrane.Buffer
alias Membrane.TCP.{CommonSocketBehaviour, Socket}

def_options connection_side: [
spec: :client | :server,
default: :server,
description: """
Determines whether this element will behave like a server or a client when
establishing TCP connection.
"""
],
server_address: [
spec: :inet.ip_address() | nil,
default: nil,
description: """
An IP Address of the server the packets will be sent to.
(nil in case of `connection_side: :server`)
"""
],
server_port_no: [
spec: :inet.port_number() | nil,
default: nil,
description: """
A TCP port number of the server the packets will be sent to.
(nil in case of `connection_side: :server`)
"""
],
local_address: [
spec: :inet.socket_address(),
default: :any,
description: """
An IP Address from which the socket will connect or will listen on.
It allows to choose which network interface to use if there's more than one.
"""
],
local_port_no: [
spec: :inet.port_number(),
default: 0,
description: """
A TCP port number used when connecting to a listening socket or
starting a listening socket. If not specified any free port is chosen.
"""
],
local_socket: [
spec: Socket.t(),
default: nil,
description: """
Already connected TCP socket with connection side mathing the one passed
as an option, has to be connected.
"""
]

def_input_pad :input, accepted_format: _any

@impl true
def handle_init(_context, opts) do
{local_socket, server_socket} = Socket.create_socket_pair(Map.from_struct(opts))

{[],
%{
connection_side: opts.connection_side,
local_socket: local_socket,
server_socket: server_socket
}}
end

@impl true
def handle_playing(_context, state) do
{[], state}
end

@impl true
def handle_buffer(:input, %Buffer{payload: payload}, _context, state) do
%{local_socket: local_socket} = state

case Socket.send(local_socket, payload) do
:ok -> {[], state}
{:error, cause} -> raise "Error sending TCP packet, reason: #{inspect(cause)}"
end
end

@impl true
defdelegate handle_setup(context, state), to: CommonSocketBehaviour
end
146 changes: 146 additions & 0 deletions lib/membrane_tcp/socket.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
defmodule Membrane.TCP.Socket do
@moduledoc false

@enforce_keys [:port_no, :ip_address]
defstruct [:port_no, :ip_address, :socket_handle, :state, :connection_side, sock_opts: []]

@type t :: %__MODULE__{
port_no: :inet.port_number(),
ip_address: :inet.socket_address(),
socket_handle: :gen_tcp.socket() | nil,
state: :listening | :connected | nil,
connection_side: :server | :client | nil,
sock_opts: [:gen_tcp.option()]
}

@type socket_pair_config :: %{
connection_side: :server | :client | {:client, :inet.address(), :inet.port_number()},
local_address: :inet.socket_address(),
local_port_no: :inet.port_number(),
local_socket: t() | nil
}

@spec create_socket_pair(socket_pair_config(), keyword()) ::
{local_socket :: t(), server_socket :: t() | nil}
def create_socket_pair(
%{connection_side: connection_side, local_socket: local_socket} = sockets_config,
local_socket_options \\ []
) do
local_socket =
case local_socket do
nil ->
%__MODULE__{
ip_address: sockets_config.local_address,
port_no: sockets_config.local_port_no,
sock_opts: local_socket_options,
connection_side: connection_side
}

%__MODULE__{connection_side: ^connection_side, state: :connected} ->
local_socket

_not_matching_connection_side_socket ->
raise "Connection side of provided socket not matching options"
end

server_socket =
case connection_side do
:server ->
nil

:client ->
{:ok, {server_address, server_port}} = :inet.peername(local_socket.socket_handle)

%__MODULE__{ip_address: server_address, port_no: server_port, connection_side: :server}

{:client, address, port_no} ->
%__MODULE__{ip_address: address, port_no: port_no, connection_side: :server}
end

{local_socket, server_socket}
end

@spec listen(socket :: t()) :: {:ok, listen_socket :: t()} | {:error, :inet.posix()}
def listen(%__MODULE__{port_no: port_no, ip_address: ip, sock_opts: sock_opts} = socket) do
listen_result =
:gen_tcp.listen(port_no, [:binary, ip: ip, active: false, reuseaddr: true] ++ sock_opts)

with {:ok, listen_socket_handle} <- listen_result,
# Port may change if 0 is used, ip - when either `:any` or `:loopback` is passed
{:ok, {real_ip_addr, real_port_no}} <- :inet.sockname(listen_socket_handle) do
updated_socket = %__MODULE__{
socket
| socket_handle: listen_socket_handle,
port_no: real_port_no,
ip_address: real_ip_addr,
state: :listening
}

{:ok, updated_socket}
end
end

@spec accept(listening_socket :: t()) ::
{:ok, connected_socket :: t()} | {:error, :inet.posix()}
def accept(%__MODULE__{socket_handle: socket_handle, state: :listening} = socket) do
accept_result = :gen_tcp.accept(socket_handle)

with {:ok, connected_socket_handle} <- accept_result do
:gen_tcp.close(socket_handle)

updated_socket = %__MODULE__{
socket
| socket_handle: connected_socket_handle,
state: :connected
}

{:ok, updated_socket}
end
end

@spec connect(local :: t(), target :: t()) :: {:ok, t()} | {:error, :inet.posix()}
def connect(
%__MODULE__{port_no: local_port_no, ip_address: local_ip, sock_opts: sock_opts} =
local_socket,
%__MODULE__{port_no: target_port_no, ip_address: target_ip}
) do
connect_result =
:gen_tcp.connect(
target_ip,
target_port_no,
[:binary, ip: local_ip, port: local_port_no, active: false, reuseaddr: true] ++ sock_opts
)

with {:ok, socket_handle} <- connect_result,
# Port may change if 0 is used, ip - when either `:any` or `:loopback` is passed
{:ok, {real_ip_addr, real_port_no}} <- :inet.sockname(socket_handle) do
updated_socket = %__MODULE__{
local_socket
| socket_handle: socket_handle,
port_no: real_port_no,
ip_address: real_ip_addr,
state: :connected
}

{:ok, updated_socket}
end
end

@spec close(socket :: t()) :: t()
def close(%__MODULE__{socket_handle: handle} = socket) when is_port(handle) do
:ok = :gen_tcp.close(handle)
%__MODULE__{socket | socket_handle: nil, state: nil}
end

@spec send(local_socket :: t(), payload :: Membrane.Payload.t()) ::
:ok | {:error, :closed | :inet.posix()}
def send(%__MODULE__{socket_handle: socket_handle}, payload) when is_port(socket_handle) do
:gen_tcp.send(socket_handle, payload)
end

@spec recv(socket :: t()) ::
{:ok, Membrane.Payload.t()} | {:error, :closed | :timeout | :inet.posix()}
def recv(%__MODULE__{socket_handle: socket_handle}) do
:gen_tcp.recv(socket_handle, 0, 0)
end
end
Loading

0 comments on commit 87966aa

Please sign in to comment.