From 9e260b4ee5591ac62c6d0ce1301b2b095d95d945 Mon Sep 17 00:00:00 2001 From: noarkhh Date: Fri, 19 Apr 2024 18:45:45 +0200 Subject: [PATCH 1/8] Start adapting source to use active mode --- lib/membrane_tcp/socket.ex | 2 ++ lib/membrane_tcp/source.ex | 70 ++++++++++++++++++++++++-------------- 2 files changed, 46 insertions(+), 26 deletions(-) diff --git a/lib/membrane_tcp/socket.ex b/lib/membrane_tcp/socket.ex index 5f8240c..23a2fce 100644 --- a/lib/membrane_tcp/socket.ex +++ b/lib/membrane_tcp/socket.ex @@ -140,6 +140,8 @@ defmodule Membrane.TCP.Socket do :ok end + :inet.setopts(socket_handle, active: false) + %__MODULE__{ ip_address: sockets_config.local_address, port_no: sockets_config.local_port_no, diff --git a/lib/membrane_tcp/source.ex b/lib/membrane_tcp/source.ex index 3f90f86..a9c90cb 100644 --- a/lib/membrane_tcp/source.ex +++ b/lib/membrane_tcp/source.ex @@ -40,7 +40,11 @@ defmodule Membrane.TCP.Source do default: nil, description: """ Already connected TCP socket, if provided will be used instead of creating - and connecting a new one. + and connecting a new one. It's REQUIRED to pass control of it to this element + from the previous owner. It can be done by receiving a + `{:request_socket_control, socket, pid}` message sent by this element to it's + parent and calling `:inet.controlling_process(socket, pid)` (needs to be called by + a process currently controlling the socket) """ ], recv_buffer_size: [ @@ -65,7 +69,16 @@ defmodule Membrane.TCP.Source do {:client, _server_address, _server_port_no} -> :client end - {[], + actions = + case local_socket do + %Socket{socket_handle: nil} -> + [] + + %Socket{socket_handle: handle} -> + [notify_parent: {:request_socket_control, handle, self()}] + end + + {actions, %{ connection_side: connection_side, local_socket: local_socket, @@ -80,30 +93,35 @@ defmodule Membrane.TCP.Source do @impl true def handle_demand(_pad, _size, _unit, _ctx, state) do - case Socket.recv(state.local_socket) do - {:ok, payload} -> - {:ok, {peer_address, peer_port_no}} = :inet.peername(state.local_socket.socket_handle) - - metadata = - Map.new() - |> Map.put(:tcp_source_address, peer_address) - |> Map.put(:tcp_source_port, peer_port_no) - |> Map.put(:arrival_ts, Membrane.Time.vm_time()) - - { - [buffer: {:output, %Buffer{payload: payload, metadata: metadata}}, redemand: :output], - state - } - - {:error, :timeout} -> - {[redemand: :output], state} - - {:error, :closed} -> - {[end_of_stream: :output], state} - - {:error, reason} -> - raise "TCP Socket receiving error, reason: #{inspect(reason)}" - end + :inet.setopts(state.local_socket.socket_handle, active: :once) + {[], state} + end + + @impl true + def handle_info({:tcp, _socket, payload}, _ctx, state) do + IO.inspect(state.remote_socket, label: "dupa") + + metadata = + %{ + tcp_source_address: state.remote_socket.ip_address, + tcp_source_port: state.remote_socket.port_no, + arrival_ts: Membrane.Time.vm_time() + } + + { + [buffer: {:output, %Buffer{payload: payload, metadata: metadata}}, redemand: :output], + state + } + end + + @impl true + def handle_info({:tcp_closed, _socket}, _ctx, state) do + {[end_of_stream: :output], state} + end + + @impl true + def handle_info({:tcp_error, _socket, reason}, _ctx, _state) do + raise "TCP Socket receiving error, reason: #{inspect(reason)}" end @impl true From 663e078f0fbc00e1695deb29e02888fab686f359 Mon Sep 17 00:00:00 2001 From: noarkhh Date: Tue, 23 Apr 2024 11:37:05 +0200 Subject: [PATCH 2/8] Make tests pass --- lib/membrane_tcp/common_socket_behaviour.ex | 23 ++++++++++++++++++++- lib/membrane_tcp/socket.ex | 2 +- lib/membrane_tcp/source.ex | 4 +--- test/membrane_tcp/source/pipeline_test.exs | 6 ++++++ 4 files changed, 30 insertions(+), 5 deletions(-) diff --git a/lib/membrane_tcp/common_socket_behaviour.ex b/lib/membrane_tcp/common_socket_behaviour.ex index f527ff3..b389fb4 100644 --- a/lib/membrane_tcp/common_socket_behaviour.ex +++ b/lib/membrane_tcp/common_socket_behaviour.ex @@ -49,7 +49,28 @@ defmodule Membrane.TCP.CommonSocketBehaviour do tag: :tcp_guard ) - {[notify_parent: notification], %{state | local_socket: connected_socket}} + remote_socket = + if state.remote_socket != nil do + state.remote_socket + else + {:ok, {remote_address, remote_port}} = :inet.peername(connected_socket.socket_handle) + + remote_connection_side = + case connected_socket.connection_side do + :client -> :server + :server -> :client + end + + %Socket{ + connection_side: remote_connection_side, + ip_address: remote_address, + port_no: remote_port, + state: :connected + } + end + + {[notify_parent: notification], + %{state | local_socket: connected_socket, remote_socket: remote_socket}} end defp handle_local_socket_connection_result({:error, reason}, _ctx, _state) do diff --git a/lib/membrane_tcp/socket.ex b/lib/membrane_tcp/socket.ex index 23a2fce..4b4d44d 100644 --- a/lib/membrane_tcp/socket.ex +++ b/lib/membrane_tcp/socket.ex @@ -9,7 +9,7 @@ defmodule Membrane.TCP.Socket do port_no: :inet.port_number(), socket_handle: :gen_tcp.socket() | nil, state: :listening | :connected | nil, - connection_side: :server | :client | nil, + connection_side: :server | :client, sock_opts: [:gen_tcp.option()] } diff --git a/lib/membrane_tcp/source.ex b/lib/membrane_tcp/source.ex index a9c90cb..639da91 100644 --- a/lib/membrane_tcp/source.ex +++ b/lib/membrane_tcp/source.ex @@ -43,7 +43,7 @@ defmodule Membrane.TCP.Source do and connecting a new one. It's REQUIRED to pass control of it to this element from the previous owner. It can be done by receiving a `{:request_socket_control, socket, pid}` message sent by this element to it's - parent and calling `:inet.controlling_process(socket, pid)` (needs to be called by + parent and calling `:gen_tcp.controlling_process(socket, pid)` (needs to be called by a process currently controlling the socket) """ ], @@ -99,8 +99,6 @@ defmodule Membrane.TCP.Source do @impl true def handle_info({:tcp, _socket, payload}, _ctx, state) do - IO.inspect(state.remote_socket, label: "dupa") - metadata = %{ tcp_source_address: state.remote_socket.ip_address, diff --git a/test/membrane_tcp/source/pipeline_test.exs b/test/membrane_tcp/source/pipeline_test.exs index cae407e..5467494 100644 --- a/test/membrane_tcp/source/pipeline_test.exs +++ b/test/membrane_tcp/source/pipeline_test.exs @@ -114,6 +114,9 @@ defmodule Membrane.TCP.SourcePipelineTest do test_process: self() ) + assert_pipeline_notified(pipeline, :tcp_source, {:request_socket_control, socket, pid}) + :gen_tcp.controlling_process(socket, pid) + assert_sink_playing(pipeline, :sink) run_pipeline(pipeline, server_socket) @@ -133,6 +136,9 @@ defmodule Membrane.TCP.SourcePipelineTest do test_process: self() ) + assert_pipeline_notified(pipeline, :tcp_source, {:request_socket_control, socket, pid}) + :gen_tcp.controlling_process(socket, pid) + assert_sink_playing(pipeline, :sink) run_pipeline(pipeline, client_socket) From 6d33ea98ba6d30d5736fba4c689fb2df2949f67b Mon Sep 17 00:00:00 2001 From: noarkhh Date: Tue, 23 Apr 2024 11:57:47 +0200 Subject: [PATCH 3/8] Refactor --- lib/membrane_tcp/common_socket_behaviour.ex | 72 ++++++++++++++++++ lib/membrane_tcp/sink.ex | 9 ++- lib/membrane_tcp/socket.ex | 83 +++------------------ lib/membrane_tcp/source.ex | 10 ++- 4 files changed, 93 insertions(+), 81 deletions(-) diff --git a/lib/membrane_tcp/common_socket_behaviour.ex b/lib/membrane_tcp/common_socket_behaviour.ex index b389fb4..d957641 100644 --- a/lib/membrane_tcp/common_socket_behaviour.ex +++ b/lib/membrane_tcp/common_socket_behaviour.ex @@ -6,6 +6,23 @@ defmodule Membrane.TCP.CommonSocketBehaviour do alias Membrane.Element.CallbackContext alias Membrane.TCP.Socket + @type socket_pair_config :: %{ + connection_side: :server | :client | {:client, :inet.ip_address(), :inet.port_number()}, + local_address: :inet.socket_address(), + local_port_no: :inet.port_number(), + local_socket: :gen_tcp.socket() | nil + } + + @spec create_socket_pair(socket_pair_config(), keyword()) :: + {local_socket :: Socket.t(), remote_socket :: Socket.t() | nil} + def create_socket_pair(sockets_config, local_socket_options \\ []) do + local_socket = create_local_socket(sockets_config, local_socket_options) + + remote_socket = create_remote_socket(sockets_config, local_socket) + + {local_socket, remote_socket} + end + @spec handle_setup(context :: CallbackContext.t(), state :: Element.state()) :: Base.callback_return() def handle_setup( @@ -35,6 +52,61 @@ defmodule Membrane.TCP.CommonSocketBehaviour do handle_local_socket_connection_result(local_socket_connection_result, ctx, state) end + @spec create_local_socket(socket_pair_config(), [:gen_tcp.option()]) :: Socket.t() + defp create_local_socket(%{local_socket: nil} = sockets_config, local_socket_options) do + %Socket{ + ip_address: sockets_config.local_address, + port_no: sockets_config.local_port_no, + sock_opts: local_socket_options, + connection_side: sockets_config.connection_side + } + end + + defp create_local_socket(%{local_socket: socket_handle} = sockets_config, local_socket_options) do + {:ok, {socket_address, socket_port}} = :inet.sockname(socket_handle) + + cond do + sockets_config.local_address not in [socket_address, :any] -> + raise "Local address passed in options not matching the one of the passed socket." + + sockets_config.local_port_no not in [socket_port, 0] -> + raise "Local port passed in options not matching the one of the passed socket." + + not match?({:ok, _peername}, :inet.peername(socket_handle)) -> + raise "Local socket not connected." + + true -> + :ok + end + + :inet.setopts(socket_handle, active: false) + + %Socket{ + ip_address: sockets_config.local_address, + port_no: sockets_config.local_port_no, + socket_handle: socket_handle, + state: :connected, + connection_side: sockets_config.connection_side, + sock_opts: local_socket_options + } + end + + @spec create_remote_socket(socket_pair_config(), Socket.t()) :: Socket.t() + defp create_remote_socket(sockets_config, local_socket) do + case sockets_config.connection_side do + :server -> + nil + + :client -> + {:ok, {server_address, server_port}} = :inet.peername(local_socket.socket_handle) + + %Socket{ip_address: server_address, port_no: server_port, connection_side: :server} + + {:client, address, port_no} -> + %Socket{ip_address: address, port_no: port_no, connection_side: :server} + end + end + @spec handle_local_socket_connection_result( {:ok, Socket.t()} | {:error, term()}, Membrane.Element.CallbackContext.t(), diff --git a/lib/membrane_tcp/sink.ex b/lib/membrane_tcp/sink.ex index d198279..3c8061a 100644 --- a/lib/membrane_tcp/sink.ex +++ b/lib/membrane_tcp/sink.ex @@ -48,7 +48,8 @@ defmodule Membrane.TCP.Sink do @impl true def handle_init(_context, opts) do - {local_socket, remote_socket} = Socket.create_socket_pair(Map.from_struct(opts)) + {local_socket, remote_socket} = + CommonSocketBehaviour.create_socket_pair(Map.from_struct(opts)) connection_side = case opts.connection_side do @@ -65,6 +66,9 @@ defmodule Membrane.TCP.Sink do }} end + @impl true + defdelegate handle_setup(context, state), to: CommonSocketBehaviour + @impl true def handle_playing(_context, state) do {[], state} @@ -80,9 +84,6 @@ defmodule Membrane.TCP.Sink do end end - @impl true - defdelegate handle_setup(context, state), to: CommonSocketBehaviour - @impl true def handle_end_of_stream(_pad, _context, state) do local_socket = Socket.close(state.local_socket) diff --git a/lib/membrane_tcp/socket.ex b/lib/membrane_tcp/socket.ex index 4b4d44d..cfba5f3 100644 --- a/lib/membrane_tcp/socket.ex +++ b/lib/membrane_tcp/socket.ex @@ -2,34 +2,24 @@ defmodule Membrane.TCP.Socket do @moduledoc false @enforce_keys [:connection_side, :port_no, :ip_address] - defstruct [:port_no, :ip_address, :socket_handle, :state, :connection_side, sock_opts: []] + defstruct [ + :port_no, + :ip_address, + :socket_handle, + :connection_side, + state: :uninitialized, + sock_opts: [] + ] @type t :: %__MODULE__{ ip_address: :inet.socket_address(), port_no: :inet.port_number(), socket_handle: :gen_tcp.socket() | nil, - state: :listening | :connected | nil, + state: :listening | :connected | :uninitialized, connection_side: :server | :client, sock_opts: [:gen_tcp.option()] } - @type socket_pair_config :: %{ - connection_side: :server | :client | {:client, :inet.ip_address(), :inet.port_number()}, - local_address: :inet.socket_address(), - local_port_no: :inet.port_number(), - local_socket: :gen_tcp.socket() | nil - } - - @spec create_socket_pair(socket_pair_config(), keyword()) :: - {local_socket :: t(), remote_socket :: t() | nil} - def create_socket_pair(sockets_config, local_socket_options \\ []) do - local_socket = create_local_socket(sockets_config, local_socket_options) - - remote_socket = create_remote_socket(sockets_config, local_socket) - - {local_socket, remote_socket} - end - @spec listen(socket :: t()) :: {:ok, listen_socket :: t()} | {:error, :inet.posix()} def listen(%__MODULE__{port_no: port_no, ip_address: ip, sock_opts: sock_opts} = local_socket) do listen_result = @@ -99,7 +89,7 @@ defmodule Membrane.TCP.Socket do @spec close(socket :: t()) :: t() def close(%__MODULE__{socket_handle: handle} = socket) when is_port(handle) do :ok = :gen_tcp.close(handle) - %__MODULE__{socket | socket_handle: nil, state: nil} + %__MODULE__{socket | socket_handle: nil, state: :uninitialized} end @spec send(local_socket :: t(), payload :: Membrane.Payload.t()) :: @@ -113,57 +103,4 @@ defmodule Membrane.TCP.Socket do def recv(%__MODULE__{socket_handle: socket_handle}, timeout \\ 0) do :gen_tcp.recv(socket_handle, 0, timeout) end - - defp create_local_socket(%{local_socket: nil} = sockets_config, local_socket_options) do - %__MODULE__{ - ip_address: sockets_config.local_address, - port_no: sockets_config.local_port_no, - sock_opts: local_socket_options, - connection_side: sockets_config.connection_side - } - end - - defp create_local_socket(%{local_socket: socket_handle} = sockets_config, local_socket_options) do - {:ok, {socket_address, socket_port}} = :inet.sockname(socket_handle) - - cond do - sockets_config.local_address not in [socket_address, :any] -> - raise "Local address passed in options not matching the one of the passed socket." - - sockets_config.local_port_no not in [socket_port, 0] -> - raise "Local port passed in options not matching the one of the passed socket." - - not match?({:ok, _peername}, :inet.peername(socket_handle)) -> - raise "Local socket not connected." - - true -> - :ok - end - - :inet.setopts(socket_handle, active: false) - - %__MODULE__{ - ip_address: sockets_config.local_address, - port_no: sockets_config.local_port_no, - socket_handle: socket_handle, - state: :connected, - connection_side: sockets_config.connection_side, - sock_opts: local_socket_options - } - end - - defp create_remote_socket(sockets_config, local_socket) do - case sockets_config.connection_side do - :server -> - nil - - :client -> - {:ok, {server_address, server_port}} = :inet.peername(local_socket.socket_handle) - - %__MODULE__{ip_address: server_address, port_no: server_port, connection_side: :server} - - {:client, address, port_no} -> - %__MODULE__{ip_address: address, port_no: port_no, connection_side: :server} - end - end end diff --git a/lib/membrane_tcp/source.ex b/lib/membrane_tcp/source.ex index 639da91..11b3506 100644 --- a/lib/membrane_tcp/source.ex +++ b/lib/membrane_tcp/source.ex @@ -60,7 +60,9 @@ defmodule Membrane.TCP.Source do @impl true def handle_init(_context, opts) do {local_socket, remote_socket} = - Socket.create_socket_pair(Map.from_struct(opts), recbuf: opts.recv_buffer_size) + CommonSocketBehaviour.create_socket_pair(Map.from_struct(opts), + recbuf: opts.recv_buffer_size + ) connection_side = case opts.connection_side do @@ -86,6 +88,9 @@ defmodule Membrane.TCP.Source do }} end + @impl true + defdelegate handle_setup(context, state), to: CommonSocketBehaviour + @impl true def handle_playing(_ctx, state) do {[stream_format: {:output, %RemoteStream{type: :bytestream}}], state} @@ -122,9 +127,6 @@ defmodule Membrane.TCP.Source do raise "TCP Socket receiving error, reason: #{inspect(reason)}" end - @impl true - defdelegate handle_setup(context, state), to: CommonSocketBehaviour - @impl true def handle_terminate_request(_ctx, state) do Socket.close(state.local_socket) From e59c37bfa2c2aad0d0316a389aefed4b27a4dbb6 Mon Sep 17 00:00:00 2001 From: noarkhh Date: Wed, 24 Apr 2024 12:15:45 +0200 Subject: [PATCH 4/8] Bump version --- README.md | 2 +- mix.exs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 844bd7c..8e66faf 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ This package provides TCP Source and Sink, that read and write to TCP sockets. Add the following line to your `deps` in `mix.exs`. Run `mix deps.get`. ```elixir - {:membrane_tcp_plugin, "~> 0.3.0"} + {:membrane_tcp_plugin, "~> 0.4.0"} ``` ## Copyright and License diff --git a/mix.exs b/mix.exs index 0a1c216..b933e5d 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule Membrane.TCP.MixProject do use Mix.Project - @version "0.3.0" + @version "0.4.0" @github_url "https://github.com/membraneframework/membrane_tcp_plugin" def project do From 41b4d74c490dc7c55dabdd673e9e4ba63549806a Mon Sep 17 00:00:00 2001 From: noarkhh Date: Thu, 25 Apr 2024 14:27:49 +0200 Subject: [PATCH 5/8] Act according to demand size --- lib/membrane_tcp/source.ex | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/membrane_tcp/source.ex b/lib/membrane_tcp/source.ex index 11b3506..5df40ad 100644 --- a/lib/membrane_tcp/source.ex +++ b/lib/membrane_tcp/source.ex @@ -97,8 +97,8 @@ defmodule Membrane.TCP.Source do end @impl true - def handle_demand(_pad, _size, _unit, _ctx, state) do - :inet.setopts(state.local_socket.socket_handle, active: :once) + def handle_demand(_pad, size, _unit, _ctx, state) do + :inet.setopts(state.local_socket.socket_handle, active: size) {[], state} end From cd02f1c5b588314b6e00973b9fbb3d50f8ebd2ff Mon Sep 17 00:00:00 2001 From: noarkhh Date: Mon, 6 May 2024 17:39:57 +0200 Subject: [PATCH 6/8] Improve demands --- lib/membrane_tcp/source.ex | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/lib/membrane_tcp/source.ex b/lib/membrane_tcp/source.ex index 5df40ad..89d1c9c 100644 --- a/lib/membrane_tcp/source.ex +++ b/lib/membrane_tcp/source.ex @@ -55,7 +55,10 @@ defmodule Membrane.TCP.Source do """ ] - def_output_pad :output, accepted_format: %RemoteStream{type: :bytestream}, flow_control: :manual + def_output_pad :output, + accepted_format: %RemoteStream{type: :bytestream}, + flow_control: :manual, + demand_unit: :buffers @impl true def handle_init(_context, opts) do @@ -97,7 +100,7 @@ defmodule Membrane.TCP.Source do end @impl true - def handle_demand(_pad, size, _unit, _ctx, state) do + def handle_demand(_pad, size, :buffers, _ctx, state) do :inet.setopts(state.local_socket.socket_handle, active: size) {[], state} end @@ -112,7 +115,7 @@ defmodule Membrane.TCP.Source do } { - [buffer: {:output, %Buffer{payload: payload, metadata: metadata}}, redemand: :output], + [buffer: {:output, %Buffer{payload: payload, metadata: metadata}}], state } end From 398d78833998fcfbd9150c8954d8f4a99e168526 Mon Sep 17 00:00:00 2001 From: noarkhh Date: Wed, 8 May 2024 15:41:45 +0200 Subject: [PATCH 7/8] Add handling :tcp_passive message --- lib/membrane_tcp/source.ex | 5 +++++ test/membrane_tcp/integration_test.exs | 15 ++++++--------- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/lib/membrane_tcp/source.ex b/lib/membrane_tcp/source.ex index 89d1c9c..7f39ee4 100644 --- a/lib/membrane_tcp/source.ex +++ b/lib/membrane_tcp/source.ex @@ -125,6 +125,11 @@ defmodule Membrane.TCP.Source do {[end_of_stream: :output], state} end + @impl true + def handle_info({:tcp_passive, _socket}, _ctx, state) do + {[], state} + end + @impl true def handle_info({:tcp_error, _socket, reason}, _ctx, _state) do raise "TCP Socket receiving error, reason: #{inspect(reason)}" diff --git a/test/membrane_tcp/integration_test.exs b/test/membrane_tcp/integration_test.exs index 6ce3cce..cf3d3b8 100644 --- a/test/membrane_tcp/integration_test.exs +++ b/test/membrane_tcp/integration_test.exs @@ -11,15 +11,14 @@ defmodule Membrane.TCP.IntegrationTest do @server_port 6789 @localhost {127, 0, 0, 1} - @payload_frames 100 + @payload_frames 1000 + @data Enum.map(1..@payload_frames, &"(#{&1})") ++ ["."] test "send from server-side pipeline and receive on client-side pipeline" do - data = Enum.map(1..@payload_frames, &"(#{&1})") ++ ["."] - sender = Pipeline.start_link_supervised!( spec: - child(:source, %Testing.Source{output: data}) + child(:source, %Testing.Source{output: @data}) |> child(:sink, %TCP.Sink{ connection_side: :server, local_address: @localhost, @@ -45,14 +44,12 @@ defmodule Membrane.TCP.IntegrationTest do received_data = TCP.TestingSinkReceiver.receive_data(receiver) - assert received_data == Enum.join(data) + assert received_data == Enum.join(@data) Pipeline.terminate(sender) Pipeline.terminate(receiver) end test "send from client-side pipeline and receive on server-side pipeline" do - data = Enum.map(1..@payload_frames, &"(#{&1})") ++ ["."] - receiver = Pipeline.start_link_supervised!( spec: @@ -67,7 +64,7 @@ defmodule Membrane.TCP.IntegrationTest do sender = Pipeline.start_link_supervised!( spec: - child(:source, %Testing.Source{output: data}) + child(:source, %Testing.Source{output: @data}) |> child(:sink, %TCP.Sink{ connection_side: {:client, @localhost, @server_port}, local_address: @localhost @@ -82,7 +79,7 @@ defmodule Membrane.TCP.IntegrationTest do received_data = TCP.TestingSinkReceiver.receive_data(receiver) - assert received_data == Enum.join(data) + assert received_data == Enum.join(@data) Pipeline.terminate(sender) Pipeline.terminate(receiver) end From d3702aa36424d7817d7ef91fcf396f9a9eedf932 Mon Sep 17 00:00:00 2001 From: noarkhh Date: Mon, 13 May 2024 20:14:52 +0200 Subject: [PATCH 8/8] Add type for request_socket_control --- lib/membrane_tcp/source.ex | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/lib/membrane_tcp/source.ex b/lib/membrane_tcp/source.ex index 7f39ee4..337bc19 100644 --- a/lib/membrane_tcp/source.ex +++ b/lib/membrane_tcp/source.ex @@ -60,6 +60,11 @@ defmodule Membrane.TCP.Source do flow_control: :manual, demand_unit: :buffers + @typedoc """ + Notification sent when a local socket handle was provided via `local_socket` option. + """ + @type request_socket_control() :: {:request_socket_control, :gen_tcp.socket(), pid()} + @impl true def handle_init(_context, opts) do {local_socket, remote_socket} =