Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use active socket #6

Merged
merged 8 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ This package provides TCP Source and Sink, that read and write to TCP sockets.
Add the following line to your `deps` in `mix.exs`. Run `mix deps.get`.

```elixir
{:membrane_tcp_plugin, "~> 0.3.0"}
{:membrane_tcp_plugin, "~> 0.4.0"}
```

## Copyright and License
Expand Down
95 changes: 94 additions & 1 deletion lib/membrane_tcp/common_socket_behaviour.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,23 @@ defmodule Membrane.TCP.CommonSocketBehaviour do
alias Membrane.Element.CallbackContext
alias Membrane.TCP.Socket

@type socket_pair_config :: %{
connection_side: :server | :client | {:client, :inet.ip_address(), :inet.port_number()},
local_address: :inet.socket_address(),
local_port_no: :inet.port_number(),
local_socket: :gen_tcp.socket() | nil
}

@spec create_socket_pair(socket_pair_config(), keyword()) ::
{local_socket :: Socket.t(), remote_socket :: Socket.t() | nil}
def create_socket_pair(sockets_config, local_socket_options \\ []) do
local_socket = create_local_socket(sockets_config, local_socket_options)

remote_socket = create_remote_socket(sockets_config, local_socket)

{local_socket, remote_socket}
end

@spec handle_setup(context :: CallbackContext.t(), state :: Element.state()) ::
Base.callback_return()
def handle_setup(
Expand Down Expand Up @@ -35,6 +52,61 @@ defmodule Membrane.TCP.CommonSocketBehaviour do
handle_local_socket_connection_result(local_socket_connection_result, ctx, state)
end

@spec create_local_socket(socket_pair_config(), [:gen_tcp.option()]) :: Socket.t()
defp create_local_socket(%{local_socket: nil} = sockets_config, local_socket_options) do
%Socket{
ip_address: sockets_config.local_address,
port_no: sockets_config.local_port_no,
sock_opts: local_socket_options,
connection_side: sockets_config.connection_side
}
end

defp create_local_socket(%{local_socket: socket_handle} = sockets_config, local_socket_options) do
{:ok, {socket_address, socket_port}} = :inet.sockname(socket_handle)

cond do
sockets_config.local_address not in [socket_address, :any] ->
raise "Local address passed in options not matching the one of the passed socket."

sockets_config.local_port_no not in [socket_port, 0] ->
raise "Local port passed in options not matching the one of the passed socket."

not match?({:ok, _peername}, :inet.peername(socket_handle)) ->
raise "Local socket not connected."

true ->
:ok
end

:inet.setopts(socket_handle, active: false)

%Socket{
ip_address: sockets_config.local_address,
port_no: sockets_config.local_port_no,
socket_handle: socket_handle,
state: :connected,
connection_side: sockets_config.connection_side,
sock_opts: local_socket_options
}
end

@spec create_remote_socket(socket_pair_config(), Socket.t()) :: Socket.t()
defp create_remote_socket(sockets_config, local_socket) do
case sockets_config.connection_side do
:server ->
nil

:client ->
{:ok, {server_address, server_port}} = :inet.peername(local_socket.socket_handle)

%Socket{ip_address: server_address, port_no: server_port, connection_side: :server}

{:client, address, port_no} ->
%Socket{ip_address: address, port_no: port_no, connection_side: :server}
end
end

@spec handle_local_socket_connection_result(
{:ok, Socket.t()} | {:error, term()},
Membrane.Element.CallbackContext.t(),
Expand All @@ -49,7 +121,28 @@ defmodule Membrane.TCP.CommonSocketBehaviour do
tag: :tcp_guard
)

{[notify_parent: notification], %{state | local_socket: connected_socket}}
remote_socket =
if state.remote_socket != nil do
state.remote_socket
else
{:ok, {remote_address, remote_port}} = :inet.peername(connected_socket.socket_handle)

remote_connection_side =
case connected_socket.connection_side do
:client -> :server
:server -> :client
end

%Socket{
connection_side: remote_connection_side,
ip_address: remote_address,
port_no: remote_port,
state: :connected
}
end

{[notify_parent: notification],
%{state | local_socket: connected_socket, remote_socket: remote_socket}}
end

defp handle_local_socket_connection_result({:error, reason}, _ctx, _state) do
Expand Down
9 changes: 5 additions & 4 deletions lib/membrane_tcp/sink.ex
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ defmodule Membrane.TCP.Sink do

@impl true
def handle_init(_context, opts) do
{local_socket, remote_socket} = Socket.create_socket_pair(Map.from_struct(opts))
{local_socket, remote_socket} =
CommonSocketBehaviour.create_socket_pair(Map.from_struct(opts))

connection_side =
case opts.connection_side do
Expand All @@ -65,6 +66,9 @@ defmodule Membrane.TCP.Sink do
}}
end

@impl true
defdelegate handle_setup(context, state), to: CommonSocketBehaviour

@impl true
def handle_playing(_context, state) do
{[], state}
Expand All @@ -80,9 +84,6 @@ defmodule Membrane.TCP.Sink do
end
end

@impl true
defdelegate handle_setup(context, state), to: CommonSocketBehaviour

@impl true
def handle_end_of_stream(_pad, _context, state) do
local_socket = Socket.close(state.local_socket)
Expand Down
83 changes: 11 additions & 72 deletions lib/membrane_tcp/socket.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,24 @@ defmodule Membrane.TCP.Socket do
@moduledoc false

@enforce_keys [:connection_side, :port_no, :ip_address]
defstruct [:port_no, :ip_address, :socket_handle, :state, :connection_side, sock_opts: []]
defstruct [
:port_no,
:ip_address,
:socket_handle,
:connection_side,
state: :uninitialized,
sock_opts: []
]

@type t :: %__MODULE__{
ip_address: :inet.socket_address(),
port_no: :inet.port_number(),
socket_handle: :gen_tcp.socket() | nil,
state: :listening | :connected | nil,
connection_side: :server | :client | nil,
state: :listening | :connected | :uninitialized,
connection_side: :server | :client,
sock_opts: [:gen_tcp.option()]
}

@type socket_pair_config :: %{
connection_side: :server | :client | {:client, :inet.ip_address(), :inet.port_number()},
local_address: :inet.socket_address(),
local_port_no: :inet.port_number(),
local_socket: :gen_tcp.socket() | nil
}

@spec create_socket_pair(socket_pair_config(), keyword()) ::
{local_socket :: t(), remote_socket :: t() | nil}
def create_socket_pair(sockets_config, local_socket_options \\ []) do
local_socket = create_local_socket(sockets_config, local_socket_options)

remote_socket = create_remote_socket(sockets_config, local_socket)

{local_socket, remote_socket}
end

@spec listen(socket :: t()) :: {:ok, listen_socket :: t()} | {:error, :inet.posix()}
def listen(%__MODULE__{port_no: port_no, ip_address: ip, sock_opts: sock_opts} = local_socket) do
listen_result =
Expand Down Expand Up @@ -99,7 +89,7 @@ defmodule Membrane.TCP.Socket do
@spec close(socket :: t()) :: t()
def close(%__MODULE__{socket_handle: handle} = socket) when is_port(handle) do
:ok = :gen_tcp.close(handle)
%__MODULE__{socket | socket_handle: nil, state: nil}
%__MODULE__{socket | socket_handle: nil, state: :uninitialized}
end

@spec send(local_socket :: t(), payload :: Membrane.Payload.t()) ::
Expand All @@ -113,55 +103,4 @@ defmodule Membrane.TCP.Socket do
def recv(%__MODULE__{socket_handle: socket_handle}, timeout \\ 0) do
:gen_tcp.recv(socket_handle, 0, timeout)
end

defp create_local_socket(%{local_socket: nil} = sockets_config, local_socket_options) do
%__MODULE__{
ip_address: sockets_config.local_address,
port_no: sockets_config.local_port_no,
sock_opts: local_socket_options,
connection_side: sockets_config.connection_side
}
end

defp create_local_socket(%{local_socket: socket_handle} = sockets_config, local_socket_options) do
{:ok, {socket_address, socket_port}} = :inet.sockname(socket_handle)

cond do
sockets_config.local_address not in [socket_address, :any] ->
raise "Local address passed in options not matching the one of the passed socket."

sockets_config.local_port_no not in [socket_port, 0] ->
raise "Local port passed in options not matching the one of the passed socket."

not match?({:ok, _peername}, :inet.peername(socket_handle)) ->
raise "Local socket not connected."

true ->
:ok
end

%__MODULE__{
ip_address: sockets_config.local_address,
port_no: sockets_config.local_port_no,
socket_handle: socket_handle,
state: :connected,
connection_side: sockets_config.connection_side,
sock_opts: local_socket_options
}
end

defp create_remote_socket(sockets_config, local_socket) do
case sockets_config.connection_side do
:server ->
nil

:client ->
{:ok, {server_address, server_port}} = :inet.peername(local_socket.socket_handle)

%__MODULE__{ip_address: server_address, port_no: server_port, connection_side: :server}

{:client, address, port_no} ->
%__MODULE__{ip_address: address, port_no: port_no, connection_side: :server}
end
end
end
86 changes: 56 additions & 30 deletions lib/membrane_tcp/source.ex
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@ defmodule Membrane.TCP.Source do
default: nil,
description: """
Already connected TCP socket, if provided will be used instead of creating
and connecting a new one.
and connecting a new one. It's REQUIRED to pass control of it to this element
from the previous owner. It can be done by receiving a
`{:request_socket_control, socket, pid}` message sent by this element to it's
parent and calling `:gen_tcp.controlling_process(socket, pid)` (needs to be called by
a process currently controlling the socket)
mat-hek marked this conversation as resolved.
Show resolved Hide resolved
"""
],
recv_buffer_size: [
Expand All @@ -51,12 +55,17 @@ defmodule Membrane.TCP.Source do
"""
]

def_output_pad :output, accepted_format: %RemoteStream{type: :bytestream}, flow_control: :manual
def_output_pad :output,
accepted_format: %RemoteStream{type: :bytestream},
flow_control: :manual,
demand_unit: :buffers

@impl true
def handle_init(_context, opts) do
{local_socket, remote_socket} =
Socket.create_socket_pair(Map.from_struct(opts), recbuf: opts.recv_buffer_size)
CommonSocketBehaviour.create_socket_pair(Map.from_struct(opts),
recbuf: opts.recv_buffer_size
)

connection_side =
case opts.connection_side do
Expand All @@ -65,49 +74,66 @@ defmodule Membrane.TCP.Source do
{:client, _server_address, _server_port_no} -> :client
end

{[],
actions =
case local_socket do
%Socket{socket_handle: nil} ->
[]

%Socket{socket_handle: handle} ->
[notify_parent: {:request_socket_control, handle, self()}]
end

{actions,
%{
connection_side: connection_side,
local_socket: local_socket,
remote_socket: remote_socket
}}
end

@impl true
defdelegate handle_setup(context, state), to: CommonSocketBehaviour

@impl true
def handle_playing(_ctx, state) do
{[stream_format: {:output, %RemoteStream{type: :bytestream}}], state}
end

@impl true
def handle_demand(_pad, _size, _unit, _ctx, state) do
case Socket.recv(state.local_socket) do
{:ok, payload} ->
{:ok, {peer_address, peer_port_no}} = :inet.peername(state.local_socket.socket_handle)

metadata =
Map.new()
|> Map.put(:tcp_source_address, peer_address)
|> Map.put(:tcp_source_port, peer_port_no)
|> Map.put(:arrival_ts, Membrane.Time.vm_time())

{
[buffer: {:output, %Buffer{payload: payload, metadata: metadata}}, redemand: :output],
state
}

{:error, :timeout} ->
{[redemand: :output], state}

{:error, :closed} ->
{[end_of_stream: :output], state}

{:error, reason} ->
raise "TCP Socket receiving error, reason: #{inspect(reason)}"
end
def handle_demand(_pad, size, :buffers, _ctx, state) do
:inet.setopts(state.local_socket.socket_handle, active: size)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes sense if only if the demand unit is buffers, you can enforce it in def_output_pad

{[], state}
end

@impl true
defdelegate handle_setup(context, state), to: CommonSocketBehaviour
def handle_info({:tcp, _socket, payload}, _ctx, state) do
metadata =
%{
tcp_source_address: state.remote_socket.ip_address,
tcp_source_port: state.remote_socket.port_no,
arrival_ts: Membrane.Time.vm_time()
}

{
[buffer: {:output, %Buffer{payload: payload, metadata: metadata}}],
state
}
end

@impl true
def handle_info({:tcp_closed, _socket}, _ctx, state) do
{[end_of_stream: :output], state}
end

@impl true
def handle_info({:tcp_passive, _socket}, _ctx, state) do
{[], state}
end

@impl true
def handle_info({:tcp_error, _socket, reason}, _ctx, _state) do
raise "TCP Socket receiving error, reason: #{inspect(reason)}"
end

@impl true
def handle_terminate_request(_ctx, state) do
Expand Down
Loading