Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fix improper policy checking #1085

Merged
merged 8 commits into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/extensions/postgres_cdc_rls/cdc_rls.ex
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ defmodule Extensions.PostgresCdcRls do

@spec start(map()) :: :ok | {:error, :already_started | :reserved}
def start(%{"id" => tenant} = args) when is_binary(tenant) do
args = Map.merge(args, %{"subs_pool_size" => Map.get(args, "subcriber_pool_size", 5)})
args = Map.merge(args, %{"subs_pool_size" => Map.get(args, "subcriber_pool_size", 4)})

Logger.debug("Starting #{__MODULE__} extension with args: #{inspect(args, pretty: true)}")

Expand Down
2 changes: 1 addition & 1 deletion lib/realtime/database.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ defmodule Realtime.Database do
@spec from_settings(map(), binary(), :stop | :exp | :rand | :rand_exp) :: Realtime.Database.t()
def from_settings(settings, application_name, backoff \\ :rand_exp) do
pool =
settings["subs_pool_size"] || settings["subcriber_pool_size"] || settings["db_pool"] || 1
settings["subs_pool_size"] || settings["subcriber_pool_size"] || settings["db_pool"] || 2

%__MODULE__{
host: settings["db_host"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,29 @@ defmodule Realtime.Tenants.Migrations.CreateRealtimeSubscriptionTable do
use Ecto.Migration

def change do
execute("create type realtime.equality_op as enum(
'eq', 'neq', 'lt', 'lte', 'gt', 'gte'
);")
execute("""
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'equality_op') THEN
CREATE TYPE realtime.equality_op AS ENUM(
'eq', 'neq', 'lt', 'lte', 'gt', 'gte'
);
END IF;
END$$;
""")

execute("create type realtime.user_defined_filter as (
column_name text,
op realtime.equality_op,
value text
);")
execute("""
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'user_defined_filter') THEN
CREATE TYPE realtime.user_defined_filter as (
column_name text,
op realtime.equality_op,
value text
);
END IF;
END$$;
""")

execute("create table realtime.subscription (
-- Tracks which users are subscribed to each table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,20 @@ defmodule Realtime.Tenants.Migrations.CreateRealtimeBuildPreparedStatementSqlFun
use Ecto.Migration

def change do
execute("create type realtime.wal_column as (
name text,
type text,
value jsonb,
is_pkey boolean,
is_selectable boolean
);")
execute("""
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'wal_column') THEN
CREATE TYPE realtime.wal_column AS (
name text,
type text,
value jsonb,
is_pkey boolean,
is_selectable boolean
);
END IF;
END$$;
""")

execute("create function realtime.build_prepared_statement_sql(
prepared_statement_name text,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,31 @@ defmodule Realtime.Tenants.Migrations.CreateRealtimeApplyRlsFunction do
use Ecto.Migration

def change do
execute(
"create type realtime.action as enum ('INSERT', 'UPDATE', 'DELETE', 'TRUNCATE', 'ERROR');"
)

execute("create type realtime.wal_rls as (
wal jsonb,
is_rls_enabled boolean,
users uuid[],
errors text[]
);")
execute("""
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'action') THEN
CREATE TYPE realtime.action AS ENUM (
'INSERT', 'UPDATE', 'DELETE', 'TRUNCATE', 'ERROR'
);
END IF;
END$$;
""")

execute("""
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'wal_rls') THEN
CREATE TYPE realtime.wal_rls AS (
wal jsonb,
is_rls_enabled boolean,
users uuid[],
errors text[]
);
END IF;
END$$;
""")

execute("create function realtime.apply_rls(wal jsonb, max_record_bytes int = 1024 * 1024)
returns realtime.wal_rls
language plpgsql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,21 @@ defmodule Realtime.Tenants.Migrations.AddQuotedRegtypesSupport do
def change do
execute("drop type if exists realtime.wal_column cascade;")

execute("
create type realtime.wal_column as (
name text,
type_name text,
type_oid oid,
value jsonb,
is_pkey boolean,
is_selectable boolean
);
")
execute("""
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'wal_column') THEN
CREATE TYPE realtime.wal_column AS (
name text,
type_name text,
type_oid oid,
value jsonb,
is_pkey boolean,
is_selectable boolean
);
END IF;
END$$;
""")

execute("
create or replace function realtime.is_visible_through_filters(columns realtime.wal_column[], filters realtime.user_defined_filter[])
Expand Down
79 changes: 35 additions & 44 deletions lib/realtime_web/channels/realtime_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ defmodule RealtimeWeb.RealtimeChannel do
:ok <- limit_max_users(socket.assigns),
{:ok, claims, confirm_token_ref, access_token, _} <- confirm_token(socket),
{:ok, db_conn} <- Connect.lookup_or_start_connection(tenant_id),
{:ok, socket} <- assign_policies(sub_topic, db_conn, access_token, claims, socket) do
{:ok, socket} <- maybe_assign_policies(sub_topic, db_conn, access_token, claims, socket) do
public? = !socket.assigns.check_authorization?
is_new_api = is_new_api(params)
tenant_topic = Tenants.tenant_topic(tenant_id, sub_topic, public?)
Expand Down Expand Up @@ -249,9 +249,11 @@ defmodule RealtimeWeb.RealtimeChannel do
pg_change_params: pg_change_params
})}

{:error, error} ->
message = "Access token has expired: " <> Helpers.to_log(error)
{:error, error} when is_binary(error) ->
shutdown_response(socket, error)

{:error, error} ->
message = Helpers.to_log(error)
shutdown_response(socket, message)
end
end
Expand Down Expand Up @@ -301,33 +303,41 @@ defmodule RealtimeWeb.RealtimeChannel do
%{"access_token" => refresh_token},
%{
assigns: %{
access_token: _access_token,
access_token: access_token,
pg_sub_ref: pg_sub_ref,
db_conn: db_conn,
channel_name: channel_name,
pg_change_params: pg_change_params
}
} = socket
)
when is_binary(refresh_token) do
socket = assign(socket, :access_token, refresh_token)

case confirm_token(socket, true) do
{:ok, claims, confirm_token_ref, _, socket} ->
Helpers.cancel_timer(pg_sub_ref)
pg_change_params = Enum.map(pg_change_params, &Map.put(&1, :claims, claims))
with {:ok, claims, confirm_token_ref, _, socket} <- confirm_token(socket),
{:ok, socket} <-
maybe_assign_policies(channel_name, db_conn, access_token, claims, socket) do
Helpers.cancel_timer(pg_sub_ref)
pg_change_params = Enum.map(pg_change_params, &Map.put(&1, :claims, claims))

pg_sub_ref =
case pg_change_params do
[_ | _] -> postgres_subscribe()
_ -> nil
end
pg_sub_ref =
case pg_change_params do
[_ | _] -> postgres_subscribe()
_ -> nil
end

assigns = %{
pg_sub_ref: pg_sub_ref,
confirm_token_ref: confirm_token_ref,
pg_change_params: pg_change_params
}

assigns = %{
pg_sub_ref: pg_sub_ref,
confirm_token_ref: confirm_token_ref,
pg_change_params: pg_change_params
}
{:noreply, assign(socket, assigns)}
else
{:error, error} when is_binary(error) ->
message = "Received an invalid access token from client: " <> error

{:noreply, assign(socket, assigns)}
shutdown_response(socket, message)

{:error, error} ->
message = "Received an invalid access token from client: " <> inspect(error)
Expand Down Expand Up @@ -472,7 +482,7 @@ defmodule RealtimeWeb.RealtimeChannel do
assign(socket, :access_token, tenant_token)
end

defp confirm_token(%{assigns: assigns} = socket, check_policy \\ false) do
defp confirm_token(%{assigns: assigns} = socket) do
%{
jwt_secret: jwt_secret,
access_token: access_token
Expand All @@ -483,38 +493,19 @@ defmodule RealtimeWeb.RealtimeChannel do
with jwt_secret_dec <- Crypto.decrypt!(jwt_secret),
{:ok, %{"exp" => exp} = claims} when is_integer(exp) <-
ChannelsAuthorization.authorize_conn(access_token, jwt_secret_dec, jwt_jwks),
exp_diff when exp_diff > 0 <- exp - Joken.current_time(),
{:ok, socket} <- validate_policy(socket, claims, check_policy) do
exp_diff when exp_diff > 0 <- exp - Joken.current_time() do
if ref = assigns[:confirm_token_ref], do: Helpers.cancel_timer(ref)

interval = min(@confirm_token_ms_interval, exp_diff * 1_000)
ref = Process.send_after(self(), :confirm_token, interval)

{:ok, claims, ref, access_token, socket}
else
{:error, e} -> {:error, e}
e -> {:error, e}
end
end

defp validate_policy(%{assigns: %{check_authorization?: true}} = socket, _claims, _check_policy) do
{:ok, socket}
end

defp validate_policy(socket, _claims, false) do
{:ok, socket}
end

defp validate_policy(%{assigns: assigns} = socket, claims, true) do
%{
access_token: access_token,
db_conn: db_conn,
channel_name: channel_name
} = assigns

with {:ok, socket} <- assign_policies(channel_name, db_conn, access_token, claims, socket) do
{:ok, socket}
end
end

defp shutdown_response(%{assigns: %{channel_name: channel_name}} = socket, message)
when is_binary(message) do
Helpers.log_error("ChannelShutdown", message)
Expand Down Expand Up @@ -642,7 +633,7 @@ defmodule RealtimeWeb.RealtimeChannel do
end)
end

defp assign_policies(
defp maybe_assign_policies(
topic,
db_conn,
access_token,
Expand Down Expand Up @@ -676,7 +667,7 @@ defmodule RealtimeWeb.RealtimeChannel do
end
end

defp assign_policies(_, _, _, _, socket) do
defp maybe_assign_policies(_, _, _, _, socket) do
{:ok, assign(socket, policies: nil)}
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandler do
%{tenant_topic: topic, policies: policies} = assigns

case policies do
%Policies{presence: %PresencePolicies{write: false}} ->
%Policies{presence: %PresencePolicies{read: false}} ->
Logger.info("Presence track message ignored on #{topic}")
{:noreply, socket}

Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.29.14",
version: "2.29.15",
elixir: "~> 1.16.0",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
Loading
Loading