Skip to content

Commit

Permalink
Use active socket (#6)
Browse files Browse the repository at this point in the history
* Adapt source to use active mode

* Improve demands

* Add handling :tcp_passive message

* Add type for request_socket_control
  • Loading branch information
Noarkhh authored May 14, 2024
1 parent 5d7d0f1 commit f8fc1df
Show file tree
Hide file tree
Showing 8 changed files with 185 additions and 118 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ This package provides TCP Source and Sink, that read and write to TCP sockets.
Add the following line to your `deps` in `mix.exs`. Run `mix deps.get`.

```elixir
{:membrane_tcp_plugin, "~> 0.3.0"}
{:membrane_tcp_plugin, "~> 0.4.0"}
```

## Copyright and License
Expand Down
95 changes: 94 additions & 1 deletion lib/membrane_tcp/common_socket_behaviour.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,23 @@ defmodule Membrane.TCP.CommonSocketBehaviour do
alias Membrane.Element.CallbackContext
alias Membrane.TCP.Socket

@type socket_pair_config :: %{
connection_side: :server | :client | {:client, :inet.ip_address(), :inet.port_number()},
local_address: :inet.socket_address(),
local_port_no: :inet.port_number(),
local_socket: :gen_tcp.socket() | nil
}

@spec create_socket_pair(socket_pair_config(), keyword()) ::
{local_socket :: Socket.t(), remote_socket :: Socket.t() | nil}
def create_socket_pair(sockets_config, local_socket_options \\ []) do
local_socket = create_local_socket(sockets_config, local_socket_options)

remote_socket = create_remote_socket(sockets_config, local_socket)

{local_socket, remote_socket}
end

@spec handle_setup(context :: CallbackContext.t(), state :: Element.state()) ::
Base.callback_return()
def handle_setup(
Expand Down Expand Up @@ -35,6 +52,61 @@ defmodule Membrane.TCP.CommonSocketBehaviour do
handle_local_socket_connection_result(local_socket_connection_result, ctx, state)
end

@spec create_local_socket(socket_pair_config(), [:gen_tcp.option()]) :: Socket.t()
defp create_local_socket(%{local_socket: nil} = sockets_config, local_socket_options) do
%Socket{
ip_address: sockets_config.local_address,
port_no: sockets_config.local_port_no,
sock_opts: local_socket_options,
connection_side: sockets_config.connection_side
}
end

defp create_local_socket(%{local_socket: socket_handle} = sockets_config, local_socket_options) do
{:ok, {socket_address, socket_port}} = :inet.sockname(socket_handle)

cond do
sockets_config.local_address not in [socket_address, :any] ->
raise "Local address passed in options not matching the one of the passed socket."

sockets_config.local_port_no not in [socket_port, 0] ->
raise "Local port passed in options not matching the one of the passed socket."

not match?({:ok, _peername}, :inet.peername(socket_handle)) ->
raise "Local socket not connected."

true ->
:ok
end

:inet.setopts(socket_handle, active: false)

%Socket{
ip_address: sockets_config.local_address,
port_no: sockets_config.local_port_no,
socket_handle: socket_handle,
state: :connected,
connection_side: sockets_config.connection_side,
sock_opts: local_socket_options
}
end

@spec create_remote_socket(socket_pair_config(), Socket.t()) :: Socket.t()
defp create_remote_socket(sockets_config, local_socket) do
case sockets_config.connection_side do
:server ->
nil

:client ->
{:ok, {server_address, server_port}} = :inet.peername(local_socket.socket_handle)

%Socket{ip_address: server_address, port_no: server_port, connection_side: :server}

{:client, address, port_no} ->
%Socket{ip_address: address, port_no: port_no, connection_side: :server}
end
end

@spec handle_local_socket_connection_result(
{:ok, Socket.t()} | {:error, term()},
Membrane.Element.CallbackContext.t(),
Expand All @@ -49,7 +121,28 @@ defmodule Membrane.TCP.CommonSocketBehaviour do
tag: :tcp_guard
)

{[notify_parent: notification], %{state | local_socket: connected_socket}}
remote_socket =
if state.remote_socket != nil do
state.remote_socket
else
{:ok, {remote_address, remote_port}} = :inet.peername(connected_socket.socket_handle)

remote_connection_side =
case connected_socket.connection_side do
:client -> :server
:server -> :client
end

%Socket{
connection_side: remote_connection_side,
ip_address: remote_address,
port_no: remote_port,
state: :connected
}
end

{[notify_parent: notification],
%{state | local_socket: connected_socket, remote_socket: remote_socket}}
end

defp handle_local_socket_connection_result({:error, reason}, _ctx, _state) do
Expand Down
9 changes: 5 additions & 4 deletions lib/membrane_tcp/sink.ex
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ defmodule Membrane.TCP.Sink do

@impl true
def handle_init(_context, opts) do
{local_socket, remote_socket} = Socket.create_socket_pair(Map.from_struct(opts))
{local_socket, remote_socket} =
CommonSocketBehaviour.create_socket_pair(Map.from_struct(opts))

connection_side =
case opts.connection_side do
Expand All @@ -73,6 +74,9 @@ defmodule Membrane.TCP.Sink do
}}
end

@impl true
defdelegate handle_setup(context, state), to: CommonSocketBehaviour

@impl true
def handle_playing(_context, state) do
{[], state}
Expand All @@ -88,9 +92,6 @@ defmodule Membrane.TCP.Sink do
end
end

@impl true
defdelegate handle_setup(context, state), to: CommonSocketBehaviour

@impl true
def handle_end_of_stream(_pad, _context, state) do
if state.close_on_eos do
Expand Down
83 changes: 11 additions & 72 deletions lib/membrane_tcp/socket.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,24 @@ defmodule Membrane.TCP.Socket do
@moduledoc false

@enforce_keys [:connection_side, :port_no, :ip_address]
defstruct [:port_no, :ip_address, :socket_handle, :state, :connection_side, sock_opts: []]
defstruct [
:port_no,
:ip_address,
:socket_handle,
:connection_side,
state: :uninitialized,
sock_opts: []
]

@type t :: %__MODULE__{
ip_address: :inet.socket_address(),
port_no: :inet.port_number(),
socket_handle: :gen_tcp.socket() | nil,
state: :listening | :connected | nil,
connection_side: :server | :client | nil,
state: :listening | :connected | :uninitialized,
connection_side: :server | :client,
sock_opts: [:gen_tcp.option()]
}

@type socket_pair_config :: %{
connection_side: :server | :client | {:client, :inet.ip_address(), :inet.port_number()},
local_address: :inet.socket_address(),
local_port_no: :inet.port_number(),
local_socket: :gen_tcp.socket() | nil
}

@spec create_socket_pair(socket_pair_config(), keyword()) ::
{local_socket :: t(), remote_socket :: t() | nil}
def create_socket_pair(sockets_config, local_socket_options \\ []) do
local_socket = create_local_socket(sockets_config, local_socket_options)

remote_socket = create_remote_socket(sockets_config, local_socket)

{local_socket, remote_socket}
end

@spec listen(socket :: t()) :: {:ok, listen_socket :: t()} | {:error, :inet.posix()}
def listen(%__MODULE__{port_no: port_no, ip_address: ip, sock_opts: sock_opts} = local_socket) do
listen_result =
Expand Down Expand Up @@ -99,7 +89,7 @@ defmodule Membrane.TCP.Socket do
@spec close(socket :: t()) :: t()
def close(%__MODULE__{socket_handle: handle} = socket) when is_port(handle) do
:ok = :gen_tcp.close(handle)
%__MODULE__{socket | socket_handle: nil, state: nil}
%__MODULE__{socket | socket_handle: nil, state: :uninitialized}
end

@spec send(local_socket :: t(), payload :: Membrane.Payload.t()) ::
Expand All @@ -113,55 +103,4 @@ defmodule Membrane.TCP.Socket do
def recv(%__MODULE__{socket_handle: socket_handle}, timeout \\ 0) do
:gen_tcp.recv(socket_handle, 0, timeout)
end

defp create_local_socket(%{local_socket: nil} = sockets_config, local_socket_options) do
%__MODULE__{
ip_address: sockets_config.local_address,
port_no: sockets_config.local_port_no,
sock_opts: local_socket_options,
connection_side: sockets_config.connection_side
}
end

defp create_local_socket(%{local_socket: socket_handle} = sockets_config, local_socket_options) do
{:ok, {socket_address, socket_port}} = :inet.sockname(socket_handle)

cond do
sockets_config.local_address not in [socket_address, :any] ->
raise "Local address passed in options not matching the one of the passed socket."

sockets_config.local_port_no not in [socket_port, 0] ->
raise "Local port passed in options not matching the one of the passed socket."

not match?({:ok, _peername}, :inet.peername(socket_handle)) ->
raise "Local socket not connected."

true ->
:ok
end

%__MODULE__{
ip_address: sockets_config.local_address,
port_no: sockets_config.local_port_no,
socket_handle: socket_handle,
state: :connected,
connection_side: sockets_config.connection_side,
sock_opts: local_socket_options
}
end

defp create_remote_socket(sockets_config, local_socket) do
case sockets_config.connection_side do
:server ->
nil

:client ->
{:ok, {server_address, server_port}} = :inet.peername(local_socket.socket_handle)

%__MODULE__{ip_address: server_address, port_no: server_port, connection_side: :server}

{:client, address, port_no} ->
%__MODULE__{ip_address: address, port_no: port_no, connection_side: :server}
end
end
end
91 changes: 61 additions & 30 deletions lib/membrane_tcp/source.ex
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@ defmodule Membrane.TCP.Source do
default: nil,
description: """
Already connected TCP socket, if provided will be used instead of creating
and connecting a new one.
and connecting a new one. It's REQUIRED to pass control of it to this element
from the previous owner. It can be done by receiving a
`{:request_socket_control, socket, pid}` message sent by this element to it's
parent and calling `:gen_tcp.controlling_process(socket, pid)` (needs to be called by
a process currently controlling the socket)
"""
],
recv_buffer_size: [
Expand All @@ -51,12 +55,22 @@ defmodule Membrane.TCP.Source do
"""
]

def_output_pad :output, accepted_format: %RemoteStream{type: :bytestream}, flow_control: :manual
def_output_pad :output,
accepted_format: %RemoteStream{type: :bytestream},
flow_control: :manual,
demand_unit: :buffers

@typedoc """
Notification sent when a local socket handle was provided via `local_socket` option.
"""
@type request_socket_control() :: {:request_socket_control, :gen_tcp.socket(), pid()}

@impl true
def handle_init(_context, opts) do
{local_socket, remote_socket} =
Socket.create_socket_pair(Map.from_struct(opts), recbuf: opts.recv_buffer_size)
CommonSocketBehaviour.create_socket_pair(Map.from_struct(opts),
recbuf: opts.recv_buffer_size
)

connection_side =
case opts.connection_side do
Expand All @@ -65,49 +79,66 @@ defmodule Membrane.TCP.Source do
{:client, _server_address, _server_port_no} -> :client
end

{[],
actions =
case local_socket do
%Socket{socket_handle: nil} ->
[]

%Socket{socket_handle: handle} ->
[notify_parent: {:request_socket_control, handle, self()}]
end

{actions,
%{
connection_side: connection_side,
local_socket: local_socket,
remote_socket: remote_socket
}}
end

@impl true
defdelegate handle_setup(context, state), to: CommonSocketBehaviour

@impl true
def handle_playing(_ctx, state) do
{[stream_format: {:output, %RemoteStream{type: :bytestream}}], state}
end

@impl true
def handle_demand(_pad, _size, _unit, _ctx, state) do
case Socket.recv(state.local_socket) do
{:ok, payload} ->
{:ok, {peer_address, peer_port_no}} = :inet.peername(state.local_socket.socket_handle)

metadata =
Map.new()
|> Map.put(:tcp_source_address, peer_address)
|> Map.put(:tcp_source_port, peer_port_no)
|> Map.put(:arrival_ts, Membrane.Time.vm_time())

{
[buffer: {:output, %Buffer{payload: payload, metadata: metadata}}, redemand: :output],
state
}

{:error, :timeout} ->
{[redemand: :output], state}

{:error, :closed} ->
{[end_of_stream: :output], state}

{:error, reason} ->
raise "TCP Socket receiving error, reason: #{inspect(reason)}"
end
def handle_demand(_pad, size, :buffers, _ctx, state) do
:inet.setopts(state.local_socket.socket_handle, active: size)
{[], state}
end

@impl true
defdelegate handle_setup(context, state), to: CommonSocketBehaviour
def handle_info({:tcp, _socket, payload}, _ctx, state) do
metadata =
%{
tcp_source_address: state.remote_socket.ip_address,
tcp_source_port: state.remote_socket.port_no,
arrival_ts: Membrane.Time.vm_time()
}

{
[buffer: {:output, %Buffer{payload: payload, metadata: metadata}}],
state
}
end

@impl true
def handle_info({:tcp_closed, _socket}, _ctx, state) do
{[end_of_stream: :output], state}
end

@impl true
def handle_info({:tcp_passive, _socket}, _ctx, state) do
{[], state}
end

@impl true
def handle_info({:tcp_error, _socket, reason}, _ctx, _state) do
raise "TCP Socket receiving error, reason: #{inspect(reason)}"
end

@impl true
def handle_terminate_request(_ctx, state) do
Expand Down
Loading

0 comments on commit f8fc1df

Please sign in to comment.