Skip to content

Commit

Permalink
fix: Set commit_timestamp value for every change record (#88)
Browse files Browse the repository at this point in the history
  • Loading branch information
w3b6x9 authored Dec 16, 2020
1 parent 271bbfc commit e471483
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 6 deletions.
19 changes: 18 additions & 1 deletion server/lib/realtime/replication.ex
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,23 @@ defmodule Realtime.Replication do
end

defp notify_subscribers(%State{transaction: {_current_txn_lsn, txn}}) do
SubscribersNotification.notify(txn)
txn
|> set_change_records_commit_timestamps()
|> SubscribersNotification.notify()
end

defp set_change_records_commit_timestamps(
%Transaction{changes: changes, commit_timestamp: commit_timestamp} = txn
) do
# Set every change record's (e.g. %Realtime.Adapters.Changes.NewRecord) commit_timestamp value
# to transaction's (i.e. %Realtime.Adapters.Changes.Transaction) commit_timestamp value
%Transaction{
txn
| changes:
Enum.map(
changes,
fn record -> %{record | commit_timestamp: commit_timestamp} end
)
}
end
end
3 changes: 1 addition & 2 deletions server/lib/realtime/subscribers_notification.ex
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ defmodule Realtime.SubscribersNotification do

{:ok, realtime_config} = Realtime.ConfigurationManager.get_config(:realtime)

for raw_change <- txn.changes do
change = Map.put(raw_change, :commit_timestamp, txn.commit_timestamp)
for change <- txn.changes do
topic = "realtime"
schema_topic = "#{topic}:#{change.schema}"
table_topic = "#{schema_topic}:#{change.table}"
Expand Down
6 changes: 3 additions & 3 deletions server/test/realtime/replication_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ defmodule Realtime.ReplicationTest do
end
end

test "Realtime.DatabaseRetryMonitor.get_retry_delay/1 when conn_retry_delays is empty" do
test "Realtime.Replication.get_retry_delay/1 when conn_retry_delays is empty" do
state = %State{
conn_retry_delays: [],
config: [
Expand All @@ -209,7 +209,7 @@ defmodule Realtime.ReplicationTest do
assert Enum.all?(delays, &(is_integer(&1) and &1 > 0))
end

test "Realtime.DatabaseRetryMonitor.get_retry_delay/1 when conn_retry_delays is not empty" do
test "Realtime.Replication.get_retry_delay/1 when conn_retry_delays is not empty" do
state = %State{
conn_retry_delays: [489, 1011, 1996, 4023]
}
Expand All @@ -220,7 +220,7 @@ defmodule Realtime.ReplicationTest do
assert delays == [1011, 1996, 4023]
end

test "Realtime.DatabaseRetryMonitor.reset_retry_delay/1" do
test "Realtime.Replication.reset_retry_delay/1" do
state = %State{
conn_retry_delays: [198, 403, 781]
}
Expand Down

0 comments on commit e471483

Please sign in to comment.