Skip to content

Commit

Permalink
Distinguish between an empty and a null (not existing) reciprocal cha…
Browse files Browse the repository at this point in the history
…ngeset (#6367)

* Distinguish between an empty and a null (not existing)  reciprocal changeset

* changelog

* Changes after code reiview

* More changes after code review

* Better changelog message

* Integration test
  • Loading branch information
danieltabacaru authored Mar 15, 2023
1 parent 5b2fc04 commit d69c2dd
Show file tree
Hide file tree
Showing 4 changed files with 180 additions and 3 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

### Fixed
* <How do the end-user experience this issue? what was the impact?> ([#????](https://github.com/realm/realm-core/issues/????), since v?.?.?)
* None.
* Fixed a case of history diverging when empty reciprocal changesets are part of the merging window in OT. This may have resulted in arrays being in different orders on different devices. ([#6191](https://github.com/realm/realm-core/issues/6191), since v11.13.0)

### Breaking changes
* None.
Expand Down
1 change: 0 additions & 1 deletion src/realm/sync/changeset.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ struct Changeset {
struct IteratorImpl;
using iterator = IteratorImpl<false>;
using const_iterator = IteratorImpl<true>;
using value_type = Instruction;
iterator begin() noexcept;
iterator end() noexcept;
const_iterator begin() const noexcept;
Expand Down
9 changes: 8 additions & 1 deletion src/realm/sync/noinst/client_history_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,11 @@ void ClientHistory::set_reciprocal_transform(version_type version, BinaryData da
std::size_t index = size_t(version - m_sync_history_base_version) - 1;
REALM_ASSERT(index < sync_history_size());

if (data.is_null()) {
m_arrays->reciprocal_transforms.set(index, BinaryData{"", 0}); // Throws
return;
}

auto compressed = util::compression::allocate_and_compress_nonportable(data);
m_arrays->reciprocal_transforms.set(index, BinaryData{compressed.data(), compressed.size()}); // Throws
}
Expand Down Expand Up @@ -764,6 +769,7 @@ Replication::version_type ClientHistory::add_changeset(BinaryData ct_changeset,

void ClientHistory::add_sync_history_entry(const HistoryEntry& entry)
{
REALM_ASSERT(m_arrays->changesets.size() == sync_history_size());
REALM_ASSERT(m_arrays->reciprocal_transforms.size() == sync_history_size());
REALM_ASSERT(m_arrays->remote_versions.size() == sync_history_size());
REALM_ASSERT(m_arrays->origin_file_idents.size() == sync_history_size());
Expand All @@ -775,7 +781,7 @@ void ClientHistory::add_sync_history_entry(const HistoryEntry& entry)
m_arrays->changesets.add(BinaryData{compressed.data(), compressed.size()}); // Throws
}
else {
m_arrays->changesets.add(BinaryData()); // Throws
m_arrays->changesets.add(BinaryData("", 0)); // Throws
}

m_arrays->reciprocal_transforms.add(BinaryData{}); // Throws
Expand Down Expand Up @@ -1175,6 +1181,7 @@ void ClientHistory::update_from_ref_and_version(ref_type ref, version_type versi

m_ct_history_base_version = version - ct_history_size();
m_sync_history_base_version = version - sync_history_size();
REALM_ASSERT(m_arrays->changesets.size() == sync_history_size());
REALM_ASSERT(m_arrays->reciprocal_transforms.size() == sync_history_size());
REALM_ASSERT(m_arrays->remote_versions.size() == sync_history_size());
REALM_ASSERT(m_arrays->origin_file_idents.size() == sync_history_size());
Expand Down
171 changes: 171 additions & 0 deletions test/test_sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6680,4 +6680,175 @@ TEST(Sync_InvalidChangesetFromServer)
StringData(e.what()).contains("Failed to parse received changeset: Invalid interned string"));
}

// Tests that an empty reciprocal changesets is set and retrieved correctly.
TEST(Sync_SetAndGetEmptyReciprocalChangeset)
{
using namespace realm;
using namespace realm::sync::instr;
using realm::sync::Changeset;

TEST_CLIENT_DB(db);

auto& history = get_history(db);
history.set_client_file_ident(SaltedFileIdent{1, 0x1234567812345678}, false);
timestamp_type timestamp{1};
history.set_local_origin_timestamp_source([&] {
return ++timestamp;
});

auto latest_local_verson = [&] {
auto tr = db->start_write();
// Create schema: single table with array of ints as property.
tr->add_table_with_primary_key("class_table", type_Int, "_id")->add_column_list(type_Int, "ints");
tr->commit_and_continue_writing();

// Create object and initialize array.
TableRef table = tr->get_table("class_table");
auto obj = table->create_object_with_primary_key(42);
auto ints = obj.get_list<int64_t>("ints");
for (auto i = 0; i < 8; ++i) {
ints.insert(i, i);
}
tr->commit_and_continue_writing();

// Move element in array.
ints.move(7, 2);
return tr->commit();
}();

// Create changeset which moves element from index 7 to index 0 in array.
// This changeset will discard the previous move (reciprocal changeset), leaving the local reciprocal changesets
// with no instructions (empty).
Changeset changeset;
ArrayMove instr;
instr.table = changeset.intern_string("table");
instr.object = instr::PrimaryKey{42};
instr.field = changeset.intern_string("ints");
instr.path.push_back(7);
instr.ndx_2 = 0;
instr.prior_size = 8;
changeset.push_back(instr);
changeset.version = 1;
changeset.last_integrated_remote_version = latest_local_verson - 1;
changeset.origin_timestamp = timestamp;
changeset.origin_file_ident = 2;

ChangesetEncoder::Buffer encoded;
std::vector<Transformer::RemoteChangeset> server_changesets_encoded;
encode_changeset(changeset, encoded);
server_changesets_encoded.emplace_back(changeset.version, changeset.last_integrated_remote_version,
BinaryData(encoded.data(), encoded.size()), changeset.origin_timestamp,
changeset.origin_file_ident);

SyncProgress progress = {};
progress.download.server_version = changeset.version;
progress.download.last_integrated_client_version = latest_local_verson - 1;
progress.latest_server_version.version = changeset.version;
progress.latest_server_version.salt = 0x7876543217654321;

uint_fast64_t downloadable_bytes = 0;
VersionInfo version_info;
util::StderrLogger logger;
auto transact = db->start_read();
history.integrate_server_changesets(progress, &downloadable_bytes, server_changesets_encoded, version_info,
DownloadBatchState::SteadyState, logger, transact);

bool is_compressed = false;
auto data = history.get_reciprocal_transform(latest_local_verson, is_compressed);
Changeset reciprocal_changeset;
ChunkedBinaryInputStream in{data};
if (is_compressed) {
size_t total_size;
auto decompressed = util::compression::decompress_nonportable_input_stream(in, total_size);
CHECK(decompressed);
sync::parse_changeset(*decompressed, reciprocal_changeset); // Throws
}
else {
sync::parse_changeset(in, reciprocal_changeset); // Throws
}
// The only instruction in the reciprocal changeset was discarded during OT.
CHECK(reciprocal_changeset.empty());
}

TEST(Sync_TransformAgainstEmptyReciprocalChangeset)
{
TEST_DIR(dir);
TEST_CLIENT_DB(seed_db);
TEST_CLIENT_DB(db_1);
TEST_CLIENT_DB(db_2);

{
auto tr = seed_db->start_write();
// Create schema: single table with array of ints as property.
auto table = tr->add_table_with_primary_key("class_table", type_Int, "_id");
table->add_column_list(type_Int, "ints");
table->add_column(type_String, "string");
tr->commit_and_continue_writing();

// Create object and initialize array.
table = tr->get_table("class_table");
auto obj = table->create_object_with_primary_key(42);
auto ints = obj.get_list<int64_t>("ints");
for (auto i = 0; i < 8; ++i) {
ints.insert(i, i);
}
tr->commit();
}

MultiClientServerFixture fixture(3, 1, dir, test_context);
fixture.start();

util::Optional<Session> seed_session = fixture.make_bound_session(0, seed_db, 0, "/test");
util::Optional<Session> db_1_session = fixture.make_bound_session(1, db_1, 0, "/test");
util::Optional<Session> db_2_session = fixture.make_bound_session(2, db_2, 0, "/test");

seed_session->wait_for_upload_complete_or_client_stopped();
db_1_session->wait_for_download_complete_or_client_stopped();
db_2_session->wait_for_download_complete_or_client_stopped();
seed_session.reset();
db_2_session.reset();

auto move_element = [&](const DBRef& db, size_t from, size_t to, size_t string_size = 0) {
auto wt = db->start_write();
auto table = wt->get_table("class_table");
auto obj = table->get_object_with_primary_key(42);
auto ints = obj.get_list<int64_t>("ints");
ints.move(from, to);
obj.set("string", std::string(string_size, 'a'));
wt->commit();
};

// Client 1 uploads two move instructions.
move_element(db_1, 7, 2);
move_element(db_1, 7, 6);

db_1_session->wait_for_upload_complete_or_client_stopped();

std::this_thread::sleep_for(std::chrono::milliseconds{10});

// Client 2 uploads two move instructions.
// The sync client uploads at most 128 KB of data so we make the first changeset large enough so two upload
// messages are sent to the server instead of one. Each change is transformed against the changes from Client 1.

// First change discards the first change (move(7, 2)) of Client 1.
move_element(db_2, 7, 0, 200 * 1024);
// Second change is tranformed against an empty reciprocal changeset as result of the change above.
move_element(db_2, 7, 5);
db_2_session = fixture.make_bound_session(2, db_2, 0, "/test");

db_1_session->wait_for_upload_complete_or_client_stopped();
db_2_session->wait_for_upload_complete_or_client_stopped();

db_1_session->wait_for_download_complete_or_client_stopped();
db_2_session->wait_for_download_complete_or_client_stopped();

ReadTransaction rt_1(db_1);
ReadTransaction rt_2(db_2);
const Group& group_1 = rt_1;
const Group& group_2 = rt_2;
group_1.verify();
group_2.verify();
CHECK(compare_groups(rt_1, rt_2));
}

} // unnamed namespace

0 comments on commit d69c2dd

Please sign in to comment.