Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Distinguish between an empty and a null (not existing) reciprocal changeset #6367

Merged
merged 6 commits into from
Mar 15, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

### Fixed
* <How do the end-user experience this issue? what was the impact?> ([#????](https://github.com/realm/realm-core/issues/????), since v?.?.?)
* None.
* Fixed a case of history diverging when empty reciprocal changesets are part of the merging window in OT ([#6191](https://github.com/realm/realm-core/issues/6191), since v11.13.0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add something here like - this may have resulted in arrays being in different orders on different devices - so that users reading this will know what symptom to look for?


### Breaking changes
* You can no longer associate a Logger Factory with the SyncManager. Instead you can install one default logger via Logger::set_default_logger(). This logger will then be used all over Core. Logging cmake flags updated to use REALM_TEST_LOGGING and REALM_TEST_LOGGING_LEVEL
Expand Down
1 change: 0 additions & 1 deletion src/realm/sync/changeset.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ struct Changeset {
struct IteratorImpl;
using iterator = IteratorImpl<false>;
using const_iterator = IteratorImpl<true>;
using value_type = Instruction;
iterator begin() noexcept;
iterator end() noexcept;
const_iterator begin() const noexcept;
Expand Down
9 changes: 8 additions & 1 deletion src/realm/sync/noinst/client_history_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,11 @@ void ClientHistory::set_reciprocal_transform(version_type version, BinaryData da
std::size_t index = size_t(version - m_sync_history_base_version) - 1;
REALM_ASSERT(index < sync_history_size());

if (data.is_null()) {
m_arrays->reciprocal_transforms.set(index, BinaryData{"", 0}); // Throws
return;
}

auto compressed = util::compression::allocate_and_compress_nonportable(data);
m_arrays->reciprocal_transforms.set(index, BinaryData{compressed.data(), compressed.size()}); // Throws
}
Expand Down Expand Up @@ -764,6 +769,7 @@ Replication::version_type ClientHistory::add_changeset(BinaryData ct_changeset,

void ClientHistory::add_sync_history_entry(const HistoryEntry& entry)
{
REALM_ASSERT(m_arrays->changesets.size() == sync_history_size());
REALM_ASSERT(m_arrays->reciprocal_transforms.size() == sync_history_size());
REALM_ASSERT(m_arrays->remote_versions.size() == sync_history_size());
REALM_ASSERT(m_arrays->origin_file_idents.size() == sync_history_size());
Expand All @@ -775,7 +781,7 @@ void ClientHistory::add_sync_history_entry(const HistoryEntry& entry)
m_arrays->changesets.add(BinaryData{compressed.data(), compressed.size()}); // Throws
}
else {
m_arrays->changesets.add(BinaryData()); // Throws
m_arrays->changesets.add(BinaryData("", 0)); // Throws
}

m_arrays->reciprocal_transforms.add(BinaryData{}); // Throws
Expand Down Expand Up @@ -1175,6 +1181,7 @@ void ClientHistory::update_from_ref_and_version(ref_type ref, version_type versi

m_ct_history_base_version = version - ct_history_size();
m_sync_history_base_version = version - sync_history_size();
REALM_ASSERT(m_arrays->changesets.size() == sync_history_size());
REALM_ASSERT(m_arrays->reciprocal_transforms.size() == sync_history_size());
REALM_ASSERT(m_arrays->remote_versions.size() == sync_history_size());
REALM_ASSERT(m_arrays->origin_file_idents.size() == sync_history_size());
Expand Down
90 changes: 90 additions & 0 deletions test/test_sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6680,4 +6680,94 @@ TEST(Sync_InvalidChangesetFromServer)
StringData(e.what()).contains("Failed to parse received changeset: Invalid interned string"));
}

// Tests that an empty reciprocal changesets is set and retrieved correctly.
TEST(Sync_SetAndGetEmptyReciprocalChangeset)
{
using namespace realm;
using namespace realm::sync::instr;
using realm::sync::Changeset;

TEST_CLIENT_DB(db);

auto& history = get_history(db);
history.set_client_file_ident(SaltedFileIdent{1, 0x1234567812345678}, false);
timestamp_type timestamp{1};
history.set_local_origin_timestamp_source([&] {
return ++timestamp;
});

auto latest_local_verson = [&] {
auto tr = db->start_write();
// Create schema: single table with array of ints as property.
tr->add_table_with_primary_key("class_table", type_Int, "_id")->add_column_list(type_Int, "ints");
tr->commit_and_continue_writing();

// Create object and initialize array.
TableRef table = tr->get_table("class_table");
auto obj = table->create_object_with_primary_key(42);
auto ints = obj.get_list<int64_t>("ints");
for (auto i = 0; i < 8; ++i) {
ints.insert(i, i);
}
tr->commit_and_continue_writing();

// Move element in array.
ints.move(7, 2);
return tr->commit();
}();

// Create changeset which moves element from index 7 to index 0 in array.
// This changeset will discard the previous move (reciprocal changeset), leaving the local reciprocal changesets
// with no instructions (empty).
Changeset changeset;
ArrayMove instr;
instr.table = changeset.intern_string("table");
instr.object = instr::PrimaryKey{42};
instr.field = changeset.intern_string("ints");
instr.path.push_back(7);
instr.ndx_2 = 0;
instr.prior_size = 8;
changeset.push_back(instr);
changeset.version = 1;
changeset.last_integrated_remote_version = latest_local_verson - 1;
changeset.origin_timestamp = timestamp;
changeset.origin_file_ident = 2;

ChangesetEncoder::Buffer encoded;
std::vector<Transformer::RemoteChangeset> server_changesets_encoded;
encode_changeset(changeset, encoded);
server_changesets_encoded.emplace_back(changeset.version, changeset.last_integrated_remote_version,
BinaryData(encoded.data(), encoded.size()), changeset.origin_timestamp,
changeset.origin_file_ident);

SyncProgress progress = {};
progress.download.server_version = changeset.version;
progress.download.last_integrated_client_version = latest_local_verson - 1;
progress.latest_server_version.version = changeset.version;
progress.latest_server_version.salt = 0x7876543217654321;

uint_fast64_t downloadable_bytes = 0;
VersionInfo version_info;
util::StderrLogger logger;
auto transact = db->start_read();
history.integrate_server_changesets(progress, &downloadable_bytes, server_changesets_encoded, version_info,
DownloadBatchState::SteadyState, logger, transact);

bool is_compressed = false;
auto data = history.get_reciprocal_transform(latest_local_verson, is_compressed);
Changeset reciprocal_changeset;
ChunkedBinaryInputStream in{data};
if (is_compressed) {
size_t total_size;
auto decompressed = util::compression::decompress_nonportable_input_stream(in, total_size);
CHECK(decompressed);
sync::parse_changeset(*decompressed, reciprocal_changeset); // Throws
}
else {
sync::parse_changeset(in, reciprocal_changeset); // Throws
}
// The only instruction in the reciprocal changeset was discarded during OT.
CHECK(reciprocal_changeset.empty());
}

} // unnamed namespace