Skip to content

Commit

Permalink
fix: Fix improper policy checking (#1085)
Browse files Browse the repository at this point in the history
* Presence had a bad check when tracking changes
* JWT checking had a bad check and a broken test
* Fixed some migrations to be less susceptible to errors on migrations
* Added one connection to db_pool
* Removed one connection from subcriber_pool_size
Co-authored-by: Chase Granberry <[email protected]>
  • Loading branch information
filipecabaco authored Jul 2, 2024
1 parent d0656cc commit d57b3cc
Show file tree
Hide file tree
Showing 10 changed files with 164 additions and 101 deletions.
2 changes: 1 addition & 1 deletion lib/extensions/postgres_cdc_rls/cdc_rls.ex
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ defmodule Extensions.PostgresCdcRls do

@spec start(map()) :: :ok | {:error, :already_started | :reserved}
def start(%{"id" => tenant} = args) when is_binary(tenant) do
args = Map.merge(args, %{"subs_pool_size" => Map.get(args, "subcriber_pool_size", 5)})
args = Map.merge(args, %{"subs_pool_size" => Map.get(args, "subcriber_pool_size", 4)})

Logger.debug("Starting #{__MODULE__} extension with args: #{inspect(args, pretty: true)}")

Expand Down
2 changes: 1 addition & 1 deletion lib/realtime/database.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ defmodule Realtime.Database do
@spec from_settings(map(), binary(), :stop | :exp | :rand | :rand_exp) :: Realtime.Database.t()
def from_settings(settings, application_name, backoff \\ :rand_exp) do
pool =
settings["subs_pool_size"] || settings["subcriber_pool_size"] || settings["db_pool"] || 1
settings["subs_pool_size"] || settings["als"] || settings["db_pool"] || 2

%__MODULE__{
host: settings["db_host"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,29 @@ defmodule Realtime.Tenants.Migrations.CreateRealtimeSubscriptionTable do
use Ecto.Migration

def change do
execute("create type realtime.equality_op as enum(
'eq', 'neq', 'lt', 'lte', 'gt', 'gte'
);")
execute("""
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'equality_op') THEN
CREATE TYPE realtime.equality_op AS ENUM(
'eq', 'neq', 'lt', 'lte', 'gt', 'gte'
);
END IF;
END$$;
""")

execute("create type realtime.user_defined_filter as (
column_name text,
op realtime.equality_op,
value text
);")
execute("""
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'user_defined_filter') THEN
CREATE TYPE realtime.user_defined_filter as (
column_name text,
op realtime.equality_op,
value text
);
END IF;
END$$;
""")

execute("create table realtime.subscription (
-- Tracks which users are subscribed to each table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,20 @@ defmodule Realtime.Tenants.Migrations.CreateRealtimeBuildPreparedStatementSqlFun
use Ecto.Migration

def change do
execute("create type realtime.wal_column as (
name text,
type text,
value jsonb,
is_pkey boolean,
is_selectable boolean
);")
execute("""
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'wal_column') THEN
CREATE TYPE realtime.wal_column AS (
name text,
type text,
value jsonb,
is_pkey boolean,
is_selectable boolean
);
END IF;
END$$;
""")

execute("create function realtime.build_prepared_statement_sql(
prepared_statement_name text,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,31 @@ defmodule Realtime.Tenants.Migrations.CreateRealtimeApplyRlsFunction do
use Ecto.Migration

def change do
execute(
"create type realtime.action as enum ('INSERT', 'UPDATE', 'DELETE', 'TRUNCATE', 'ERROR');"
)

execute("create type realtime.wal_rls as (
wal jsonb,
is_rls_enabled boolean,
users uuid[],
errors text[]
);")
execute("""
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'action') THEN
CREATE TYPE realtime.action AS ENUM (
'INSERT', 'UPDATE', 'DELETE', 'TRUNCATE', 'ERROR'
);
END IF;
END$$;
""")

execute("""
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'wal_rls') THEN
CREATE TYPE realtime.wal_rls AS (
wal jsonb,
is_rls_enabled boolean,
users uuid[],
errors text[]
);
END IF;
END$$;
""")

execute("create function realtime.apply_rls(wal jsonb, max_record_bytes int = 1024 * 1024)
returns realtime.wal_rls
language plpgsql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,21 @@ defmodule Realtime.Tenants.Migrations.AddQuotedRegtypesSupport do
def change do
execute("drop type if exists realtime.wal_column cascade;")

execute("
create type realtime.wal_column as (
name text,
type_name text,
type_oid oid,
value jsonb,
is_pkey boolean,
is_selectable boolean
);
")
execute("""
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'wal_column') THEN
CREATE TYPE realtime.wal_column AS (
name text,
type_name text,
type_oid oid,
value jsonb,
is_pkey boolean,
is_selectable boolean
);
END IF;
END$$;
""")

execute("
create or replace function realtime.is_visible_through_filters(columns realtime.wal_column[], filters realtime.user_defined_filter[])
Expand Down
77 changes: 34 additions & 43 deletions lib/realtime_web/channels/realtime_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ defmodule RealtimeWeb.RealtimeChannel do
:ok <- limit_max_users(socket.assigns),
{:ok, claims, confirm_token_ref, access_token, _} <- confirm_token(socket),
{:ok, db_conn} <- Connect.lookup_or_start_connection(tenant_id),
{:ok, socket} <- assign_policies(sub_topic, db_conn, access_token, claims, socket) do
{:ok, socket} <- maybe_assign_policies(sub_topic, db_conn, access_token, claims, socket) do
public? = !socket.assigns.check_authorization?
is_new_api = is_new_api(params)
tenant_topic = Tenants.tenant_topic(tenant_id, sub_topic, public?)
Expand Down Expand Up @@ -249,9 +249,11 @@ defmodule RealtimeWeb.RealtimeChannel do
pg_change_params: pg_change_params
})}

{:error, error} when is_binary(error) ->
shutdown_response(socket, error)

{:error, error} ->
message = "Access token has expired: " <> Helpers.to_log(error)

shutdown_response(socket, message)
end
end
Expand Down Expand Up @@ -301,33 +303,41 @@ defmodule RealtimeWeb.RealtimeChannel do
%{"access_token" => refresh_token},
%{
assigns: %{
access_token: _access_token,
access_token: access_token,
pg_sub_ref: pg_sub_ref,
db_conn: db_conn,
channel_name: channel_name,
pg_change_params: pg_change_params
}
} = socket
)
when is_binary(refresh_token) do
socket = assign(socket, :access_token, refresh_token)

case confirm_token(socket, true) do
{:ok, claims, confirm_token_ref, _, socket} ->
Helpers.cancel_timer(pg_sub_ref)
pg_change_params = Enum.map(pg_change_params, &Map.put(&1, :claims, claims))
with {:ok, claims, confirm_token_ref, _, socket} <- confirm_token(socket),
{:ok, socket} <-
maybe_assign_policies(channel_name, db_conn, access_token, claims, socket) do
Helpers.cancel_timer(pg_sub_ref)
pg_change_params = Enum.map(pg_change_params, &Map.put(&1, :claims, claims))

pg_sub_ref =
case pg_change_params do
[_ | _] -> postgres_subscribe()
_ -> nil
end
pg_sub_ref =
case pg_change_params do
[_ | _] -> postgres_subscribe()
_ -> nil
end

assigns = %{
pg_sub_ref: pg_sub_ref,
confirm_token_ref: confirm_token_ref,
pg_change_params: pg_change_params
}

assigns = %{
pg_sub_ref: pg_sub_ref,
confirm_token_ref: confirm_token_ref,
pg_change_params: pg_change_params
}
{:noreply, assign(socket, assigns)}
else
{:error, error} when is_binary(error) ->
message = "Received an invalid access token from client: " <> error

{:noreply, assign(socket, assigns)}
shutdown_response(socket, message)

{:error, error} ->
message = "Received an invalid access token from client: " <> inspect(error)
Expand Down Expand Up @@ -472,7 +482,7 @@ defmodule RealtimeWeb.RealtimeChannel do
assign(socket, :access_token, tenant_token)
end

defp confirm_token(%{assigns: assigns} = socket, check_policy \\ false) do
defp confirm_token(%{assigns: assigns} = socket) do
%{
jwt_secret: jwt_secret,
access_token: access_token
Expand All @@ -483,38 +493,19 @@ defmodule RealtimeWeb.RealtimeChannel do
with jwt_secret_dec <- Crypto.decrypt!(jwt_secret),
{:ok, %{"exp" => exp} = claims} when is_integer(exp) <-
ChannelsAuthorization.authorize_conn(access_token, jwt_secret_dec, jwt_jwks),
exp_diff when exp_diff > 0 <- exp - Joken.current_time(),
{:ok, socket} <- validate_policy(socket, claims, check_policy) do
exp_diff when exp_diff > 0 <- exp - Joken.current_time() do
if ref = assigns[:confirm_token_ref], do: Helpers.cancel_timer(ref)

interval = min(@confirm_token_ms_interval, exp_diff * 1_000)
ref = Process.send_after(self(), :confirm_token, interval)

{:ok, claims, ref, access_token, socket}
else
{:error, e} -> {:error, e}
e -> {:error, e}
end
end

defp validate_policy(%{assigns: %{check_authorization?: true}} = socket, _claims, _check_policy) do
{:ok, socket}
end

defp validate_policy(socket, _claims, false) do
{:ok, socket}
end

defp validate_policy(%{assigns: assigns} = socket, claims, true) do
%{
access_token: access_token,
db_conn: db_conn,
channel_name: channel_name
} = assigns

with {:ok, socket} <- assign_policies(channel_name, db_conn, access_token, claims, socket) do
{:ok, socket}
end
end

defp shutdown_response(%{assigns: %{channel_name: channel_name}} = socket, message)
when is_binary(message) do
Helpers.log_error("ChannelShutdown", message)
Expand Down Expand Up @@ -642,7 +633,7 @@ defmodule RealtimeWeb.RealtimeChannel do
end)
end

defp assign_policies(
defp maybe_assign_policies(
topic,
db_conn,
access_token,
Expand Down Expand Up @@ -676,7 +667,7 @@ defmodule RealtimeWeb.RealtimeChannel do
end
end

defp assign_policies(_, _, _, _, socket) do
defp maybe_assign_policies(_, _, _, _, socket) do
{:ok, assign(socket, policies: nil)}
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandler do
%{tenant_topic: topic, policies: policies} = assigns

case policies do
%Policies{presence: %PresencePolicies{write: false}} ->
%Policies{presence: %PresencePolicies{read: false}} ->
Logger.info("Presence track message ignored on #{topic}")
{:noreply, socket}

Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.29.14",
version: "2.29.15",
elixir: "~> 1.16.0",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
Loading

0 comments on commit d57b3cc

Please sign in to comment.