Skip to content

Commit

Permalink
Fix sink, add sink tests, make TCP.Socket private (#3)
Browse files Browse the repository at this point in the history
* Fix sink, add sink tests

* Fix integration test

* Add new integration test

* Add more tests

* Make TCP.Socket private, local_socket option now takes :gen_tcp.socket()
  • Loading branch information
Noarkhh authored Feb 29, 2024
1 parent 6e72506 commit 474d9fd
Show file tree
Hide file tree
Showing 10 changed files with 397 additions and 149 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ This package provides TCP Source and Sink, that read and write to TCP sockets.
Add the following line to your `deps` in `mix.exs`. Run `mix deps.get`.

```elixir
{:membrane_tcp_plugin, "~> 0.1.0"}
{:membrane_tcp_plugin, "~> 0.2.0"}
```

## Copyright and License
Expand Down
2 changes: 1 addition & 1 deletion lib/membrane_tcp/common_socket_behaviour.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ defmodule Membrane.TCP.CommonSocketBehaviour do
Base.callback_return()
def handle_setup(
ctx,
%{connection_side: :client, local_socket: local_socket, server_socket: server_socket} =
%{connection_side: :client, local_socket: local_socket, remote_socket: server_socket} =
state
) do
local_socket_connection_result =
Expand Down
47 changes: 21 additions & 26 deletions lib/membrane_tcp/sink.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,15 @@ defmodule Membrane.TCP.Sink do
alias Membrane.TCP.{CommonSocketBehaviour, Socket}

def_options connection_side: [
spec: :client | :server,
default: :server,
spec:
:server
| :client
| {:client, server_address :: :inet.ip_address(),
server_port_no :: :inet.port_number()},
description: """
Determines whether this element will behave like a server or a client when
establishing TCP connection.
"""
],
server_address: [
spec: :inet.ip_address() | nil,
default: nil,
description: """
An IP Address of the server the packets will be sent to.
(nil in case of `connection_side: :server`)
"""
],
server_port_no: [
spec: :inet.port_number() | nil,
default: nil,
description: """
A TCP port number of the server the packets will be sent to.
(nil in case of `connection_side: :server`)
Determines whether this element will operate like a server or a client when
establishing TCP connection. In case of client-side connection server address
and port number are required, unless `local_socket` is provided.
"""
],
local_address: [
Expand All @@ -48,25 +36,32 @@ defmodule Membrane.TCP.Sink do
"""
],
local_socket: [
spec: Socket.t(),
spec: :gen_tcp.socket() | nil,
default: nil,
description: """
Already connected TCP socket with connection side mathing the one passed
as an option, has to be connected.
Already connected TCP socket, if provided will be used instead of creating
and connecting a new one.
"""
]

def_input_pad :input, accepted_format: _any

@impl true
def handle_init(_context, opts) do
{local_socket, server_socket} = Socket.create_socket_pair(Map.from_struct(opts))
{local_socket, remote_socket} = Socket.create_socket_pair(Map.from_struct(opts))

connection_side =
case opts.connection_side do
:server -> :server
:client -> :client
{:client, _server_address, _server_port_no} -> :client
end

{[],
%{
connection_side: opts.connection_side,
connection_side: connection_side,
local_socket: local_socket,
server_socket: server_socket
remote_socket: remote_socket
}}
end

Expand Down
113 changes: 66 additions & 47 deletions lib/membrane_tcp/socket.ex
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
defmodule Membrane.TCP.Socket do
@moduledoc """
TCP Socket behavior
"""
@moduledoc false

@enforce_keys [:connection_side, :port_no, :ip_address]
defstruct [:port_no, :ip_address, :socket_handle, :state, :connection_side, sock_opts: []]

@type t :: %__MODULE__{
port_no: :inet.port_number(),
ip_address: :inet.socket_address(),
port_no: :inet.port_number(),
socket_handle: :gen_tcp.socket() | nil,
state: :listening | :connected | nil,
connection_side: :server | :client | nil,
Expand All @@ -19,59 +17,29 @@ defmodule Membrane.TCP.Socket do
connection_side: :server | :client | {:client, :inet.ip_address(), :inet.port_number()},
local_address: :inet.socket_address(),
local_port_no: :inet.port_number(),
local_socket: t() | nil
local_socket: :gen_tcp.socket() | nil
}

@spec create_socket_pair(socket_pair_config(), keyword()) ::
{local_socket :: t(), server_socket :: t() | nil}
def create_socket_pair(
%{connection_side: connection_side, local_socket: local_socket} = sockets_config,
local_socket_options \\ []
) do
local_socket =
case local_socket do
nil ->
%__MODULE__{
ip_address: sockets_config.local_address,
port_no: sockets_config.local_port_no,
sock_opts: local_socket_options,
connection_side: connection_side
}

%__MODULE__{connection_side: ^connection_side, state: :connected} ->
local_socket

_not_matching_connection_side_socket ->
raise "Connection side of provided socket not matching options"
end

server_socket =
case connection_side do
:server ->
nil

:client ->
{:ok, {server_address, server_port}} = :inet.peername(local_socket.socket_handle)

%__MODULE__{ip_address: server_address, port_no: server_port, connection_side: :server}

{:client, address, port_no} ->
%__MODULE__{ip_address: address, port_no: port_no, connection_side: :server}
end

{local_socket, server_socket}
{local_socket :: t(), remote_socket :: t() | nil}
def create_socket_pair(sockets_config, local_socket_options \\ []) do
local_socket = create_local_socket(sockets_config, local_socket_options)

remote_socket = create_remote_socket(sockets_config, local_socket)

{local_socket, remote_socket}
end

@spec listen(socket :: t()) :: {:ok, listen_socket :: t()} | {:error, :inet.posix()}
def listen(%__MODULE__{port_no: port_no, ip_address: ip, sock_opts: sock_opts} = socket) do
def listen(%__MODULE__{port_no: port_no, ip_address: ip, sock_opts: sock_opts} = local_socket) do
listen_result =
:gen_tcp.listen(port_no, [:binary, ip: ip, active: false, reuseaddr: true] ++ sock_opts)

with {:ok, listen_socket_handle} <- listen_result,
# Port may change if 0 is used, ip - when either `:any` or `:loopback` is passed
{:ok, {real_ip_addr, real_port_no}} <- :inet.sockname(listen_socket_handle) do
updated_socket = %__MODULE__{
socket
local_socket
| socket_handle: listen_socket_handle,
port_no: real_port_no,
ip_address: real_ip_addr,
Expand Down Expand Up @@ -140,9 +108,60 @@ defmodule Membrane.TCP.Socket do
:gen_tcp.send(socket_handle, payload)
end

@spec recv(socket :: t()) ::
@spec recv(socket :: t(), timeout :: non_neg_integer()) ::
{:ok, Membrane.Payload.t()} | {:error, :closed | :timeout | :inet.posix()}
def recv(%__MODULE__{socket_handle: socket_handle}) do
:gen_tcp.recv(socket_handle, 0, 0)
def recv(%__MODULE__{socket_handle: socket_handle}, timeout \\ 0) do
:gen_tcp.recv(socket_handle, 0, timeout)
end

defp create_local_socket(%{local_socket: nil} = sockets_config, local_socket_options) do
%__MODULE__{
ip_address: sockets_config.local_address,
port_no: sockets_config.local_port_no,
sock_opts: local_socket_options,
connection_side: sockets_config.connection_side
}
end

defp create_local_socket(%{local_socket: socket_handle} = sockets_config, local_socket_options) do
{:ok, {socket_address, socket_port}} = :inet.sockname(socket_handle)

cond do
sockets_config.local_address not in [socket_address, :any] ->
raise "Local address passed in options not matching the one of the passed socket."

sockets_config.local_port_no not in [socket_port, 0] ->
raise "Local port passed in options not matching the one of the passed socket."

not match?({:ok, _peername}, :inet.peername(socket_handle)) ->
raise "Local socket not connected."

true ->
:ok
end

%__MODULE__{
ip_address: sockets_config.local_address,
port_no: sockets_config.local_port_no,
socket_handle: socket_handle,
state: :connected,
connection_side: sockets_config.connection_side,
sock_opts: local_socket_options
}
end

defp create_remote_socket(sockets_config, local_socket) do
case sockets_config.connection_side do
:server ->
nil

:client ->
{:ok, {server_address, server_port}} = :inet.peername(local_socket.socket_handle)

%__MODULE__{ip_address: server_address, port_no: server_port, connection_side: :server}

{:client, address, port_no} ->
%__MODULE__{ip_address: address, port_no: port_no, connection_side: :server}
end
end
end
10 changes: 5 additions & 5 deletions lib/membrane_tcp/source.ex
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ defmodule Membrane.TCP.Source do
"""
],
local_socket: [
spec: Socket.t(),
spec: :gen_tcp.socket() | nil,
default: nil,
description: """
Already connected TCP socket with connection side mathing the one passed
as an option, has to be connected.
Already connected TCP socket, if provided will be used instead of creating
and connecting a new one.
"""
],
recv_buffer_size: [
Expand All @@ -55,7 +55,7 @@ defmodule Membrane.TCP.Source do

@impl true
def handle_init(_context, opts) do
{local_socket, server_socket} =
{local_socket, remote_socket} =
Socket.create_socket_pair(Map.from_struct(opts), recbuf: opts.recv_buffer_size)

connection_side =
Expand All @@ -69,7 +69,7 @@ defmodule Membrane.TCP.Source do
%{
connection_side: connection_side,
local_socket: local_socket,
server_socket: server_socket
remote_socket: remote_socket
}}
end

Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule Membrane.TCP.MixProject do
use Mix.Project

@version "0.1.0"
@version "0.2.0"
@github_url "https://github.com/membraneframework/membrane_tcp_plugin"

def project do
Expand Down
56 changes: 45 additions & 11 deletions test/membrane_tcp/integration_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,20 @@ defmodule Membrane.TCP.IntegrationTest do
alias Membrane.Testing.Pipeline

@server_port 6789
@local_address {127, 0, 0, 1}
@localhost {127, 0, 0, 1}

@payload_frames 100

test "send and receive using 2 pipelines" do
test "send from server-side pipeline and receive on client-side pipeline" do
data = Enum.map(1..@payload_frames, &"(#{&1})") ++ ["."]

sender =
Pipeline.start_link_supervised!(
spec:
child(:source, %Testing.Source{output: data})
|> child(:sink, %TCP.Sink{
local_address: @local_address,
connection_side: :server,
local_address: @localhost,
local_port_no: @server_port
})
)
Expand All @@ -30,19 +31,52 @@ defmodule Membrane.TCP.IntegrationTest do
Pipeline.start_link_supervised!(
spec:
child(:source, %TCP.Source{
connection_side: {:client, @local_address, @server_port},
local_address: @local_address
connection_side: {:client, @localhost, @server_port},
local_address: @localhost
})
|> child(:sink, %Testing.Sink{})
)

assert_pipeline_notified(sender, :sink, {:connection_info, @local_address, @server_port})
assert_pipeline_notified(sender, :sink, {:connection_info, @localhost, @server_port})

assert_pipeline_notified(
receiver,
:source,
{:connection_info, @local_address, _ephemeral_port}
)
assert_pipeline_notified(receiver, :source, {:connection_info, @localhost, _port})

assert_end_of_stream(sender, :sink)

received_data = TCP.TestingSinkReceiver.receive_data(receiver)

assert received_data == Enum.join(data)
Pipeline.terminate(sender)
Pipeline.terminate(receiver)
end

test "send from client-side pipeline and receive on server-side pipeline" do
data = Enum.map(1..@payload_frames, &"(#{&1})") ++ ["."]

receiver =
Pipeline.start_link_supervised!(
spec:
child(:source, %TCP.Source{
connection_side: :server,
local_address: @localhost,
local_port_no: @server_port
})
|> child(:sink, %Testing.Sink{})
)

sender =
Pipeline.start_link_supervised!(
spec:
child(:source, %Testing.Source{output: data})
|> child(:sink, %TCP.Sink{
connection_side: {:client, @localhost, @server_port},
local_address: @localhost
})
)

assert_pipeline_notified(sender, :sink, {:connection_info, @localhost, _port})

assert_pipeline_notified(receiver, :source, {:connection_info, @localhost, @server_port})

assert_end_of_stream(sender, :sink)

Expand Down
Loading

0 comments on commit 474d9fd

Please sign in to comment.