Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fix improper policy checking #1085

Merged
merged 8 commits into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,29 @@ defmodule Realtime.Tenants.Migrations.CreateRealtimeSubscriptionTable do
use Ecto.Migration

def change do
execute("create type realtime.equality_op as enum(
'eq', 'neq', 'lt', 'lte', 'gt', 'gte'
);")
execute("""
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'equality_op') THEN
CREATE TYPE realtime.equality_op AS ENUM(
'eq', 'neq', 'lt', 'lte', 'gt', 'gte'
);
END IF;
END$$;
""")

execute("create type realtime.user_defined_filter as (
column_name text,
op realtime.equality_op,
value text
);")
execute("""
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'user_defined_filter') THEN
CREATE TYPE realtime.user_defined_filter as (
column_name text,
op realtime.equality_op,
value text
);
END IF;
END$$;
""")

execute("create table realtime.subscription (
-- Tracks which users are subscribed to each table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,20 @@ defmodule Realtime.Tenants.Migrations.CreateRealtimeBuildPreparedStatementSqlFun
use Ecto.Migration

def change do
execute("create type realtime.wal_column as (
name text,
type text,
value jsonb,
is_pkey boolean,
is_selectable boolean
);")
execute("""
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'wal_column') THEN
CREATE TYPE realtime.wal_column AS (
name text,
type text,
value jsonb,
is_pkey boolean,
is_selectable boolean
);
END IF;
END$$;
""")

execute("create function realtime.build_prepared_statement_sql(
prepared_statement_name text,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,31 @@ defmodule Realtime.Tenants.Migrations.CreateRealtimeApplyRlsFunction do
use Ecto.Migration

def change do
execute(
"create type realtime.action as enum ('INSERT', 'UPDATE', 'DELETE', 'TRUNCATE', 'ERROR');"
)

execute("create type realtime.wal_rls as (
wal jsonb,
is_rls_enabled boolean,
users uuid[],
errors text[]
);")
execute("""
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'action') THEN
CREATE TYPE realtime.action AS ENUM (
'INSERT', 'UPDATE', 'DELETE', 'TRUNCATE', 'ERROR'
);
END IF;
END$$;
""")

execute("""
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'wal_rls') THEN
CREATE TYPE realtime.wal_rls AS (
wal jsonb,
is_rls_enabled boolean,
users uuid[],
errors text[]
);
END IF;
END$$;
""")

execute("create function realtime.apply_rls(wal jsonb, max_record_bytes int = 1024 * 1024)
returns realtime.wal_rls
language plpgsql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,21 @@ defmodule Realtime.Tenants.Migrations.AddQuotedRegtypesSupport do
def change do
execute("drop type if exists realtime.wal_column cascade;")

execute("
create type realtime.wal_column as (
name text,
type_name text,
type_oid oid,
value jsonb,
is_pkey boolean,
is_selectable boolean
);
")
execute("""
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'wal_column') THEN
CREATE TYPE realtime.wal_column AS (
name text,
type_name text,
type_oid oid,
value jsonb,
is_pkey boolean,
is_selectable boolean
);
END IF;
END$$;
""")

execute("
create or replace function realtime.is_visible_through_filters(columns realtime.wal_column[], filters realtime.user_defined_filter[])
Expand Down
56 changes: 27 additions & 29 deletions lib/realtime_web/channels/realtime_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -249,9 +249,11 @@ defmodule RealtimeWeb.RealtimeChannel do
pg_change_params: pg_change_params
})}

{:error, error} ->
message = "Access token has expired: " <> Helpers.to_log(error)
{:error, error} when is_binary(error) ->
shutdown_response(socket, error)

{:error, error} ->
message = Helpers.to_log(error)
shutdown_response(socket, message)
end
end
Expand Down Expand Up @@ -310,24 +312,27 @@ defmodule RealtimeWeb.RealtimeChannel do
when is_binary(refresh_token) do
socket = assign(socket, :access_token, refresh_token)

case confirm_token(socket, true) do
{:ok, claims, confirm_token_ref, _, socket} ->
Helpers.cancel_timer(pg_sub_ref)
pg_change_params = Enum.map(pg_change_params, &Map.put(&1, :claims, claims))
with {:ok, claims, confirm_token_ref, _, socket} <- confirm_token(socket),
{:ok, socket} <- validate_policy(socket, claims) do
Helpers.cancel_timer(pg_sub_ref)
pg_change_params = Enum.map(pg_change_params, &Map.put(&1, :claims, claims))

pg_sub_ref =
case pg_change_params do
[_ | _] -> postgres_subscribe()
_ -> nil
end
pg_sub_ref =
case pg_change_params do
[_ | _] -> postgres_subscribe()
_ -> nil
end

assigns = %{
pg_sub_ref: pg_sub_ref,
confirm_token_ref: confirm_token_ref,
pg_change_params: pg_change_params
}
assigns = %{
pg_sub_ref: pg_sub_ref,
confirm_token_ref: confirm_token_ref,
pg_change_params: pg_change_params
}

{:noreply, assign(socket, assigns)}
{:noreply, assign(socket, assigns)}
else
{:error, error} when is_binary(error) ->
shutdown_response(socket, error)

{:error, error} ->
message = "Received an invalid access token from client: " <> inspect(error)
Expand Down Expand Up @@ -472,7 +477,7 @@ defmodule RealtimeWeb.RealtimeChannel do
assign(socket, :access_token, tenant_token)
end

defp confirm_token(%{assigns: assigns} = socket, check_policy \\ false) do
defp confirm_token(%{assigns: assigns} = socket) do
%{
jwt_secret: jwt_secret,
access_token: access_token
Expand All @@ -483,27 +488,20 @@ defmodule RealtimeWeb.RealtimeChannel do
with jwt_secret_dec <- Crypto.decrypt!(jwt_secret),
{:ok, %{"exp" => exp} = claims} when is_integer(exp) <-
ChannelsAuthorization.authorize_conn(access_token, jwt_secret_dec, jwt_jwks),
exp_diff when exp_diff > 0 <- exp - Joken.current_time(),
{:ok, socket} <- validate_policy(socket, claims, check_policy) do
exp_diff when exp_diff > 0 <- exp - Joken.current_time() do
if ref = assigns[:confirm_token_ref], do: Helpers.cancel_timer(ref)

interval = min(@confirm_token_ms_interval, exp_diff * 1_000)
ref = Process.send_after(self(), :confirm_token, interval)

{:ok, claims, ref, access_token, socket}
else
{:error, e} -> {:error, e}
e -> {:error, e}
end
end

defp validate_policy(%{assigns: %{check_authorization?: true}} = socket, _claims, _check_policy) do
{:ok, socket}
end

defp validate_policy(socket, _claims, false) do
{:ok, socket}
end

defp validate_policy(%{assigns: assigns} = socket, claims, true) do
defp validate_policy(%{assigns: assigns} = socket, claims) do
%{
access_token: access_token,
db_conn: db_conn,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandler do
%{tenant_topic: topic, policies: policies} = assigns

case policies do
%Policies{presence: %PresencePolicies{write: false}} ->
%Policies{presence: %PresencePolicies{read: false}} ->
Logger.info("Presence track message ignored on #{topic}")
{:noreply, socket}

Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.29.14",
version: "2.29.15",
elixir: "~> 1.16.0",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
37 changes: 22 additions & 15 deletions test/integration/rt_channel_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ defmodule Realtime.Integration.RtChannelTest do
join_ref: nil
}

refute_receive %Message{event: "presence_state", payload: _, ref: nil, topic: ^topic}
assert_receive %Message{event: "presence_state", payload: %{}, ref: nil, topic: ^topic}
refute_receive %Message{event: "presence_diff", payload: _, ref: _, topic: ^topic}

payload = %{
Expand Down Expand Up @@ -534,26 +534,33 @@ defmodule Realtime.Integration.RtChannelTest do
{socket, access_token} = get_connection("authenticated")
{:ok, new_token} = token_valid("anon")

topic = "realtime:#{topic}"
realtime_topic = "realtime:#{topic}"

WebsocketClient.join(socket, topic, %{
WebsocketClient.join(socket, realtime_topic, %{
config: %{broadcast: %{self: true}, private: true},
access_token: new_token
access_token: access_token
})

WebsocketClient.send_event(socket, topic, "access_token", %{"access_token" => access_token})
assert_receive %Phoenix.Socket.Message{event: "phx_reply"}
assert_receive %Phoenix.Socket.Message{event: "presence_state"}
:timer.sleep(2000)

WebsocketClient.send_event(socket, realtime_topic, "access_token", %{
"access_token" => new_token
})

assert_receive %Phoenix.Socket.Message{
event: "phx_reply",
payload: %{
"response" => %{
"reason" => "You do not have permissions to read from this Topic"
},
"status" => "error"
},
topic: ^topic
},
500
event: "system",
payload: %{
"channel" => ^topic,
"extension" => "system",
"message" => "You do not have permissions to read from this Topic",
"status" => "error"
},
topic: ^realtime_topic
}

assert_receive %Phoenix.Socket.Message{event: "phx_close", topic: ^realtime_topic}
end
end

Expand Down
Loading