From d57b3cc32d9268e9913205b2e41ddf7f62676b90 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filipe=20Caba=C3=A7o?= Date: Tue, 2 Jul 2024 11:32:33 +0100 Subject: [PATCH] fix: Fix improper policy checking (#1085) * Presence had a bad check when tracking changes * JWT checking had a bad check and a broken test * Fixed some migrations to be less susceptible to errors on migrations * Added one connection to db_pool * Removed one connection from subcriber_pool_size Co-authored-by: Chase Granberry --- lib/extensions/postgres_cdc_rls/cdc_rls.ex | 2 +- lib/realtime/database.ex | 2 +- ...4918_create_realtime_subscription_table.ex | 30 ++++++-- ...e_build_prepared_statement_sql_function.ex | 21 +++-- ...4523_create_realtime_apply_rls_function.ex | 35 ++++++--- ...20603231003_add_quoted_regtypes_support.ex | 25 +++--- lib/realtime_web/channels/realtime_channel.ex | 77 ++++++++----------- .../realtime_channel/presence_handler.ex | 2 +- mix.exs | 2 +- test/integration/rt_channel_test.exs | 69 ++++++++++++----- 10 files changed, 164 insertions(+), 101 deletions(-) diff --git a/lib/extensions/postgres_cdc_rls/cdc_rls.ex b/lib/extensions/postgres_cdc_rls/cdc_rls.ex index ed40238e6..cd787a960 100644 --- a/lib/extensions/postgres_cdc_rls/cdc_rls.ex +++ b/lib/extensions/postgres_cdc_rls/cdc_rls.ex @@ -97,7 +97,7 @@ defmodule Extensions.PostgresCdcRls do @spec start(map()) :: :ok | {:error, :already_started | :reserved} def start(%{"id" => tenant} = args) when is_binary(tenant) do - args = Map.merge(args, %{"subs_pool_size" => Map.get(args, "subcriber_pool_size", 5)}) + args = Map.merge(args, %{"subs_pool_size" => Map.get(args, "subcriber_pool_size", 4)}) Logger.debug("Starting #{__MODULE__} extension with args: #{inspect(args, pretty: true)}") diff --git a/lib/realtime/database.ex b/lib/realtime/database.ex index abf74aa51..0a793f91e 100644 --- a/lib/realtime/database.ex +++ b/lib/realtime/database.ex @@ -38,7 +38,7 @@ defmodule Realtime.Database do @spec from_settings(map(), binary(), :stop | :exp | :rand | :rand_exp) :: Realtime.Database.t() def from_settings(settings, application_name, backoff \\ :rand_exp) do pool = - settings["subs_pool_size"] || settings["subcriber_pool_size"] || settings["db_pool"] || 1 + settings["subs_pool_size"] || settings["als"] || settings["db_pool"] || 2 %__MODULE__{ host: settings["db_host"], diff --git a/lib/realtime/tenants/repo/migrations/20211116024918_create_realtime_subscription_table.ex b/lib/realtime/tenants/repo/migrations/20211116024918_create_realtime_subscription_table.ex index 7da3685c1..988b65e66 100644 --- a/lib/realtime/tenants/repo/migrations/20211116024918_create_realtime_subscription_table.ex +++ b/lib/realtime/tenants/repo/migrations/20211116024918_create_realtime_subscription_table.ex @@ -4,15 +4,29 @@ defmodule Realtime.Tenants.Migrations.CreateRealtimeSubscriptionTable do use Ecto.Migration def change do - execute("create type realtime.equality_op as enum( - 'eq', 'neq', 'lt', 'lte', 'gt', 'gte' - );") + execute(""" + DO $$ + BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'equality_op') THEN + CREATE TYPE realtime.equality_op AS ENUM( + 'eq', 'neq', 'lt', 'lte', 'gt', 'gte' + ); + END IF; + END$$; + """) - execute("create type realtime.user_defined_filter as ( - column_name text, - op realtime.equality_op, - value text - );") + execute(""" + DO $$ + BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'user_defined_filter') THEN + CREATE TYPE realtime.user_defined_filter as ( + column_name text, + op realtime.equality_op, + value text + ); + END IF; + END$$; + """) execute("create table realtime.subscription ( -- Tracks which users are subscribed to each table diff --git a/lib/realtime/tenants/repo/migrations/20211116212300_create_realtime_build_prepared_statement_sql_function.ex b/lib/realtime/tenants/repo/migrations/20211116212300_create_realtime_build_prepared_statement_sql_function.ex index 15eea8e8c..d5a9a05b7 100644 --- a/lib/realtime/tenants/repo/migrations/20211116212300_create_realtime_build_prepared_statement_sql_function.ex +++ b/lib/realtime/tenants/repo/migrations/20211116212300_create_realtime_build_prepared_statement_sql_function.ex @@ -4,13 +4,20 @@ defmodule Realtime.Tenants.Migrations.CreateRealtimeBuildPreparedStatementSqlFun use Ecto.Migration def change do - execute("create type realtime.wal_column as ( - name text, - type text, - value jsonb, - is_pkey boolean, - is_selectable boolean - );") + execute(""" + DO $$ + BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'wal_column') THEN + CREATE TYPE realtime.wal_column AS ( + name text, + type text, + value jsonb, + is_pkey boolean, + is_selectable boolean + ); + END IF; + END$$; + """) execute("create function realtime.build_prepared_statement_sql( prepared_statement_name text, diff --git a/lib/realtime/tenants/repo/migrations/20211116214523_create_realtime_apply_rls_function.ex b/lib/realtime/tenants/repo/migrations/20211116214523_create_realtime_apply_rls_function.ex index c34c67232..8b29d96f8 100644 --- a/lib/realtime/tenants/repo/migrations/20211116214523_create_realtime_apply_rls_function.ex +++ b/lib/realtime/tenants/repo/migrations/20211116214523_create_realtime_apply_rls_function.ex @@ -4,16 +4,31 @@ defmodule Realtime.Tenants.Migrations.CreateRealtimeApplyRlsFunction do use Ecto.Migration def change do - execute( - "create type realtime.action as enum ('INSERT', 'UPDATE', 'DELETE', 'TRUNCATE', 'ERROR');" - ) - - execute("create type realtime.wal_rls as ( - wal jsonb, - is_rls_enabled boolean, - users uuid[], - errors text[] - );") + execute(""" + DO $$ + BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'action') THEN + CREATE TYPE realtime.action AS ENUM ( + 'INSERT', 'UPDATE', 'DELETE', 'TRUNCATE', 'ERROR' + ); + END IF; + END$$; + """) + + execute(""" + DO $$ + BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'wal_rls') THEN + CREATE TYPE realtime.wal_rls AS ( + wal jsonb, + is_rls_enabled boolean, + users uuid[], + errors text[] + ); + END IF; + END$$; + """) + execute("create function realtime.apply_rls(wal jsonb, max_record_bytes int = 1024 * 1024) returns realtime.wal_rls language plpgsql diff --git a/lib/realtime/tenants/repo/migrations/20220603231003_add_quoted_regtypes_support.ex b/lib/realtime/tenants/repo/migrations/20220603231003_add_quoted_regtypes_support.ex index ef8f244bc..68111b5d8 100644 --- a/lib/realtime/tenants/repo/migrations/20220603231003_add_quoted_regtypes_support.ex +++ b/lib/realtime/tenants/repo/migrations/20220603231003_add_quoted_regtypes_support.ex @@ -6,16 +6,21 @@ defmodule Realtime.Tenants.Migrations.AddQuotedRegtypesSupport do def change do execute("drop type if exists realtime.wal_column cascade;") - execute(" - create type realtime.wal_column as ( - name text, - type_name text, - type_oid oid, - value jsonb, - is_pkey boolean, - is_selectable boolean - ); - ") + execute(""" + DO $$ + BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'wal_column') THEN + CREATE TYPE realtime.wal_column AS ( + name text, + type_name text, + type_oid oid, + value jsonb, + is_pkey boolean, + is_selectable boolean + ); + END IF; + END$$; + """) execute(" create or replace function realtime.is_visible_through_filters(columns realtime.wal_column[], filters realtime.user_defined_filter[]) diff --git a/lib/realtime_web/channels/realtime_channel.ex b/lib/realtime_web/channels/realtime_channel.ex index a93f0bba2..abeb11fbb 100644 --- a/lib/realtime_web/channels/realtime_channel.ex +++ b/lib/realtime_web/channels/realtime_channel.ex @@ -55,7 +55,7 @@ defmodule RealtimeWeb.RealtimeChannel do :ok <- limit_max_users(socket.assigns), {:ok, claims, confirm_token_ref, access_token, _} <- confirm_token(socket), {:ok, db_conn} <- Connect.lookup_or_start_connection(tenant_id), - {:ok, socket} <- assign_policies(sub_topic, db_conn, access_token, claims, socket) do + {:ok, socket} <- maybe_assign_policies(sub_topic, db_conn, access_token, claims, socket) do public? = !socket.assigns.check_authorization? is_new_api = is_new_api(params) tenant_topic = Tenants.tenant_topic(tenant_id, sub_topic, public?) @@ -249,9 +249,11 @@ defmodule RealtimeWeb.RealtimeChannel do pg_change_params: pg_change_params })} + {:error, error} when is_binary(error) -> + shutdown_response(socket, error) + {:error, error} -> message = "Access token has expired: " <> Helpers.to_log(error) - shutdown_response(socket, message) end end @@ -301,8 +303,10 @@ defmodule RealtimeWeb.RealtimeChannel do %{"access_token" => refresh_token}, %{ assigns: %{ - access_token: _access_token, + access_token: access_token, pg_sub_ref: pg_sub_ref, + db_conn: db_conn, + channel_name: channel_name, pg_change_params: pg_change_params } } = socket @@ -310,24 +314,30 @@ defmodule RealtimeWeb.RealtimeChannel do when is_binary(refresh_token) do socket = assign(socket, :access_token, refresh_token) - case confirm_token(socket, true) do - {:ok, claims, confirm_token_ref, _, socket} -> - Helpers.cancel_timer(pg_sub_ref) - pg_change_params = Enum.map(pg_change_params, &Map.put(&1, :claims, claims)) + with {:ok, claims, confirm_token_ref, _, socket} <- confirm_token(socket), + {:ok, socket} <- + maybe_assign_policies(channel_name, db_conn, access_token, claims, socket) do + Helpers.cancel_timer(pg_sub_ref) + pg_change_params = Enum.map(pg_change_params, &Map.put(&1, :claims, claims)) - pg_sub_ref = - case pg_change_params do - [_ | _] -> postgres_subscribe() - _ -> nil - end + pg_sub_ref = + case pg_change_params do + [_ | _] -> postgres_subscribe() + _ -> nil + end + + assigns = %{ + pg_sub_ref: pg_sub_ref, + confirm_token_ref: confirm_token_ref, + pg_change_params: pg_change_params + } - assigns = %{ - pg_sub_ref: pg_sub_ref, - confirm_token_ref: confirm_token_ref, - pg_change_params: pg_change_params - } + {:noreply, assign(socket, assigns)} + else + {:error, error} when is_binary(error) -> + message = "Received an invalid access token from client: " <> error - {:noreply, assign(socket, assigns)} + shutdown_response(socket, message) {:error, error} -> message = "Received an invalid access token from client: " <> inspect(error) @@ -472,7 +482,7 @@ defmodule RealtimeWeb.RealtimeChannel do assign(socket, :access_token, tenant_token) end - defp confirm_token(%{assigns: assigns} = socket, check_policy \\ false) do + defp confirm_token(%{assigns: assigns} = socket) do %{ jwt_secret: jwt_secret, access_token: access_token @@ -483,11 +493,12 @@ defmodule RealtimeWeb.RealtimeChannel do with jwt_secret_dec <- Crypto.decrypt!(jwt_secret), {:ok, %{"exp" => exp} = claims} when is_integer(exp) <- ChannelsAuthorization.authorize_conn(access_token, jwt_secret_dec, jwt_jwks), - exp_diff when exp_diff > 0 <- exp - Joken.current_time(), - {:ok, socket} <- validate_policy(socket, claims, check_policy) do + exp_diff when exp_diff > 0 <- exp - Joken.current_time() do if ref = assigns[:confirm_token_ref], do: Helpers.cancel_timer(ref) + interval = min(@confirm_token_ms_interval, exp_diff * 1_000) ref = Process.send_after(self(), :confirm_token, interval) + {:ok, claims, ref, access_token, socket} else {:error, e} -> {:error, e} @@ -495,26 +506,6 @@ defmodule RealtimeWeb.RealtimeChannel do end end - defp validate_policy(%{assigns: %{check_authorization?: true}} = socket, _claims, _check_policy) do - {:ok, socket} - end - - defp validate_policy(socket, _claims, false) do - {:ok, socket} - end - - defp validate_policy(%{assigns: assigns} = socket, claims, true) do - %{ - access_token: access_token, - db_conn: db_conn, - channel_name: channel_name - } = assigns - - with {:ok, socket} <- assign_policies(channel_name, db_conn, access_token, claims, socket) do - {:ok, socket} - end - end - defp shutdown_response(%{assigns: %{channel_name: channel_name}} = socket, message) when is_binary(message) do Helpers.log_error("ChannelShutdown", message) @@ -642,7 +633,7 @@ defmodule RealtimeWeb.RealtimeChannel do end) end - defp assign_policies( + defp maybe_assign_policies( topic, db_conn, access_token, @@ -676,7 +667,7 @@ defmodule RealtimeWeb.RealtimeChannel do end end - defp assign_policies(_, _, _, _, socket) do + defp maybe_assign_policies(_, _, _, _, socket) do {:ok, assign(socket, policies: nil)} end end diff --git a/lib/realtime_web/channels/realtime_channel/presence_handler.ex b/lib/realtime_web/channels/realtime_channel/presence_handler.ex index badb28d60..cdd1a71e2 100644 --- a/lib/realtime_web/channels/realtime_channel/presence_handler.ex +++ b/lib/realtime_web/channels/realtime_channel/presence_handler.ex @@ -35,7 +35,7 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandler do %{tenant_topic: topic, policies: policies} = assigns case policies do - %Policies{presence: %PresencePolicies{write: false}} -> + %Policies{presence: %PresencePolicies{read: false}} -> Logger.info("Presence track message ignored on #{topic}") {:noreply, socket} diff --git a/mix.exs b/mix.exs index bf60890ca..61af66e95 100644 --- a/mix.exs +++ b/mix.exs @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do def project do [ app: :realtime, - version: "2.29.14", + version: "2.29.15", elixir: "~> 1.16.0", elixirc_paths: elixirc_paths(Mix.env()), start_permanent: Mix.env() == :prod, diff --git a/test/integration/rt_channel_test.exs b/test/integration/rt_channel_test.exs index 08696598b..d848fd8b4 100644 --- a/test/integration/rt_channel_test.exs +++ b/test/integration/rt_channel_test.exs @@ -76,7 +76,7 @@ defmodule Realtime.Integration.RtChannelTest do :ok end - test "postgres" do + test "handle postgres extension" do {socket, _} = get_connection() topic = "realtime:any" config = %{postgres_changes: [%{event: "*", schema: "public"}]} @@ -190,7 +190,7 @@ defmodule Realtime.Integration.RtChannelTest do 2000 end - describe "broadcast feature" do + describe "handle broadcast extension" do setup [:rls_context] test "public broadcast" do @@ -359,7 +359,7 @@ defmodule Realtime.Integration.RtChannelTest do end end - describe "presence feature" do + describe "handle presence extension" do setup [:rls_context] test "public presence" do @@ -473,7 +473,7 @@ defmodule Realtime.Integration.RtChannelTest do join_ref: nil } - refute_receive %Message{event: "presence_state", payload: _, ref: nil, topic: ^topic} + assert_receive %Message{event: "presence_state", payload: %{}, ref: nil, topic: ^topic} refute_receive %Message{event: "presence_diff", payload: _, ref: _, topic: ^topic} payload = %{ @@ -522,7 +522,7 @@ defmodule Realtime.Integration.RtChannelTest do WebsocketClient.connect(self(), @uri, @serializer, [{"x-api-key", token}]) end - describe "refresh token" do + describe "handle refresh token messages" do setup [:rls_context] @tag policies: [ @@ -534,26 +534,57 @@ defmodule Realtime.Integration.RtChannelTest do {socket, access_token} = get_connection("authenticated") {:ok, new_token} = token_valid("anon") - topic = "realtime:#{topic}" + realtime_topic = "realtime:#{topic}" - WebsocketClient.join(socket, topic, %{ + WebsocketClient.join(socket, realtime_topic, %{ config: %{broadcast: %{self: true}, private: true}, - access_token: new_token + access_token: access_token }) - WebsocketClient.send_event(socket, topic, "access_token", %{"access_token" => access_token}) + assert_receive %Phoenix.Socket.Message{event: "phx_reply"} + assert_receive %Phoenix.Socket.Message{event: "presence_state"} + :timer.sleep(2000) + + WebsocketClient.send_event(socket, realtime_topic, "access_token", %{ + "access_token" => new_token + }) assert_receive %Phoenix.Socket.Message{ - event: "phx_reply", - payload: %{ - "response" => %{ - "reason" => "You do not have permissions to read from this Topic" - }, - "status" => "error" - }, - topic: ^topic - }, - 500 + event: "system", + payload: %{ + "channel" => ^topic, + "extension" => "system", + "message" => + "Received an invalid access token from client: You do not have permissions to read from this Topic", + "status" => "error" + }, + topic: ^realtime_topic + } + + assert_receive %Phoenix.Socket.Message{event: "phx_close", topic: ^realtime_topic} + end + + test "on new access_token and channel is public policies are not reevaluated", + %{topic: topic} do + {socket, access_token} = get_connection("authenticated") + {:ok, new_token} = token_valid("anon") + + realtime_topic = "realtime:#{topic}" + + WebsocketClient.join(socket, realtime_topic, %{ + config: %{broadcast: %{self: true}, private: false}, + access_token: access_token + }) + + assert_receive %Phoenix.Socket.Message{event: "phx_reply"} + assert_receive %Phoenix.Socket.Message{event: "presence_state"} + :timer.sleep(1000) + + WebsocketClient.send_event(socket, realtime_topic, "access_token", %{ + "access_token" => new_token + }) + + refute_receive %Phoenix.Socket.Message{} end end