Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Add Broadcast Permission functions #796

Merged
merged 3 commits into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions lib/realtime/api/broadcast.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
defmodule Realtime.Api.Broadcast do
@moduledoc """
Defines the Channel schema
"""
use Ecto.Schema
import Ecto.Changeset
@derive {Jason.Encoder, only: [:inserted_at, :updated_at, :id, :channel_id]}

@type t :: %__MODULE__{}

@schema_prefix "realtime"
schema "broadcasts" do
field(:check, :boolean, default: false)
timestamps()

belongs_to(:channel, Realtime.Api.Channel)
end

def changeset(broadcast, attrs) do
broadcast
|> cast(attrs, [:check, :inserted_at, :updated_at, :channel_id])
|> put_timestamp(:updated_at)
|> maybe_put_timestamp(:inserted_at)
end

def check_changeset(broadcast, attrs) do
broadcast
|> change()
|> put_change(:check, attrs[:check])
end

defp put_timestamp(changeset, field) do
put_change(changeset, field, NaiveDateTime.utc_now() |> NaiveDateTime.truncate(:second))
end

defp maybe_put_timestamp(changeset, field) do
case Map.get(changeset.data, field, nil) do
nil -> put_timestamp(changeset, field)
_ -> changeset
end
end
end
4 changes: 3 additions & 1 deletion lib/realtime/api/channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ defmodule Realtime.Api.Channel do
field(:name, :string)
field(:check, :boolean, default: false)
timestamps()

has_many(:broadcasts, Realtime.Api.Broadcast)
end

def changeset(channel, attrs) do
Expand All @@ -30,7 +32,7 @@ defmodule Realtime.Api.Channel do
end

defp put_timestamp(changeset, field) do
put_change(changeset, field, DateTime.utc_now() |> DateTime.to_naive())
put_change(changeset, field, NaiveDateTime.utc_now() |> NaiveDateTime.truncate(:second))
end

defp maybe_put_timestamp(changeset, field) do
Expand Down
16 changes: 14 additions & 2 deletions lib/realtime/channels.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ defmodule Realtime.Channels do
Handles Channel related operations
"""

alias Realtime.Api.Broadcast
alias Realtime.Api.Channel
alias Realtime.Repo

Expand All @@ -26,13 +27,24 @@ defmodule Realtime.Channels do
end

@doc """
Creates a channel in the tenant database using a given DBConnection
Creates a channel and supporting tables for a given channel in the tenant database using a given DBConnection.

This tables will be used for to set Authorizations. Please read more at Realtime.Tenants.Authorization
"""
@spec create_channel(map(), DBConnection.conn()) :: {:ok, Channel.t()} | {:error, any()}
def create_channel(attrs, conn) do
channel = Channel.changeset(%Channel{}, attrs)

Repo.insert(conn, channel, Channel)
Postgrex.transaction(conn, fn transaction_conn ->
with {:ok, channel} <- Repo.insert(transaction_conn, channel, Channel),
changeset = Broadcast.changeset(%Broadcast{}, %{channel_id: channel.id}),
{:ok, _} <- Repo.insert(transaction_conn, changeset, Broadcast) do
channel
else
{:error, changeset} ->
Postgrex.rollback(transaction_conn, changeset)
end
end)
end

@doc """
Expand Down
153 changes: 68 additions & 85 deletions lib/realtime/tenants/authorization.ex
Original file line number Diff line number Diff line change
@@ -1,38 +1,41 @@
defmodule Realtime.Tenants.Authorization do
@moduledoc """
Runs validations based on RLS policies to set permissions for a given connection.
Runs validations based on RLS policies to set policies for a given connection and
creates a Realtime.Tenants.Policies struct with the accumulated results of the policies
for a given user and a given channel context

It will assign the a new key to a socket or a conn with the following:
* read - a boolean indicating whether the connection has read permissions
Each feature will have their own set of ways to check Policies against the Authorization context.

Check more information at Realtime.Tenants.Authorization.Policies
"""
require Logger

import Ecto.Query

alias Realtime.Repo
alias Realtime.Api.Channel
alias Realtime.Tenants.Authorization.Policies
alias Realtime.Tenants.Authorization.Policies.BroadcastPolicies
alias Realtime.Tenants.Authorization.Policies.ChannelPolicies

defstruct [:channel, :headers, :jwt, :claims, :role]

defmodule Permissions do
defstruct read: false, write: false

@type t :: %__MODULE__{
:read => boolean(),
:write => boolean()
}
end

@type t :: %__MODULE__{
:channel => Channel.t() | nil,
:claims => map(),
:headers => keyword({binary(), binary()}),
:jwt => map(),
:role => binary()
}

@doc """
Builds a new authorization params struct.
Builds a new authorization struct which will be used to retain the information required to check Policies.

Requires a map with the following keys:
* channel: Realtime.Api.Channel struct for which channel is being accessed
* headers: Request headers when the connection was made or WS was updated
* jwt: JWT String
* claims: JWT claims
* role: JWT role
"""
@spec build_authorization_params(map()) :: t()
def build_authorization_params(%{
channel: channel,
headers: headers,
Expand All @@ -49,46 +52,45 @@ defmodule Realtime.Tenants.Authorization do
}
end

@spec get_authorizations(Phoenix.Socket.t() | Plug.Conn.t(), DBConnection.t(), __MODULE__.t()) ::
{:ok, Phoenix.Socket.t() | Plug.Conn.t()} | {:error, :unauthorized}
@doc """
Runs validations based on RLS policies to set permissions for a given connection (either Phoenix.Socket or Plug.Conn).
Runs validations based on RLS policies to set policies for a given connection (either Phoenix.Socket or Plug.Conn).
"""
def get_authorizations(%Phoenix.Socket{} = socket, db_conn, params) do
case get_permissions_for_connection(db_conn, params) do
{:ok, permissions} -> {:ok, Phoenix.Socket.assign(socket, :permissions, permissions)}
@spec get_authorizations(Phoenix.Socket.t() | Plug.Conn.t(), DBConnection.t(), __MODULE__.t()) ::
{:ok, Phoenix.Socket.t() | Plug.Conn.t()} | {:error, :unauthorized}
def get_authorizations(%Phoenix.Socket{} = socket, db_conn, authorization_context) do
case get_policies_for_connection(db_conn, authorization_context) do
{:ok, policies} -> {:ok, Phoenix.Socket.assign(socket, :policies, policies)}
_ -> {:error, :unauthorized}
end
end

def get_authorizations(%Plug.Conn{} = conn, db_conn, params) do
case get_permissions_for_connection(db_conn, params) do
{:ok, permissions} -> {:ok, Plug.Conn.assign(conn, :permissions, permissions)}
def get_authorizations(%Plug.Conn{} = conn, db_conn, authorization_context) do
case get_policies_for_connection(db_conn, authorization_context) do
{:ok, policies} -> {:ok, Plug.Conn.assign(conn, :policies, policies)}
_ -> {:error, :unauthorized}
end
end

defp get_permissions_for_connection(conn, params) do
Postgrex.transaction(conn, fn transaction_conn ->
set_config(transaction_conn, params)
permissions = %Permissions{}

with {:ok, %{write: false} = permissions} <-
check_write_permissions(transaction_conn, permissions, params),
{:ok, permissions} <- check_read_permissions(transaction_conn, permissions) do
{:ok, permissions}
end
end)
end

defp set_config(conn, params) do
@doc """
Sets the current connection configuration with the following config values:
* role: The role of the user
* realtime.channel_name: The name of the channel being accessed
* request.jwt.claim.role: The role of the user
* request.jwt: The JWT token
* request.jwt.claim.sub: The sub claim of the JWT token
* request.jwt.claims: The claims of the JWT token
* request.headers: The headers of the request
"""
@spec set_conn_config(DBConnection.t(), t()) ::
{:ok, Postgrex.Result.t()} | {:error, Exception.t()}
def set_conn_config(conn, authorization_context) do
%__MODULE__{
channel: channel,
headers: headers,
jwt: jwt,
claims: claims,
role: role
} = params
} = authorization_context

sub = Map.get(claims, :sub)
claims = Jason.encode!(claims)
Expand All @@ -111,50 +113,31 @@ defmodule Realtime.Tenants.Authorization do
)
end

defp check_read_permissions(conn, permissions) do
query = from(c in Channel, select: c.name)

case Repo.all(conn, query, Channel, mode: :savepoint) do
{:ok, channels} when channels != [] ->
{:ok, %Permissions{permissions | read: true}}

{:ok, _} ->
{:ok, %Permissions{permissions | read: false}}

{:error, %Postgrex.Error{postgres: %{code: :insufficient_privilege}}} ->
{:ok, %Permissions{permissions | read: false}}

{:error, error} ->
Logger.error("Error getting permissions for connection: #{inspect(error)}")
{:error, error}
end
end

defp check_write_permissions(_, permissions, %__MODULE__{channel: nil}) do
{:ok, %Permissions{permissions | write: false}}
end

defp check_write_permissions(conn, permissions, %__MODULE__{channel: channel}) do
changeset = Channel.check_changeset(channel, %{check: true})

case Repo.update(conn, changeset, Channel, mode: :savepoint) do
{:ok, %Channel{check: true} = channel} ->
revert_changeset = Channel.check_changeset(channel, %{check: nil})
{:ok, _} = Repo.update(conn, revert_changeset, Channel)
{:ok, %Permissions{permissions | write: true, read: true}}

{:ok, _} ->
{:ok, %Permissions{permissions | write: false}}

{:error, %Postgrex.Error{postgres: %{code: :insufficient_privilege}}} ->
{:ok, %Permissions{permissions | write: false}}

{:error, :not_found} ->
{:ok, %Permissions{permissions | write: false}}

{:error, error} ->
Logger.error("Error getting permissions for connection: #{inspect(error)}")
{:error, error}
end
@policies_mods [ChannelPolicies, BroadcastPolicies]
defp get_policies_for_connection(conn, authorization_context) do
Postgrex.transaction(conn, fn transaction_conn ->
set_conn_config(transaction_conn, authorization_context)

Enum.reduce_while(@policies_mods, %Policies{}, fn policies_mod, policies ->
with {:ok, policies} <-
policies_mod.check_write_policies(
transaction_conn,
policies,
authorization_context
),
{:ok, policies} <-
policies_mod.check_read_policies(
transaction_conn,
policies,
authorization_context
) do
{:cont, policies}
else
{:error, _} ->
Postgrex.rollback(transaction_conn, :unauthorized)
{:halt, {:error, :unauthorized}}
end
end)
end)
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
defmodule Realtime.Tenants.Authorization.Policies.BroadcastPolicies do
@moduledoc """
ChannelPolicies structure that holds the required authorization information for a given connection within the scope of a reading / altering channel entities

Uses the Realtime.Api.Channel to try reads and writes on the database to determine authorization for a given connection.

Implements Realtime.Tenants.Authorization behaviour
"""
require Logger
import Ecto.Query

alias Realtime.Api.Broadcast
alias Realtime.Api.Channel
alias Realtime.Repo
alias Realtime.Tenants.Authorization
alias Realtime.Tenants.Authorization.Policies

defstruct read: false, write: false

@behaviour Realtime.Tenants.Authorization.Policies

@type t :: %__MODULE__{
:read => boolean(),
:write => boolean()
}
@impl true
def check_read_policies(_conn, policies, %Authorization{channel: nil}) do
{:ok, Policies.update_policies(policies, :broadcast, :read, false)}
end

def check_read_policies(conn, %Policies{} = policies, %Authorization{
channel: %Channel{id: channel_id}
}) do
query = from(b in Broadcast, where: b.channel_id == ^channel_id)

Postgrex.transaction(conn, fn transaction_conn ->
case Repo.one(conn, query, Broadcast) do
{:ok, %Broadcast{}} ->
Policies.update_policies(policies, :broadcast, :read, true)

{:error, %Postgrex.Error{postgres: %{code: :insufficient_privilege}}} ->
Policies.update_policies(policies, :broadcast, :read, false)

{:error, :not_found} ->
Policies.update_policies(policies, :broadcast, :read, false)

{:error, error} ->
Logger.error("Error getting broadcast read policies for connection: #{inspect(error)}")

Postgrex.rollback(transaction_conn, error)
end
end)
end

@impl true
def check_write_policies(_conn, policies, %Authorization{channel: nil}) do
{:ok, Policies.update_policies(policies, :broadcast, :write, false)}
end

def check_write_policies(conn, policies, %Authorization{
channel: %Channel{id: channel_id}
}) do
query = from(b in Broadcast, where: b.channel_id == ^channel_id)

Postgrex.transaction(conn, fn transaction_conn ->
case Repo.one(conn, query, Broadcast) do
{:ok, %Broadcast{} = broadcast} ->
changeset = Broadcast.check_changeset(broadcast, %{check: true})

case Repo.update(conn, changeset, Broadcast, mode: :savepoint) do
{:ok, %Broadcast{check: true} = broadcast} ->
revert_changeset = Broadcast.check_changeset(broadcast, %{check: false})
{:ok, _} = Repo.update(conn, revert_changeset, Broadcast)
Policies.update_policies(policies, :broadcast, :write, true)

{:error, %Postgrex.Error{postgres: %{code: :insufficient_privilege}}} ->
Policies.update_policies(policies, :broadcast, :write, false)

{:error, :not_found} ->
Policies.update_policies(policies, :broadcast, :write, false)

{:error, error} ->
Logger.error(
"Error getting broadcast write policies for connection: #{inspect(error)}"
)

Postgrex.rollback(transaction_conn, error)
end

{:error, :not_found} ->
Policies.update_policies(policies, :broadcast, :write, false)
end
end)
end
end
Loading
Loading