diff --git a/lib/ewalrus/db_supervisor.ex b/lib/ewalrus/db_supervisor.ex index 42a38e4c4..5b5223138 100644 --- a/lib/ewalrus/db_supervisor.ex +++ b/lib/ewalrus/db_supervisor.ex @@ -28,6 +28,7 @@ defmodule Ewalrus.DbSupervisor do replication_poll_interval: args[:poll_interval], publication: args[:publication], slot_name: args[:slot_name], + max_changes: 100, max_record_bytes: 1_048_576 ] diff --git a/lib/ewalrus/replication_poller.ex b/lib/ewalrus/replication_poller.ex index 5454d3971..f1461ec96 100644 --- a/lib/ewalrus/replication_poller.ex +++ b/lib/ewalrus/replication_poller.ex @@ -30,6 +30,7 @@ defmodule Ewalrus.ReplicationPoller do publication: Keyword.fetch!(opts, :publication), slot_name: Keyword.fetch!(opts, :slot_name), max_record_bytes: Keyword.fetch!(opts, :max_record_bytes), + max_changes: Keyword.fetch!(opts, :max_changes), conn: Keyword.fetch!(opts, :conn), id: Keyword.fetch!(opts, :id) } @@ -75,6 +76,7 @@ defmodule Ewalrus.ReplicationPoller do publication: publication, slot_name: slot_name, max_record_bytes: max_record_bytes, + max_changes: max_changes, conn: conn, id: id } = state @@ -82,7 +84,7 @@ defmodule Ewalrus.ReplicationPoller do Process.cancel_timer(poll_ref) try do - Replications.list_changes(conn, slot_name, publication, max_record_bytes) + Replications.list_changes(conn, slot_name, publication, max_changes, max_record_bytes) catch :error, reason -> {:error, reason} diff --git a/lib/ewalrus/replications.ex b/lib/ewalrus/replications.ex index c2b131052..2e47ed315 100644 --- a/lib/ewalrus/replications.ex +++ b/lib/ewalrus/replications.ex @@ -25,7 +25,7 @@ defmodule Ewalrus.Replications do end) end - def list_changes(conn, slot_name, publication, max_record_bytes) do + def list_changes(conn, slot_name, publication, max_changes, max_record_bytes) do query( conn, "with pub as ( @@ -36,16 +36,38 @@ defmodule Ewalrus.Replications do case when bool_or(pubupdate) then 'update' else null end, case when bool_or(pubdelete) then 'delete' else null end ) as w2j_actions, - string_agg(realtime.quote_wal2json(format('%I.%I', schemaname, tablename)::regclass), ',') w2j_add_tables + coalesce( + string_agg( + realtime.quote_wal2json(format('%I.%I', schemaname, tablename)::regclass), + ',' + ) filter (where ppt.tablename is not null), + '' + ) w2j_add_tables from pg_publication pp - join pg_publication_tables ppt + left join pg_publication_tables ppt on pp.pubname = ppt.pubname where pp.pubname = $1 group by pp.pubname limit 1 + ), + w2j as ( + select + x.*, pub.w2j_add_tables + from + pub, + pg_logical_slot_get_changes( + $2, null, $3, + 'include-pk', '1', + 'include-transaction', 'false', + 'include-timestamp', 'true', + 'write-in-chunks', 'true', + 'format-version', '2', + 'actions', pub.w2j_actions, + 'add-tables', pub.w2j_add_tables + ) x ) select xyz.wal, @@ -53,39 +75,18 @@ defmodule Ewalrus.Replications do xyz.subscription_ids, xyz.errors from - pub, - lateral ( - select - * - from - pg_logical_slot_get_changes( - $2, null, null, - 'include-pk', '1', - 'include-transaction', 'false', - 'include-timestamp', 'true', - 'write-in-chunks', 'true', - 'format-version', '2', - 'actions', coalesce(pub.w2j_actions, ''), - 'add-tables', pub.w2j_add_tables - ) - ) w2j, - lateral ( - select - x.wal, - x.is_rls_enabled, - x.subscription_ids, - x.errors - from - realtime.apply_rls( - wal := w2j.data::jsonb, - max_record_bytes := $3 - ) x(wal, is_rls_enabled, subscription_ids, errors) - ) xyz - where coalesce(pub.w2j_add_tables, '') <> '' + w2j, + realtime.apply_rls( + wal := w2j.data::jsonb, + max_record_bytes := $4 + ) xyz(wal, is_rls_enabled, subscription_ids, errors) + where + w2j.w2j_add_tables <> '' and xyz.subscription_ids[1] is not null", [ publication, slot_name, + max_changes, max_record_bytes ] ) diff --git a/lib/multiplayer_web/channels/realtime_channel.ex b/lib/multiplayer_web/channels/realtime_channel.ex index ca3a96a58..96c6eaf5b 100644 --- a/lib/multiplayer_web/channels/realtime_channel.ex +++ b/lib/multiplayer_web/channels/realtime_channel.ex @@ -39,7 +39,7 @@ defmodule MultiplayerWeb.RealtimeChannel do {:event, %{type: type} = event}, %{assigns: %{topic: topic}} = socket ) do - Logger.debug("Got event, #{inspect(event, pretty: true)}") + # Logger.debug("Got event, #{inspect(event, pretty: true)}") update_topic(socket, topic) |> push(type, event) {:noreply, socket} end