Skip to content

Commit

Permalink
Listener refactor, refresh request handling (#5)
Browse files Browse the repository at this point in the history
* Refactor, time_to_expiry handling

* Add better logs, refactors

* Update README.md, minor improvements

* Refactor error handling
  • Loading branch information
LVala authored Jul 20, 2023
1 parent 3aab9d8 commit f48f8d4
Show file tree
Hide file tree
Showing 9 changed files with 399 additions and 329 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@

TURN server.

Implementation of [RFC 8656](https://datatracker.ietf.org/doc/html/rfc8656).
Implementation of [RFC 5766](https://datatracker.ietf.org/doc/html/rfc5766) and [RFC 6156](https://datatracker.ietf.org/doc/html/rfc6156#autoid-7).
Supports authentication described in [A REST API For Access to TURN Services](https://datatracker.ietf.org/doc/html/draft-uberti-rtcweb-turn-rest-00).
2 changes: 1 addition & 1 deletion config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ config :ex_turn,

config :logger, :console,
format: "$time $metadata[$level] $message\n",
metadata: [:file, :line]
metadata: [:listener, :client, :alloc]

import_config "#{config_env()}.exs"
1 change: 0 additions & 1 deletion config/runtime.exs
Original file line number Diff line number Diff line change
@@ -1 +0,0 @@
import Config
140 changes: 96 additions & 44 deletions lib/ex_turn/allocation_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ defmodule ExTURN.AllocationHandler do

alias ExSTUN.Message
alias ExSTUN.Message.Type
alias ExSTUN.Message.Attribute.{ErrorCode, XORMappedAddress}
alias ExSTUN.Message.Attribute.ErrorCode

alias ExTURN.Auth
alias ExTURN.Attribute.{ChannelNumber, Data, XORPeerAddress}
alias ExTURN.Attribute.{ChannelNumber, Data, Lifetime, XORPeerAddress}
alias ExTURN.Utils

def start_link(turn_socket, alloc_socket, five_tuple, username) do
def start_link(turn_socket, alloc_socket, five_tuple, username, lifetime) do
{:ok, {_alloc_ip, alloc_port}} = :inet.sockname(alloc_socket)

GenServer.start_link(
Expand All @@ -19,30 +20,42 @@ defmodule ExTURN.AllocationHandler do
turn_socket: turn_socket,
alloc_socket: alloc_socket,
five_tuple: five_tuple,
username: username
username: username,
time_to_expiry: lifetime
],
name: {:via, Registry, {Registry.Allocations, five_tuple, alloc_port}}
)
end

@spec process_message(GenServer.server(), term()) :: :ok
def process_message(allocation, msg) do
GenServer.cast(allocation, {:msg, msg})
end

@impl true
def init(
turn_socket: turn_socket,
alloc_socket: socket,
five_tuple: five_tuple,
username: username
username: username,
time_to_expiry: time_to_expiry
) do
Logger.info("Starting allocation handler #{inspect(five_tuple)}")
{c_ip, c_port, s_ip, s_port, _transport} = five_tuple
alloc_id = "(#{:inet.ntoa(c_ip)}:#{c_port}, #{:inet.ntoa(s_ip)}:#{s_port}, UDP)"
Logger.metadata(alloc: alloc_id)
Logger.info("Starting new allocation handler")

Process.send_after(self(), :measure_bitrate, 1000)
Process.send_after(self(), :check_expiration, time_to_expiry * 1000)

{:ok,
%{
alloc_id: "#{inspect(five_tuple)}",
alloc_id: alloc_id,
turn_socket: turn_socket,
socket: socket,
five_tuple: five_tuple,
username: username,
expiry_timestamp: System.os_time(:second) + time_to_expiry,
permissions: MapSet.new(),
channels: %{},

Expand All @@ -56,9 +69,11 @@ defmodule ExTURN.AllocationHandler do
end

@impl true
def handle_info({:msg, msg}, state) do
state = handle_msg(msg, state)
{:noreply, state}
def handle_cast({:msg, msg}, state) do
case handle_msg(msg, state) do
{:ok, state} -> {:noreply, state}
{:allocation_expired, state} -> {:stop, {:shutdown, :allocation_expired}, state}
end
end

@impl true
Expand Down Expand Up @@ -96,12 +111,61 @@ defmodule ExTURN.AllocationHandler do
{:noreply, state}
end

@impl true
def handle_info(:check_expiration, state) do
if System.os_time(:second) >= state.expiry_timestamp do
Logger.info("Allocation expired, shutting down allocation handler")
{:stop, {:shutdown, :allocation_expired}, state}
else
{:noreply, state}
end
end

@impl true
def handle_info(msg, state) do
Logger.warn("Got unexpected msg: #{inspect(msg)}")
Logger.warning("Got unexpected OTP message: #{inspect(msg)}")
{:noreply, state}
end

@impl true
def terminate(reason, _state) do
Logger.info("Allocation handler stopped with reason: #{inspect(reason)}")
end

defp handle_msg(%Message{type: %Type{class: :request, method: :refresh}} = msg, state) do
Logger.info("Received refresh request")
{c_ip, c_port, _, _, _} = state.five_tuple

with {:ok, key} <- Auth.authenticate(msg, username: state.username),
{:ok, time_to_expiry} <- Utils.get_lifetime(msg) do
type = %Type{class: :success_response, method: :refresh}

msg.transaction_id
|> Message.new(type, [%Lifetime{lifetime: time_to_expiry}])
|> Message.with_integrity(key)
|> Message.encode()
|> then(&:gen_udp.send(state.turn_socket, c_ip, c_port, &1))

if time_to_expiry == 0 do
Logger.info("Allocation deleted with LIFETIME=0 refresh request")
{:allocation_expired, state}
else
state = %{state | expiry_timestamp: System.os_time(:second) + time_to_expiry}
Process.send_after(self(), :check_expiration, time_to_expiry * 1000)

Logger.info("Succesfully refreshed allocation, new 'time-to-expiry': #{time_to_expiry}")

{:ok, state}
end
else
{:error, reason} ->
{response, log_msg} = Utils.build_error(reason, msg.transaction_id, msg.type.method)
Logger.warn(log_msg)
:gen_udp.send(state.turn_socket, c_ip, c_port, response)
{:ok, state}
end
end

defp handle_msg(%Message{type: %Type{class: :request, method: :create_permission}} = msg, state) do
{c_ip, c_port, _, _, _} = state.five_tuple

Expand All @@ -116,43 +180,29 @@ defmodule ExTURN.AllocationHandler do

type = %Type{class: :success_response, method: msg.type.method}

response =
msg.transaction_id
|> Message.new(type, [])
|> Message.with_integrity(key)
|> Message.encode()
msg.transaction_id
|> Message.new(type, [])
|> Message.with_integrity(key)
|> Message.encode()
|> then(&:gen_udp.send(state.turn_socket, c_ip, c_port, &1))

:gen_udp.send(state.turn_socket, c_ip, c_port, response)
state
{:ok, state}

{:error, response} ->
:gen_udp.send(state.turn_socket, c_ip, c_port, Message.encode(response))
state
{:error, reason} ->
{response, log_msg} = Utils.build_error(reason, msg.transaction_id, msg.type.method)
Logger.warn(log_msg)
:gen_udp.send(state.turn_socket, c_ip, c_port, response)
{:ok, state}
end
end

defp handle_msg(%Message{type: %Type{class: :request, method: :binding}} = msg, state) do
{c_ip, c_port, _, _, _} = state.five_tuple
type = %Type{class: :success_response, method: msg.type.method}

response =
Message.new(msg.transaction_id, type, [
%XORMappedAddress{port: c_port, address: c_ip}
])
|> Message.encode()

:gen_udp.send(state.turn_socket, c_ip, c_port, response)

state
end

defp handle_msg(%Message{type: %Type{class: :indication, method: :send}} = msg, state) do
{:ok, xor_addr} = Message.get_attribute(msg, XORPeerAddress)
{:ok, data} = Message.get_attribute(msg, Data)

:gen_udp.send(state.socket, xor_addr.address, xor_addr.port, data.value)

%{state | out_bytes: state.out_bytes + byte_size(data.value)}
{:ok, %{state | out_bytes: state.out_bytes + byte_size(data.value)}}
end

defp handle_msg(%Message{type: %Type{class: :request, method: :channel_bind}} = msg, state) do
Expand Down Expand Up @@ -186,11 +236,13 @@ defmodule ExTURN.AllocationHandler do

:gen_udp.send(state.turn_socket, c_ip, c_port, response)

state
{:ok, state}

{:error, response} ->
:gen_udp.send(state.turn_socket, c_ip, c_port, Message.encode(response))
state
{:error, reason} ->
{response, log_msg} = Utils.build_error(reason, msg.transaction_id, msg.type.method)
Logger.warn(log_msg)
:gen_udp.send(state.turn_socket, c_ip, c_port, response)
{:ok, state}
end
end

Expand All @@ -199,16 +251,16 @@ defmodule ExTURN.AllocationHandler do
xor_addr = Map.fetch!(state.channels, channel_num)
:gen_udp.send(state.socket, xor_addr.address, xor_addr.port, data)

%{state | out_bytes: state.out_bytes + byte_size(data)}
{:ok, %{state | out_bytes: state.out_bytes + byte_size(data)}}
end

# defp handle_msg(<<channel_num::16, len::16, data::binary>>, state) do

# end

defp handle_msg(msg, state) do
Logger.warn("Got unexpected TURN message: #{inspect(msg, limit: :infinity)}")
state
Logger.warning("Got unexpected TURN message: #{inspect(msg, limit: :infinity)}")
{:ok, state}
end

defp family({_, _, _, _}), do: :ipv4
Expand Down
11 changes: 11 additions & 0 deletions lib/ex_turn/attributes/lifetime.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,15 @@ defmodule ExTURN.Attribute.Lifetime do
def to_raw(%__MODULE__{} = attr, _msg) do
%RawAttribute{type: @attr_type, value: <<attr.lifetime::32>>}
end

@impl true
def from_raw(%RawAttribute{} = raw_attr, _msg) do
decode(raw_attr.value)
end

defp decode(<<lifetime::32>>) do
{:ok, %__MODULE__{lifetime: lifetime}}
end

defp decode(_data), do: {:error, :invalid_lifetime}
end
66 changes: 8 additions & 58 deletions lib/ex_turn/auth.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,16 @@ defmodule ExTURN.Auth do
require Logger

alias ExSTUN.Message
alias ExSTUN.Message.Type
alias ExSTUN.Message.Attribute.{ErrorCode, Nonce, MessageIntegrity, Realm, Username}
alias ExSTUN.Message.Attribute.{Nonce, MessageIntegrity, Realm, Username}

@auth_secret Application.compile_env!(:ex_turn, :auth_secret)
@domain_name Application.compile_env!(:ex_turn, :domain_name)
@nonce_secret Application.compile_env!(:ex_turn, :nonce_secret)
# 1 hour in nanoseconds, see https://datatracker.ietf.org/doc/html/rfc5766#section-4
@nonce_lifetime 60 * 60 * 1_000_000_000
# 1 day in seconds by default, see https://datatracker.ietf.org/doc/html/draft-uberti-rtcweb-turn-rest-00#section-2.2
@credentials_lifetime Application.compile_env(:ex_turn, :credentials_lifetime, 24 * 60 * 60)

@spec authenticate(Message.t(), username: String.t()) :: {:ok, binary()} | {:error, Message.t()}
@spec authenticate(Message.t(), username: String.t()) :: {:ok, binary()} | {:error, atom()}
def authenticate(%Message{} = msg, opts \\ []) do
with :ok <- verify_message_integrity(msg),
{:ok, username, nonce} <- verify_attrs_presence(msg),
Expand All @@ -24,40 +22,15 @@ defmodule ExTURN.Auth do
{:ok, key} <- Message.authenticate_lt(msg, password) do
{:ok, key}
else
{:error, :no_message_integrity} ->
Logger.info("No message integrity attribute. Seems like a new allocation.")
{:error, build_error(msg.transaction_id, msg.type.method, 401, with_attrs?: true)}

{:error, :attrs_missing} ->
Logger.info("No username, nonce or realm attribute. Rejecting.")
{:error, build_error(msg.transaction_id, msg.type.method, 400)}

{:error, :invalid_timestamp} ->
Logger.info("Username timestamp expired. Rejecting.")
{:error, build_error(msg.transaction_id, msg.type.method, 401, with_attrs?: true)}

{:error, :invalid_username} ->
Logger.info("Username differs from the one used previously. Rejecting.")
{:error, build_error(msg.transaction_id, msg.type.method, 401, with_attrs?: true)}

{:error, :stale_nonce} ->
Logger.info("Stale nonce. Rejecting.")
{:error, build_error(msg.transaction_id, msg.type.method, 438, with_attrs?: true)}

:error ->
Logger.info("Bad message integrity")
{:error, build_error(msg.transaction_id, msg.type.method, 401, with_attrs?: true)}
:error -> {:error, :invalid_message_integrity}
{:error, _reason} = err -> err
end
end

defp verify_message_integrity(msg) do
case Message.get_attribute(msg, MessageIntegrity) do
{:ok, %MessageIntegrity{} = msg_int} ->
Logger.info("Got message integrity, #{inspect(msg_int)}")
:ok

nil ->
{:error, :no_message_integrity}
{:ok, %MessageIntegrity{}} -> :ok
nil -> {:error, :no_message_integrity}
end
end

Expand All @@ -67,7 +40,7 @@ defmodule ExTURN.Auth do
{:ok, %Nonce{value: nonce}} <- Message.get_attribute(msg, Nonce) do
{:ok, username, nonce}
else
nil -> {:error, :attrs_missing}
nil -> {:error, :auth_attrs_missing}
end
end

Expand All @@ -78,7 +51,7 @@ defmodule ExTURN.Auth do
false <- expiry_time - System.os_time(:second) <= 0 do
:ok
else
_other -> {:error, :invalid_timestamp}
_other -> {:error, :invalid_username_timestamp}
end
end

Expand All @@ -101,29 +74,6 @@ defmodule ExTURN.Auth do
if is_hash_valid? and not is_stale?, do: :ok, else: {:error, :stale_nonce}
end

defp build_nonce() do
# inspired by https://datatracker.ietf.org/doc/html/rfc7616#section-5.4
timestamp = System.monotonic_time(:nanosecond)
hash = :crypto.hash(:sha256, "#{timestamp}:#{@nonce_secret}")
"#{timestamp} #{hash}" |> :base64.encode()
end

defp build_error(t_id, method, code, opts \\ []) do
with_attrs? = Keyword.get(opts, :with_attrs?, false)
error_type = %Type{class: :error_response, method: method}

attrs = [%ErrorCode{code: code}]

attrs =
if with_attrs? do
attrs ++ [%Nonce{value: build_nonce()}, %Realm{value: @domain_name}]
else
attrs
end

Message.new(t_id, error_type, attrs)
end

@spec generate_credentials(String.t() | nil) ::
{username :: String.t(), password :: String.t(), ttl :: non_neg_integer()}
def generate_credentials(username \\ nil) do
Expand Down
Loading

0 comments on commit f48f8d4

Please sign in to comment.