Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Distinguish between an empty and a null (not existing) reciprocal changeset #6367

Merged
merged 6 commits into from
Mar 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

### Fixed
* <How do the end-user experience this issue? what was the impact?> ([#????](https://github.com/realm/realm-core/issues/????), since v?.?.?)
* None.
* Fixed a case of history diverging when empty reciprocal changesets are part of the merging window in OT. This may have resulted in arrays being in different orders on different devices. ([#6191](https://github.com/realm/realm-core/issues/6191), since v11.13.0)

### Breaking changes
* You can no longer associate a Logger Factory with the SyncManager. Instead you can install one default logger via Logger::set_default_logger(). This logger will then be used all over Core. Logging cmake flags updated to use REALM_TEST_LOGGING and REALM_TEST_LOGGING_LEVEL
Expand Down
1 change: 0 additions & 1 deletion src/realm/sync/changeset.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ struct Changeset {
struct IteratorImpl;
using iterator = IteratorImpl<false>;
using const_iterator = IteratorImpl<true>;
using value_type = Instruction;
iterator begin() noexcept;
iterator end() noexcept;
const_iterator begin() const noexcept;
Expand Down
9 changes: 8 additions & 1 deletion src/realm/sync/noinst/client_history_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,11 @@ void ClientHistory::set_reciprocal_transform(version_type version, BinaryData da
std::size_t index = size_t(version - m_sync_history_base_version) - 1;
REALM_ASSERT(index < sync_history_size());

if (data.is_null()) {
m_arrays->reciprocal_transforms.set(index, BinaryData{"", 0}); // Throws
return;
}

auto compressed = util::compression::allocate_and_compress_nonportable(data);
m_arrays->reciprocal_transforms.set(index, BinaryData{compressed.data(), compressed.size()}); // Throws
}
Expand Down Expand Up @@ -764,6 +769,7 @@ Replication::version_type ClientHistory::add_changeset(BinaryData ct_changeset,

void ClientHistory::add_sync_history_entry(const HistoryEntry& entry)
{
REALM_ASSERT(m_arrays->changesets.size() == sync_history_size());
REALM_ASSERT(m_arrays->reciprocal_transforms.size() == sync_history_size());
REALM_ASSERT(m_arrays->remote_versions.size() == sync_history_size());
REALM_ASSERT(m_arrays->origin_file_idents.size() == sync_history_size());
Expand All @@ -775,7 +781,7 @@ void ClientHistory::add_sync_history_entry(const HistoryEntry& entry)
m_arrays->changesets.add(BinaryData{compressed.data(), compressed.size()}); // Throws
}
else {
m_arrays->changesets.add(BinaryData()); // Throws
m_arrays->changesets.add(BinaryData("", 0)); // Throws
}

m_arrays->reciprocal_transforms.add(BinaryData{}); // Throws
Expand Down Expand Up @@ -1175,6 +1181,7 @@ void ClientHistory::update_from_ref_and_version(ref_type ref, version_type versi

m_ct_history_base_version = version - ct_history_size();
m_sync_history_base_version = version - sync_history_size();
REALM_ASSERT(m_arrays->changesets.size() == sync_history_size());
REALM_ASSERT(m_arrays->reciprocal_transforms.size() == sync_history_size());
REALM_ASSERT(m_arrays->remote_versions.size() == sync_history_size());
REALM_ASSERT(m_arrays->origin_file_idents.size() == sync_history_size());
Expand Down
171 changes: 171 additions & 0 deletions test/test_sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6680,4 +6680,175 @@ TEST(Sync_InvalidChangesetFromServer)
StringData(e.what()).contains("Failed to parse received changeset: Invalid interned string"));
}

// Tests that an empty reciprocal changesets is set and retrieved correctly.
TEST(Sync_SetAndGetEmptyReciprocalChangeset)
{
using namespace realm;
using namespace realm::sync::instr;
using realm::sync::Changeset;

TEST_CLIENT_DB(db);

auto& history = get_history(db);
history.set_client_file_ident(SaltedFileIdent{1, 0x1234567812345678}, false);
timestamp_type timestamp{1};
history.set_local_origin_timestamp_source([&] {
return ++timestamp;
});

auto latest_local_verson = [&] {
auto tr = db->start_write();
// Create schema: single table with array of ints as property.
tr->add_table_with_primary_key("class_table", type_Int, "_id")->add_column_list(type_Int, "ints");
tr->commit_and_continue_writing();

// Create object and initialize array.
TableRef table = tr->get_table("class_table");
auto obj = table->create_object_with_primary_key(42);
auto ints = obj.get_list<int64_t>("ints");
for (auto i = 0; i < 8; ++i) {
ints.insert(i, i);
}
tr->commit_and_continue_writing();

// Move element in array.
ints.move(7, 2);
return tr->commit();
}();

// Create changeset which moves element from index 7 to index 0 in array.
// This changeset will discard the previous move (reciprocal changeset), leaving the local reciprocal changesets
// with no instructions (empty).
Changeset changeset;
ArrayMove instr;
instr.table = changeset.intern_string("table");
instr.object = instr::PrimaryKey{42};
instr.field = changeset.intern_string("ints");
instr.path.push_back(7);
instr.ndx_2 = 0;
instr.prior_size = 8;
changeset.push_back(instr);
changeset.version = 1;
changeset.last_integrated_remote_version = latest_local_verson - 1;
changeset.origin_timestamp = timestamp;
changeset.origin_file_ident = 2;

ChangesetEncoder::Buffer encoded;
std::vector<Transformer::RemoteChangeset> server_changesets_encoded;
encode_changeset(changeset, encoded);
server_changesets_encoded.emplace_back(changeset.version, changeset.last_integrated_remote_version,
BinaryData(encoded.data(), encoded.size()), changeset.origin_timestamp,
changeset.origin_file_ident);

SyncProgress progress = {};
progress.download.server_version = changeset.version;
progress.download.last_integrated_client_version = latest_local_verson - 1;
progress.latest_server_version.version = changeset.version;
progress.latest_server_version.salt = 0x7876543217654321;

uint_fast64_t downloadable_bytes = 0;
VersionInfo version_info;
util::StderrLogger logger;
auto transact = db->start_read();
history.integrate_server_changesets(progress, &downloadable_bytes, server_changesets_encoded, version_info,
DownloadBatchState::SteadyState, logger, transact);

bool is_compressed = false;
auto data = history.get_reciprocal_transform(latest_local_verson, is_compressed);
Changeset reciprocal_changeset;
ChunkedBinaryInputStream in{data};
if (is_compressed) {
size_t total_size;
auto decompressed = util::compression::decompress_nonportable_input_stream(in, total_size);
CHECK(decompressed);
sync::parse_changeset(*decompressed, reciprocal_changeset); // Throws
}
else {
sync::parse_changeset(in, reciprocal_changeset); // Throws
}
// The only instruction in the reciprocal changeset was discarded during OT.
CHECK(reciprocal_changeset.empty());
}

TEST(Sync_TransformAgainstEmptyReciprocalChangeset)
{
TEST_DIR(dir);
TEST_CLIENT_DB(seed_db);
TEST_CLIENT_DB(db_1);
TEST_CLIENT_DB(db_2);

{
auto tr = seed_db->start_write();
// Create schema: single table with array of ints as property.
auto table = tr->add_table_with_primary_key("class_table", type_Int, "_id");
table->add_column_list(type_Int, "ints");
table->add_column(type_String, "string");
tr->commit_and_continue_writing();

// Create object and initialize array.
table = tr->get_table("class_table");
auto obj = table->create_object_with_primary_key(42);
auto ints = obj.get_list<int64_t>("ints");
for (auto i = 0; i < 8; ++i) {
ints.insert(i, i);
}
tr->commit();
}

MultiClientServerFixture fixture(3, 1, dir, test_context);
fixture.start();

util::Optional<Session> seed_session = fixture.make_bound_session(0, seed_db, 0, "/test");
util::Optional<Session> db_1_session = fixture.make_bound_session(1, db_1, 0, "/test");
util::Optional<Session> db_2_session = fixture.make_bound_session(2, db_2, 0, "/test");

seed_session->wait_for_upload_complete_or_client_stopped();
db_1_session->wait_for_download_complete_or_client_stopped();
db_2_session->wait_for_download_complete_or_client_stopped();
seed_session.reset();
db_2_session.reset();

auto move_element = [&](const DBRef& db, size_t from, size_t to, size_t string_size = 0) {
auto wt = db->start_write();
auto table = wt->get_table("class_table");
auto obj = table->get_object_with_primary_key(42);
auto ints = obj.get_list<int64_t>("ints");
ints.move(from, to);
obj.set("string", std::string(string_size, 'a'));
wt->commit();
};

// Client 1 uploads two move instructions.
move_element(db_1, 7, 2);
move_element(db_1, 7, 6);

db_1_session->wait_for_upload_complete_or_client_stopped();

std::this_thread::sleep_for(std::chrono::milliseconds{10});

// Client 2 uploads two move instructions.
// The sync client uploads at most 128 KB of data so we make the first changeset large enough so two upload
// messages are sent to the server instead of one. Each change is transformed against the changes from Client 1.

// First change discards the first change (move(7, 2)) of Client 1.
move_element(db_2, 7, 0, 200 * 1024);
// Second change is tranformed against an empty reciprocal changeset as result of the change above.
move_element(db_2, 7, 5);
db_2_session = fixture.make_bound_session(2, db_2, 0, "/test");

db_1_session->wait_for_upload_complete_or_client_stopped();
db_2_session->wait_for_upload_complete_or_client_stopped();

db_1_session->wait_for_download_complete_or_client_stopped();
db_2_session->wait_for_download_complete_or_client_stopped();

ReadTransaction rt_1(db_1);
ReadTransaction rt_2(db_2);
const Group& group_1 = rt_1;
const Group& group_2 = rt_2;
group_1.verify();
group_2.verify();
CHECK(compare_groups(rt_1, rt_2));
}

} // unnamed namespace