From 62e86e2171ee4452fd25567cd7d9499246f44a32 Mon Sep 17 00:00:00 2001 From: Daniel Tabacaru Date: Tue, 7 Mar 2023 23:48:33 +0100 Subject: [PATCH 1/6] Distinguish between an empty and a null (not existing) reciprocal changeset --- src/realm/sync/changeset.hpp | 1 - src/realm/sync/changeset_encoder.cpp | 23 ++--- src/realm/sync/noinst/client_history_impl.cpp | 2 + test/test_sync.cpp | 90 +++++++++++++++++++ 4 files changed, 101 insertions(+), 15 deletions(-) diff --git a/src/realm/sync/changeset.hpp b/src/realm/sync/changeset.hpp index 3e56dd9c348..e67fbb73494 100644 --- a/src/realm/sync/changeset.hpp +++ b/src/realm/sync/changeset.hpp @@ -61,7 +61,6 @@ struct Changeset { struct IteratorImpl; using iterator = IteratorImpl; using const_iterator = IteratorImpl; - using value_type = Instruction; iterator begin() noexcept; iterator end() noexcept; const_iterator begin() const noexcept; diff --git a/src/realm/sync/changeset_encoder.cpp b/src/realm/sync/changeset_encoder.cpp index 2f89739447a..ef2a4485e5c 100644 --- a/src/realm/sync/changeset_encoder.cpp +++ b/src/realm/sync/changeset_encoder.cpp @@ -414,19 +414,14 @@ void ChangesetEncoder::reset() noexcept void ChangesetEncoder::encode_single(const Changeset& log) { - // Checking if the log is empty avoids serialized interned strings in a - // changeset where all meaningful instructions have been discarded due to - // merge or compaction. - if (!log.empty()) { - add_string_range(log.string_data()); - const auto& strings = log.interned_strings(); - for (size_t i = 0; i < strings.size(); ++i) { - set_intern_string(uint32_t(i), strings[i]); // Throws - } - for (auto instr : log) { - if (!instr) - continue; - (*this)(*instr); // Throws - } + add_string_range(log.string_data()); + const auto& strings = log.interned_strings(); + for (size_t i = 0; i < strings.size(); ++i) { + set_intern_string(uint32_t(i), strings[i]); // Throws + } + for (auto instr : log) { + if (!instr) + continue; + (*this)(*instr); // Throws } } diff --git a/src/realm/sync/noinst/client_history_impl.cpp b/src/realm/sync/noinst/client_history_impl.cpp index 725d5fe76c6..e56ec2dd425 100644 --- a/src/realm/sync/noinst/client_history_impl.cpp +++ b/src/realm/sync/noinst/client_history_impl.cpp @@ -764,6 +764,7 @@ Replication::version_type ClientHistory::add_changeset(BinaryData ct_changeset, void ClientHistory::add_sync_history_entry(const HistoryEntry& entry) { + REALM_ASSERT(m_arrays->changesets.size() == sync_history_size()); REALM_ASSERT(m_arrays->reciprocal_transforms.size() == sync_history_size()); REALM_ASSERT(m_arrays->remote_versions.size() == sync_history_size()); REALM_ASSERT(m_arrays->origin_file_idents.size() == sync_history_size()); @@ -1175,6 +1176,7 @@ void ClientHistory::update_from_ref_and_version(ref_type ref, version_type versi m_ct_history_base_version = version - ct_history_size(); m_sync_history_base_version = version - sync_history_size(); + REALM_ASSERT(m_arrays->changesets.size() == sync_history_size()); REALM_ASSERT(m_arrays->reciprocal_transforms.size() == sync_history_size()); REALM_ASSERT(m_arrays->remote_versions.size() == sync_history_size()); REALM_ASSERT(m_arrays->origin_file_idents.size() == sync_history_size()); diff --git a/test/test_sync.cpp b/test/test_sync.cpp index c86bb576f40..a9c69138090 100644 --- a/test/test_sync.cpp +++ b/test/test_sync.cpp @@ -6680,4 +6680,94 @@ TEST(Sync_InvalidChangesetFromServer) StringData(e.what()).contains("Failed to parse received changeset: Invalid interned string")); } +// Tests that an empty reciprocal changesets is set and retrieved correctly. +TEST(Sync_SetAndGetEmptyReciprocalChangeset) +{ + using namespace realm; + using namespace realm::sync::instr; + using realm::sync::Changeset; + + TEST_CLIENT_DB(db); + + auto& history = get_history(db); + history.set_client_file_ident(SaltedFileIdent{1, 0x1234567812345678}, false); + timestamp_type timestamp{1}; + history.set_local_origin_timestamp_source([&] { + return ++timestamp; + }); + + auto latest_local_verson = [&] { + auto tr = db->start_write(); + // Create schema: single table with array of ints as property. + tr->add_table_with_primary_key("class_table", type_Int, "_id")->add_column_list(type_Int, "ints"); + tr->commit_and_continue_writing(); + + // Create object and initialize array. + TableRef table = tr->get_table("class_table"); + auto obj = table->create_object_with_primary_key(42); + auto ints = obj.get_list("ints"); + for (auto i = 0; i < 8; ++i) { + ints.insert(i, i); + } + tr->commit_and_continue_writing(); + + // Move element in array. + ints.move(7, 2); + return tr->commit(); + }(); + + // Create changeset which moves element from index 7 to index 0 in array. + // This changeset will discard the previous move (reciprocal changeset), leaving the local reciprocal changesets + // with no instructions (empty). + Changeset changeset; + ArrayMove instr; + instr.table = changeset.intern_string("table"); + instr.object = instr::PrimaryKey{42}; + instr.field = changeset.intern_string("ints"); + instr.path.push_back(7); + instr.ndx_2 = 0; + instr.prior_size = 8; + changeset.push_back(instr); + changeset.version = 1; + changeset.last_integrated_remote_version = latest_local_verson - 1; + changeset.origin_timestamp = timestamp; + changeset.origin_file_ident = 2; + + ChangesetEncoder::Buffer encoded; + std::vector server_changesets_encoded; + encode_changeset(changeset, encoded); + server_changesets_encoded.emplace_back(changeset.version, changeset.last_integrated_remote_version, + BinaryData(encoded.data(), encoded.size()), changeset.origin_timestamp, + changeset.origin_file_ident); + + SyncProgress progress = {}; + progress.download.server_version = changeset.version; + progress.download.last_integrated_client_version = latest_local_verson - 1; + progress.latest_server_version.version = changeset.version; + progress.latest_server_version.salt = 0x7876543217654321; + + uint_fast64_t downloadable_bytes = 0; + VersionInfo version_info; + util::StderrLogger logger; + auto transact = db->start_read(); + history.integrate_server_changesets(progress, &downloadable_bytes, server_changesets_encoded, version_info, + DownloadBatchState::SteadyState, logger, transact); + + bool is_compressed = false; + auto data = history.get_reciprocal_transform(latest_local_verson, is_compressed); + Changeset reciprocal_changeset; + ChunkedBinaryInputStream in{data}; + if (is_compressed) { + size_t total_size; + auto decompressed = util::compression::decompress_nonportable_input_stream(in, total_size); + REALM_ASSERT(decompressed); + sync::parse_changeset(*decompressed, reciprocal_changeset); // Throws + } + else { + sync::parse_changeset(in, reciprocal_changeset); // Throws + } + // The only instruction in the reciprocal changeset was discarded during OT. + CHECK(reciprocal_changeset.empty()); +} + } // unnamed namespace From d297f7944a8ee73fcce7c2ffa58ade3d19cbf607 Mon Sep 17 00:00:00 2001 From: Daniel Tabacaru Date: Wed, 8 Mar 2023 07:59:13 +0100 Subject: [PATCH 2/6] changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6d205d084d9..e182cd40edb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,7 +6,7 @@ ### Fixed * ([#????](https://github.com/realm/realm-core/issues/????), since v?.?.?) -* None. +* Fixed a case of history diverging when empty reciprocal changesets are part of the merging window in OT ([#6191](https://github.com/realm/realm-core/issues/6191), since v10.2.0) ### Breaking changes * You can no longer associate a Logger Factory with the SyncManager. Instead you can install one default logger via Logger::set_default_logger(). This logger will then be used all over Core. Logging cmake flags updated to use REALM_TEST_LOGGING and REALM_TEST_LOGGING_LEVEL From 21e27d0019184f96bc0caa94b8b239341541b75c Mon Sep 17 00:00:00 2001 From: Daniel Tabacaru Date: Mon, 13 Mar 2023 17:21:26 +0100 Subject: [PATCH 3/6] Changes after code reiview --- test/test_sync.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/test_sync.cpp b/test/test_sync.cpp index a9c69138090..d27a935b307 100644 --- a/test/test_sync.cpp +++ b/test/test_sync.cpp @@ -6760,7 +6760,7 @@ TEST(Sync_SetAndGetEmptyReciprocalChangeset) if (is_compressed) { size_t total_size; auto decompressed = util::compression::decompress_nonportable_input_stream(in, total_size); - REALM_ASSERT(decompressed); + CHECK(decompressed); sync::parse_changeset(*decompressed, reciprocal_changeset); // Throws } else { From f1fcc3e6a68a6fb6deff77b573b05193fbc63358 Mon Sep 17 00:00:00 2001 From: Daniel Tabacaru Date: Mon, 13 Mar 2023 23:04:53 +0100 Subject: [PATCH 4/6] More changes after code review --- CHANGELOG.md | 2 +- src/realm/sync/changeset_encoder.cpp | 23 +++++++++++-------- src/realm/sync/noinst/client_history_impl.cpp | 7 +++++- 3 files changed, 21 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e182cd40edb..f4da8bfb7bc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,7 +6,7 @@ ### Fixed * ([#????](https://github.com/realm/realm-core/issues/????), since v?.?.?) -* Fixed a case of history diverging when empty reciprocal changesets are part of the merging window in OT ([#6191](https://github.com/realm/realm-core/issues/6191), since v10.2.0) +* Fixed a case of history diverging when empty reciprocal changesets are part of the merging window in OT ([#6191](https://github.com/realm/realm-core/issues/6191), since v11.13.0) ### Breaking changes * You can no longer associate a Logger Factory with the SyncManager. Instead you can install one default logger via Logger::set_default_logger(). This logger will then be used all over Core. Logging cmake flags updated to use REALM_TEST_LOGGING and REALM_TEST_LOGGING_LEVEL diff --git a/src/realm/sync/changeset_encoder.cpp b/src/realm/sync/changeset_encoder.cpp index ef2a4485e5c..2f89739447a 100644 --- a/src/realm/sync/changeset_encoder.cpp +++ b/src/realm/sync/changeset_encoder.cpp @@ -414,14 +414,19 @@ void ChangesetEncoder::reset() noexcept void ChangesetEncoder::encode_single(const Changeset& log) { - add_string_range(log.string_data()); - const auto& strings = log.interned_strings(); - for (size_t i = 0; i < strings.size(); ++i) { - set_intern_string(uint32_t(i), strings[i]); // Throws - } - for (auto instr : log) { - if (!instr) - continue; - (*this)(*instr); // Throws + // Checking if the log is empty avoids serialized interned strings in a + // changeset where all meaningful instructions have been discarded due to + // merge or compaction. + if (!log.empty()) { + add_string_range(log.string_data()); + const auto& strings = log.interned_strings(); + for (size_t i = 0; i < strings.size(); ++i) { + set_intern_string(uint32_t(i), strings[i]); // Throws + } + for (auto instr : log) { + if (!instr) + continue; + (*this)(*instr); // Throws + } } } diff --git a/src/realm/sync/noinst/client_history_impl.cpp b/src/realm/sync/noinst/client_history_impl.cpp index e56ec2dd425..f696497449f 100644 --- a/src/realm/sync/noinst/client_history_impl.cpp +++ b/src/realm/sync/noinst/client_history_impl.cpp @@ -627,6 +627,11 @@ void ClientHistory::set_reciprocal_transform(version_type version, BinaryData da std::size_t index = size_t(version - m_sync_history_base_version) - 1; REALM_ASSERT(index < sync_history_size()); + if (data.is_null()) { + m_arrays->reciprocal_transforms.set(index, BinaryData{"", 0}); // Throws + return; + } + auto compressed = util::compression::allocate_and_compress_nonportable(data); m_arrays->reciprocal_transforms.set(index, BinaryData{compressed.data(), compressed.size()}); // Throws } @@ -776,7 +781,7 @@ void ClientHistory::add_sync_history_entry(const HistoryEntry& entry) m_arrays->changesets.add(BinaryData{compressed.data(), compressed.size()}); // Throws } else { - m_arrays->changesets.add(BinaryData()); // Throws + m_arrays->changesets.add(BinaryData("", 0)); // Throws } m_arrays->reciprocal_transforms.add(BinaryData{}); // Throws From 8fa8340b3411cebd64d2d1c4492ae62941e4e843 Mon Sep 17 00:00:00 2001 From: Daniel Tabacaru Date: Wed, 15 Mar 2023 13:53:37 +0100 Subject: [PATCH 5/6] Better changelog message --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f4da8bfb7bc..28dc3694367 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,7 +6,7 @@ ### Fixed * ([#????](https://github.com/realm/realm-core/issues/????), since v?.?.?) -* Fixed a case of history diverging when empty reciprocal changesets are part of the merging window in OT ([#6191](https://github.com/realm/realm-core/issues/6191), since v11.13.0) +* Fixed a case of history diverging when empty reciprocal changesets are part of the merging window in OT. This may have resulted in arrays being in different orders on different devices. ([#6191](https://github.com/realm/realm-core/issues/6191), since v11.13.0) ### Breaking changes * You can no longer associate a Logger Factory with the SyncManager. Instead you can install one default logger via Logger::set_default_logger(). This logger will then be used all over Core. Logging cmake flags updated to use REALM_TEST_LOGGING and REALM_TEST_LOGGING_LEVEL From 7a288ad91182267a17176c2016af445cfe4a680a Mon Sep 17 00:00:00 2001 From: Daniel Tabacaru Date: Wed, 15 Mar 2023 13:53:51 +0100 Subject: [PATCH 6/6] Integration test --- test/test_sync.cpp | 81 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 81 insertions(+) diff --git a/test/test_sync.cpp b/test/test_sync.cpp index d27a935b307..a17cfbf36fa 100644 --- a/test/test_sync.cpp +++ b/test/test_sync.cpp @@ -6770,4 +6770,85 @@ TEST(Sync_SetAndGetEmptyReciprocalChangeset) CHECK(reciprocal_changeset.empty()); } +TEST(Sync_TransformAgainstEmptyReciprocalChangeset) +{ + TEST_DIR(dir); + TEST_CLIENT_DB(seed_db); + TEST_CLIENT_DB(db_1); + TEST_CLIENT_DB(db_2); + + { + auto tr = seed_db->start_write(); + // Create schema: single table with array of ints as property. + auto table = tr->add_table_with_primary_key("class_table", type_Int, "_id"); + table->add_column_list(type_Int, "ints"); + table->add_column(type_String, "string"); + tr->commit_and_continue_writing(); + + // Create object and initialize array. + table = tr->get_table("class_table"); + auto obj = table->create_object_with_primary_key(42); + auto ints = obj.get_list("ints"); + for (auto i = 0; i < 8; ++i) { + ints.insert(i, i); + } + tr->commit(); + } + + MultiClientServerFixture fixture(3, 1, dir, test_context); + fixture.start(); + + util::Optional seed_session = fixture.make_bound_session(0, seed_db, 0, "/test"); + util::Optional db_1_session = fixture.make_bound_session(1, db_1, 0, "/test"); + util::Optional db_2_session = fixture.make_bound_session(2, db_2, 0, "/test"); + + seed_session->wait_for_upload_complete_or_client_stopped(); + db_1_session->wait_for_download_complete_or_client_stopped(); + db_2_session->wait_for_download_complete_or_client_stopped(); + seed_session.reset(); + db_2_session.reset(); + + auto move_element = [&](const DBRef& db, size_t from, size_t to, size_t string_size = 0) { + auto wt = db->start_write(); + auto table = wt->get_table("class_table"); + auto obj = table->get_object_with_primary_key(42); + auto ints = obj.get_list("ints"); + ints.move(from, to); + obj.set("string", std::string(string_size, 'a')); + wt->commit(); + }; + + // Client 1 uploads two move instructions. + move_element(db_1, 7, 2); + move_element(db_1, 7, 6); + + db_1_session->wait_for_upload_complete_or_client_stopped(); + + std::this_thread::sleep_for(std::chrono::milliseconds{10}); + + // Client 2 uploads two move instructions. + // The sync client uploads at most 128 KB of data so we make the first changeset large enough so two upload + // messages are sent to the server instead of one. Each change is transformed against the changes from Client 1. + + // First change discards the first change (move(7, 2)) of Client 1. + move_element(db_2, 7, 0, 200 * 1024); + // Second change is tranformed against an empty reciprocal changeset as result of the change above. + move_element(db_2, 7, 5); + db_2_session = fixture.make_bound_session(2, db_2, 0, "/test"); + + db_1_session->wait_for_upload_complete_or_client_stopped(); + db_2_session->wait_for_upload_complete_or_client_stopped(); + + db_1_session->wait_for_download_complete_or_client_stopped(); + db_2_session->wait_for_download_complete_or_client_stopped(); + + ReadTransaction rt_1(db_1); + ReadTransaction rt_2(db_2); + const Group& group_1 = rt_1; + const Group& group_2 = rt_2; + group_1.verify(); + group_2.verify(); + CHECK(compare_groups(rt_1, rt_2)); +} + } // unnamed namespace