From e47148365ac89201a02c95745074d756ebfdd264 Mon Sep 17 00:00:00 2001 From: Wen Bo Xie <5532241+w3b6x9@users.noreply.github.com> Date: Tue, 15 Dec 2020 23:57:08 -0500 Subject: [PATCH] fix: Set commit_timestamp value for every change record (#88) --- server/lib/realtime/replication.ex | 19 ++++++++++++++++++- .../lib/realtime/subscribers_notification.ex | 3 +-- server/test/realtime/replication_test.exs | 6 +++--- 3 files changed, 22 insertions(+), 6 deletions(-) diff --git a/server/lib/realtime/replication.ex b/server/lib/realtime/replication.ex index 4c87ea648..e24a91be9 100644 --- a/server/lib/realtime/replication.ex +++ b/server/lib/realtime/replication.ex @@ -296,6 +296,23 @@ defmodule Realtime.Replication do end defp notify_subscribers(%State{transaction: {_current_txn_lsn, txn}}) do - SubscribersNotification.notify(txn) + txn + |> set_change_records_commit_timestamps() + |> SubscribersNotification.notify() + end + + defp set_change_records_commit_timestamps( + %Transaction{changes: changes, commit_timestamp: commit_timestamp} = txn + ) do + # Set every change record's (e.g. %Realtime.Adapters.Changes.NewRecord) commit_timestamp value + # to transaction's (i.e. %Realtime.Adapters.Changes.Transaction) commit_timestamp value + %Transaction{ + txn + | changes: + Enum.map( + changes, + fn record -> %{record | commit_timestamp: commit_timestamp} end + ) + } end end diff --git a/server/lib/realtime/subscribers_notification.ex b/server/lib/realtime/subscribers_notification.ex index 3f3cfdb29..e174d2f05 100644 --- a/server/lib/realtime/subscribers_notification.ex +++ b/server/lib/realtime/subscribers_notification.ex @@ -43,8 +43,7 @@ defmodule Realtime.SubscribersNotification do {:ok, realtime_config} = Realtime.ConfigurationManager.get_config(:realtime) - for raw_change <- txn.changes do - change = Map.put(raw_change, :commit_timestamp, txn.commit_timestamp) + for change <- txn.changes do topic = "realtime" schema_topic = "#{topic}:#{change.schema}" table_topic = "#{schema_topic}:#{change.table}" diff --git a/server/test/realtime/replication_test.exs b/server/test/realtime/replication_test.exs index 193777630..987beae2a 100644 --- a/server/test/realtime/replication_test.exs +++ b/server/test/realtime/replication_test.exs @@ -191,7 +191,7 @@ defmodule Realtime.ReplicationTest do end end - test "Realtime.DatabaseRetryMonitor.get_retry_delay/1 when conn_retry_delays is empty" do + test "Realtime.Replication.get_retry_delay/1 when conn_retry_delays is empty" do state = %State{ conn_retry_delays: [], config: [ @@ -209,7 +209,7 @@ defmodule Realtime.ReplicationTest do assert Enum.all?(delays, &(is_integer(&1) and &1 > 0)) end - test "Realtime.DatabaseRetryMonitor.get_retry_delay/1 when conn_retry_delays is not empty" do + test "Realtime.Replication.get_retry_delay/1 when conn_retry_delays is not empty" do state = %State{ conn_retry_delays: [489, 1011, 1996, 4023] } @@ -220,7 +220,7 @@ defmodule Realtime.ReplicationTest do assert delays == [1011, 1996, 4023] end - test "Realtime.DatabaseRetryMonitor.reset_retry_delay/1" do + test "Realtime.Replication.reset_retry_delay/1" do state = %State{ conn_retry_delays: [198, 403, 781] }