Skip to content

Commit

Permalink
Merge pull request #99 from supabase/ewalrus_sync
Browse files Browse the repository at this point in the history
Deploy staging
  • Loading branch information
abc3 authored Mar 23, 2022
2 parents 3256a7f + d0b70e5 commit 0f1e6f6
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 34 deletions.
1 change: 1 addition & 0 deletions lib/ewalrus/db_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ defmodule Ewalrus.DbSupervisor do
replication_poll_interval: args[:poll_interval],
publication: args[:publication],
slot_name: args[:slot_name],
max_changes: 100,
max_record_bytes: 1_048_576
]

Expand Down
4 changes: 3 additions & 1 deletion lib/ewalrus/replication_poller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ defmodule Ewalrus.ReplicationPoller do
publication: Keyword.fetch!(opts, :publication),
slot_name: Keyword.fetch!(opts, :slot_name),
max_record_bytes: Keyword.fetch!(opts, :max_record_bytes),
max_changes: Keyword.fetch!(opts, :max_changes),
conn: Keyword.fetch!(opts, :conn),
id: Keyword.fetch!(opts, :id)
}
Expand Down Expand Up @@ -75,14 +76,15 @@ defmodule Ewalrus.ReplicationPoller do
publication: publication,
slot_name: slot_name,
max_record_bytes: max_record_bytes,
max_changes: max_changes,
conn: conn,
id: id
} = state
) do
Process.cancel_timer(poll_ref)

try do
Replications.list_changes(conn, slot_name, publication, max_record_bytes)
Replications.list_changes(conn, slot_name, publication, max_changes, max_record_bytes)
catch
:error, reason ->
{:error, reason}
Expand Down
65 changes: 33 additions & 32 deletions lib/ewalrus/replications.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ defmodule Ewalrus.Replications do
end)
end

def list_changes(conn, slot_name, publication, max_record_bytes) do
def list_changes(conn, slot_name, publication, max_changes, max_record_bytes) do
query(
conn,
"with pub as (
Expand All @@ -36,56 +36,57 @@ defmodule Ewalrus.Replications do
case when bool_or(pubupdate) then 'update' else null end,
case when bool_or(pubdelete) then 'delete' else null end
) as w2j_actions,
string_agg(realtime.quote_wal2json(format('%I.%I', schemaname, tablename)::regclass), ',') w2j_add_tables
coalesce(
string_agg(
realtime.quote_wal2json(format('%I.%I', schemaname, tablename)::regclass),
','
) filter (where ppt.tablename is not null),
''
) w2j_add_tables
from
pg_publication pp
join pg_publication_tables ppt
left join pg_publication_tables ppt
on pp.pubname = ppt.pubname
where
pp.pubname = $1
group by
pp.pubname
limit 1
),
w2j as (
select
x.*, pub.w2j_add_tables
from
pub,
pg_logical_slot_get_changes(
$2, null, $3,
'include-pk', '1',
'include-transaction', 'false',
'include-timestamp', 'true',
'write-in-chunks', 'true',
'format-version', '2',
'actions', pub.w2j_actions,
'add-tables', pub.w2j_add_tables
) x
)
select
xyz.wal,
xyz.is_rls_enabled,
xyz.subscription_ids,
xyz.errors
from
pub,
lateral (
select
*
from
pg_logical_slot_get_changes(
$2, null, null,
'include-pk', '1',
'include-transaction', 'false',
'include-timestamp', 'true',
'write-in-chunks', 'true',
'format-version', '2',
'actions', coalesce(pub.w2j_actions, ''),
'add-tables', pub.w2j_add_tables
)
) w2j,
lateral (
select
x.wal,
x.is_rls_enabled,
x.subscription_ids,
x.errors
from
realtime.apply_rls(
wal := w2j.data::jsonb,
max_record_bytes := $3
) x(wal, is_rls_enabled, subscription_ids, errors)
) xyz
where coalesce(pub.w2j_add_tables, '') <> ''
w2j,
realtime.apply_rls(
wal := w2j.data::jsonb,
max_record_bytes := $4
) xyz(wal, is_rls_enabled, subscription_ids, errors)
where
w2j.w2j_add_tables <> ''
and xyz.subscription_ids[1] is not null",
[
publication,
slot_name,
max_changes,
max_record_bytes
]
)
Expand Down
2 changes: 1 addition & 1 deletion lib/multiplayer_web/channels/realtime_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ defmodule MultiplayerWeb.RealtimeChannel do
{:event, %{type: type} = event},
%{assigns: %{topic: topic}} = socket
) do
Logger.debug("Got event, #{inspect(event, pretty: true)}")
# Logger.debug("Got event, #{inspect(event, pretty: true)}")
update_topic(socket, topic) |> push(type, event)
{:noreply, socket}
end
Expand Down

0 comments on commit 0f1e6f6

Please sign in to comment.