From 13d826453d27ad8bec3503a825d12fbf404d16f8 Mon Sep 17 00:00:00 2001 From: Daniel Tabacaru <96778637+danieltabacaru@users.noreply.github.com> Date: Thu, 8 Aug 2024 21:52:32 +0300 Subject: [PATCH] RCORE-1977 Empty reciprocal changesets lead to crashes or data divergence (#7955) * Fix bugs related to storing and retrieving reciprocal changesets --- CHANGELOG.md | 2 + src/realm/sync/noinst/client_history_impl.cpp | 2 +- src/realm/sync/transform.cpp | 7 +- test/test_sync.cpp | 387 +++++++++++++++++- 4 files changed, 378 insertions(+), 20 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c28abe11e97..282a3c24cc3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,8 @@ ### Fixed * ([#????](https://github.com/realm/realm-core/issues/????), since v?.?.?) * Fixed an "invalid column key" exception when using a RQL "BETWEEN" query on an int or timestamp property across links. ([#7935](https://github.com/realm/realm-core/issues/7935), since v14.10.1) +* Fixed conflict resolution bug related to ArrayErase and Clear instructions, which could sometimes cause an "Invalid prior_size" exception to prevent synchronization ([#7893](https://github.com/realm/realm-core/issues/7893), since v14.8.0). +* Fixed bug which would prevent eventual consistency during conflict resolution. Affected clients would experience data divergence and potentially consistency errors as a result. ([PR #7955](https://github.com/realm/realm-core/pull/7955), since v14.8.0) ### Breaking changes * None. diff --git a/src/realm/sync/noinst/client_history_impl.cpp b/src/realm/sync/noinst/client_history_impl.cpp index 022e381fa04..3d00f64353e 100644 --- a/src/realm/sync/noinst/client_history_impl.cpp +++ b/src/realm/sync/noinst/client_history_impl.cpp @@ -714,7 +714,7 @@ void ClientHistory::set_reciprocal_transform(version_type version, BinaryData da std::size_t index = size_t(version - m_sync_history_base_version) - 1; REALM_ASSERT(index < sync_history_size()); - if (data.is_null()) { + if (data.size() == 0) { m_arrays->reciprocal_transforms.set(index, BinaryData{"", 0}); // Throws return; } diff --git a/src/realm/sync/transform.cpp b/src/realm/sync/transform.cpp index f0fef18f043..2688e417571 100644 --- a/src/realm/sync/transform.cpp +++ b/src/realm/sync/transform.cpp @@ -2590,8 +2590,9 @@ size_t Transformer::transform_remote_changesets(TransformHistory& history, file_ Changeset& Transformer::get_reciprocal_transform(TransformHistory& history, file_ident_type local_file_ident, version_type version, const HistoryEntry& history_entry) { - auto& changeset = m_reciprocal_transform_cache[version]; // Throws - if (changeset.empty()) { + auto [it, success] = m_reciprocal_transform_cache.insert({version, Changeset{}}); // Throws + if (success) { + Changeset& changeset = it->second; bool is_compressed = false; ChunkedBinaryData data = history.get_reciprocal_transform(version, is_compressed); ChunkedBinaryInputStream in{data}; @@ -2613,7 +2614,7 @@ Changeset& Transformer::get_reciprocal_transform(TransformHistory& history, file origin_file_ident = local_file_ident; changeset.origin_file_ident = origin_file_ident; } - return changeset; + return it->second; } diff --git a/test/test_sync.cpp b/test/test_sync.cpp index 92933661667..933e3516f50 100644 --- a/test/test_sync.cpp +++ b/test/test_sync.cpp @@ -126,6 +126,23 @@ ClientHistory& get_history(DBRef db) return get_replication(db).get_history(); } +Changeset get_reciprocal_changeset(ClientHistory& hist, version_type version) +{ + bool is_compressed = false; + auto data = hist.get_reciprocal_transform(version, is_compressed); + Changeset reciprocal_changeset; + ChunkedBinaryInputStream in{data}; + if (is_compressed) { + size_t total_size; + auto decompressed = util::compression::decompress_nonportable_input_stream(in, total_size); + sync::parse_changeset(*decompressed, reciprocal_changeset); // Throws + } + else { + sync::parse_changeset(in, reciprocal_changeset); // Throws + } + return reciprocal_changeset; +} + #if !REALM_MOBILE // the server is not implemented on devices TEST(Sync_BadVirtualPath) { @@ -6890,8 +6907,8 @@ TEST(Sync_SetAndGetEmptyReciprocalChangeset) }(); // Create changeset which moves element from index 7 to index 0 in array. - // This changeset will discard the previous move (reciprocal changeset), leaving the local reciprocal changesets - // with no instructions (empty). + // This changeset will discard the previous move (reciprocal changeset), + // leaving the local reciprocal changeset with no instructions (empty). Changeset changeset; ArrayMove instr; instr.table = changeset.intern_string("table"); @@ -6915,7 +6932,7 @@ TEST(Sync_SetAndGetEmptyReciprocalChangeset) SyncProgress progress = {}; progress.download.server_version = changeset.version; - progress.download.last_integrated_client_version = latest_local_version - 1; + progress.download.last_integrated_client_version = changeset.last_integrated_remote_version; progress.latest_server_version.version = changeset.version; progress.latest_server_version.salt = 0x7876543217654321; @@ -6925,23 +6942,361 @@ TEST(Sync_SetAndGetEmptyReciprocalChangeset) history.integrate_server_changesets(progress, downloadable_bytes, server_changesets_encoded, version_info, DownloadBatchState::SteadyState, *test_context.logger, transact); - bool is_compressed = false; - auto data = history.get_reciprocal_transform(latest_local_version, is_compressed); - Changeset reciprocal_changeset; - ChunkedBinaryInputStream in{data}; - if (is_compressed) { - size_t total_size; - auto decompressed = util::compression::decompress_nonportable_input_stream(in, total_size); - CHECK(decompressed); - sync::parse_changeset(*decompressed, reciprocal_changeset); // Throws - } - else { - sync::parse_changeset(in, reciprocal_changeset); // Throws - } + auto reciprocal_changeset = get_reciprocal_changeset(history, latest_local_version); // The only instruction in the reciprocal changeset was discarded during OT. CHECK(reciprocal_changeset.empty()); } +TEST(Sync_SetEmptyReciprocalChangesetAfterNonEmptyReciprocalChangeset) +{ + using namespace realm; + using namespace realm::sync::instr; + using realm::sync::Changeset; + + TEST_CLIENT_DB(db); + + auto& history = get_history(db); + history.set_client_file_ident(SaltedFileIdent{1, 0x1234567812345678}, false); + timestamp_type timestamp{1}; + history.set_local_origin_timestamp_source([&] { + return ++timestamp; + }); + + auto latest_local_version = [&] { + auto tr = db->start_write(); + // Create schema: single table with array of ints as property. + tr->add_table_with_primary_key("class_table", type_Int, "_id")->add_column_list(type_Int, "ints"); + tr->commit_and_continue_writing(); + + // Create object and initialize array. + TableRef table = tr->get_table("class_table"); + auto obj = table->create_object_with_primary_key(42); + auto ints = obj.get_list("ints"); + ints.insert(0, 1); + ints.insert(1, 2); + ints.insert(2, 3); + tr->commit_and_continue_writing(); + + // Update two elements in the list. + + ints.set_any(2, Mixed{4}); + tr->commit_and_continue_writing(); + + ints.set_any(0, Mixed{5}); + return tr->commit(); + }(); + + // Create remote changeset (erase at the same index) that: + // 1. Updates the prior_size of the first local update + // 2. Discards the second local update + // After OT, we end up with two reciprocal changesets: a non-empty + // and an empty one. + Changeset changeset; + ArrayErase instr; + instr.table = changeset.intern_string("table"); + instr.object = instr::PrimaryKey{42}; + instr.field = changeset.intern_string("ints"); + instr.prior_size = 3; + instr.path.push_back(0); + changeset.push_back(instr); + changeset.version = 1; + // Make it so the merging window contains the last two local updates. + changeset.last_integrated_remote_version = latest_local_version - 2; + changeset.origin_timestamp = timestamp; + changeset.origin_file_ident = 2; + + ChangesetEncoder::Buffer encoded; + std::vector server_changesets_encoded; + encode_changeset(changeset, encoded); + server_changesets_encoded.emplace_back(changeset.version, changeset.last_integrated_remote_version, + BinaryData(encoded.data(), encoded.size()), changeset.origin_timestamp, + changeset.origin_file_ident); + + SyncProgress progress = {}; + progress.download.server_version = changeset.version; + progress.download.last_integrated_client_version = changeset.last_integrated_remote_version; + progress.latest_server_version.version = changeset.version; + progress.latest_server_version.salt = 0x7876543217654321; + + uint_fast64_t downloadable_bytes = 0; + VersionInfo version_info; + auto transact = db->start_read(); + history.integrate_server_changesets(progress, downloadable_bytes, server_changesets_encoded, version_info, + DownloadBatchState::SteadyState, *test_context.logger, transact); + + // The first reciprocal changeset has the prior_size changed. + auto reciprocal_changeset = get_reciprocal_changeset(history, latest_local_version - 1); + CHECK_EQUAL(reciprocal_changeset.size(), 1); + auto instruction = reciprocal_changeset.begin()->get_if(); + CHECK_EQUAL(instruction->prior_size, 2); + CHECK_EQUAL(instruction->value.data.integer, 4); + + reciprocal_changeset = get_reciprocal_changeset(history, latest_local_version); + // The second reciprocal changeset is empty. + CHECK(reciprocal_changeset.empty()); +} + +TEST(Sync_GetEmptyReciprocalChangesetFromCache) +{ + using namespace realm; + using namespace realm::sync::instr; + using realm::sync::Changeset; + + TEST_CLIENT_DB(db); + + auto& history = get_history(db); + history.set_client_file_ident(SaltedFileIdent{1, 0x1234567812345678}, false); + timestamp_type timestamp{1}; + history.set_local_origin_timestamp_source([&] { + return ++timestamp; + }); + + auto latest_local_version = [&] { + auto tr = db->start_write(); + // Create schema + TableRef table = tr->add_table_with_primary_key("class_table", type_Int, "_id"); + table->add_column_list(type_Int, "ints"); + table->add_column_dictionary(type_Int, "dict"); + tr->commit_and_continue_writing(); + + // Create object + auto obj = table->create_object_with_primary_key(42); + auto ints = obj.get_list("ints"); + ints.insert(0, 1); + ints.insert(1, 2); + ints.insert(2, 3); + tr->commit_and_continue_writing(); + + // Update list + ints.set_any(2, Mixed{4}); + tr->commit_and_continue_writing(); + + // Update dictionary + auto dict = obj.get_dictionary("dict"); + dict.insert("key", 42); + return tr->commit(); + }(); + + std::vector server_changesets; + // Create remote changeset that updates the list and discards + // the (local) dictionary update. + Changeset changeset; + ArrayErase instr; + instr.table = changeset.intern_string("table"); + instr.object = instr::PrimaryKey{42}; + instr.field = changeset.intern_string("ints"); + instr.prior_size = 3; + instr.path.push_back(0); + changeset.push_back(instr); + Update instr2; + instr2.table = changeset.intern_string("table"); + instr2.object = instr::PrimaryKey{42}; + instr2.field = changeset.intern_string("dict"); + auto key = changeset.intern_string("key2"); + instr2.path.push_back(key); + instr2.value = Payload{int64_t(0)}; + changeset.push_back(instr2); + Clear instr3; + instr3.table = changeset.intern_string("table"); + instr3.object = instr::PrimaryKey{42}; + instr3.field = changeset.intern_string("dict"); + instr3.collection_type = instr::CollectionType::Dictionary; + changeset.push_back(instr3); + changeset.version = 1; + // Make it so the merging window contains the last two local updates. + changeset.last_integrated_remote_version = latest_local_version - 2; + changeset.origin_timestamp = timestamp - 2; + changeset.origin_file_ident = 2; + server_changesets.push_back(changeset); + + // Create changeset that inserts the same key as the local (discarded) update. + Changeset changeset2; + Update instr4; + instr4.table = changeset2.intern_string("table"); + instr4.object = instr::PrimaryKey{42}; + instr4.field = changeset2.intern_string("dict"); + key = changeset2.intern_string("key"); + instr4.path.push_back(key); + instr4.value = Payload{int64_t(-6)}; + changeset2.push_back(instr4); + changeset2.version = 2; + // Make it so the merging window contains the local dictionary update. + changeset2.last_integrated_remote_version = latest_local_version - 1; + changeset2.origin_timestamp = timestamp - 1; + changeset2.origin_file_ident = 2; + server_changesets.push_back(changeset2); + + std::vector encoded; + std::vector server_changesets_encoded; + for (const auto& changeset : server_changesets) { + encoded.emplace_back(); + encode_changeset(changeset, encoded.back()); + server_changesets_encoded.emplace_back(changeset.version, changeset.last_integrated_remote_version, + BinaryData(encoded.back().data(), encoded.back().size()), + changeset.origin_timestamp, changeset.origin_file_ident); + } + + SyncProgress progress = {}; + progress.download.server_version = server_changesets.back().version; + // Prevent history being trimmed when server changes are integrated so we can verify the reciprocal changesets. + progress.download.last_integrated_client_version = server_changesets.front().last_integrated_remote_version; + progress.latest_server_version.version = server_changesets.back().version; + progress.latest_server_version.salt = 0x7876543217654321; + + uint_fast64_t downloadable_bytes = 0; + VersionInfo version_info; + auto transact = db->start_read(); + history.integrate_server_changesets(progress, downloadable_bytes, server_changesets_encoded, version_info, + DownloadBatchState::SteadyState, *test_context.logger, transact); + + // The remote dictionary update persists. + auto tr = db->start_read(); + auto dict = tr->get_table("class_table")->get_object_with_primary_key(42).get_dictionary("dict"); + CHECK(!dict.is_empty()); + CHECK(dict.get("key") == -6); + + // The first reciprocal changeset has the prior_size changed. + auto reciprocal_changeset = get_reciprocal_changeset(history, latest_local_version - 1); + CHECK_EQUAL(reciprocal_changeset.size(), 1); + auto instruction = reciprocal_changeset.begin()->get_if(); + CHECK_EQUAL(instruction->prior_size, 2); + CHECK_EQUAL(instruction->value.data.integer, 4); + // The second reciprocal changeset is empty. + reciprocal_changeset = get_reciprocal_changeset(history, latest_local_version); + CHECK(reciprocal_changeset.empty()); +} + +TEST(Sync_GetEmptyReciprocalChangesetFromArray) +{ + using namespace realm; + using namespace realm::sync::instr; + using realm::sync::Changeset; + + TEST_CLIENT_DB(db); + + auto& history = get_history(db); + history.set_client_file_ident(SaltedFileIdent{1, 0x1234567812345678}, false); + timestamp_type timestamp{1}; + history.set_local_origin_timestamp_source([&] { + return ++timestamp; + }); + + auto latest_local_version = [&] { + auto tr = db->start_write(); + // Create schema + TableRef table = tr->add_table_with_primary_key("class_table", type_Int, "_id"); + table->add_column_list(type_Int, "ints"); + table->add_column_dictionary(type_Int, "dict"); + tr->commit_and_continue_writing(); + + // Create object + auto obj = table->create_object_with_primary_key(42); + auto ints = obj.get_list("ints"); + ints.insert(0, 1); + ints.insert(1, 2); + ints.insert(2, 3); + tr->commit_and_continue_writing(); + + // Update list + ints.set_any(2, Mixed{4}); + tr->commit_and_continue_writing(); + + // Update dictionary + auto dict = obj.get_dictionary("dict"); + dict.insert("key", 42); + return tr->commit(); + }(); + + std::vector server_changesets; + // Create remote changeset that updates the list and discards + // the (local) dictionary update. + Changeset changeset; + ArrayErase instr; + instr.table = changeset.intern_string("table"); + instr.object = instr::PrimaryKey{42}; + instr.field = changeset.intern_string("ints"); + instr.prior_size = 3; + instr.path.push_back(0); + changeset.push_back(instr); + Update instr2; + instr2.table = changeset.intern_string("table"); + instr2.object = instr::PrimaryKey{42}; + instr2.field = changeset.intern_string("dict"); + auto key = changeset.intern_string("key2"); + instr2.path.push_back(key); + instr2.value = Payload{int64_t(0)}; + changeset.push_back(instr2); + Clear instr3; + instr3.table = changeset.intern_string("table"); + instr3.object = instr::PrimaryKey{42}; + instr3.field = changeset.intern_string("dict"); + instr3.collection_type = instr::CollectionType::Dictionary; + changeset.push_back(instr3); + changeset.version = 1; + // Make it so the merging window contains the last two local updates. + changeset.last_integrated_remote_version = latest_local_version - 2; + changeset.origin_timestamp = timestamp - 2; + changeset.origin_file_ident = 2; + server_changesets.push_back(changeset); + + // Create changeset that inserts the same key as the local (discarded) update. + Changeset changeset2; + Update instr4; + instr4.table = changeset2.intern_string("table"); + instr4.object = instr::PrimaryKey{42}; + instr4.field = changeset2.intern_string("dict"); + key = changeset2.intern_string("key"); + instr4.path.push_back(key); + instr4.value = Payload{int64_t(-6)}; + changeset2.push_back(instr4); + changeset2.version = 2; + // Make it so the merging window contains the local dictionary update. + changeset2.last_integrated_remote_version = latest_local_version - 1; + changeset2.origin_timestamp = timestamp - 1; + changeset2.origin_file_ident = 2; + server_changesets.push_back(changeset2); + + std::vector server_changesets_encoded; + for (const auto& changeset : server_changesets) { + ChangesetEncoder::Buffer encoded; + encode_changeset(changeset, encoded); + server_changesets_encoded.clear(); + server_changesets_encoded.emplace_back(changeset.version, changeset.last_integrated_remote_version, + BinaryData(encoded.data(), encoded.size()), changeset.origin_timestamp, + changeset.origin_file_ident); + + SyncProgress progress = {}; + progress.download.server_version = changeset.version; + // Prevent history being trimmed when server changes are integrated so we can verify the reciprocal + // changesets. + progress.download.last_integrated_client_version = server_changesets.front().last_integrated_remote_version; + progress.latest_server_version.version = changeset.version; + progress.latest_server_version.salt = 0x7876543217654321; + + uint_fast64_t downloadable_bytes = 0; + VersionInfo version_info; + auto transact = db->start_read(); + history.integrate_server_changesets(progress, downloadable_bytes, server_changesets_encoded, version_info, + DownloadBatchState::SteadyState, *test_context.logger, transact); + } + + // The remote dictionary update persists. + auto tr = db->start_read(); + auto dict = tr->get_table("class_table")->get_object_with_primary_key(42).get_dictionary("dict"); + CHECK(!dict.is_empty()); + CHECK(dict.get("key") == -6); + + // The first reciprocal changeset has the prior_size changed. + auto reciprocal_changeset = get_reciprocal_changeset(history, latest_local_version - 1); + CHECK_EQUAL(reciprocal_changeset.size(), 1); + auto instruction = reciprocal_changeset.begin()->get_if(); + CHECK_EQUAL(instruction->prior_size, 2); + CHECK_EQUAL(instruction->value.data.integer, 4); + // The second reciprocal changeset is empty. + reciprocal_changeset = get_reciprocal_changeset(history, latest_local_version); + CHECK(reciprocal_changeset.empty()); +} + TEST(Sync_InvalidChangesetFromServer) { TEST_CLIENT_DB(db);