diff --git a/CHANGELOG.md b/CHANGELOG.md index d2cbc21220b..3c9b4b1a2c2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,7 @@ ### Enhancements * (PR [#????](https://github.com/realm/realm-core/pull/????)) -* None. +* Prioritize integration of local changes over remote changes - shorten the time users may have to wait when committing local changes. Stop storing downloaded changesets in history. ([PR #5844](https://github.com/realm/realm-core/pull/5844)). ### Fixed * ([#????](https://github.com/realm/realm-core/issues/????), since v?.?.?) @@ -17,7 +17,7 @@ ----------- ### Internals -* None. +* Reenable sync benchmark. ---------------------------------------------- @@ -7549,4 +7549,4 @@ Format: 2012-06-?? ---------- - Group() interfaced changed. Now with multiple options. default option changed from readonly... -+ Generated C++ highlevel API for tables with up to 15 columns ++ Generated C++ highlevel API for tables with up to 15 columns \ No newline at end of file diff --git a/doc/algebra_of_changesets.md b/doc/algebra_of_changesets.md index fb57a476514..5f87bf61048 100644 --- a/doc/algebra_of_changesets.md +++ b/doc/algebra_of_changesets.md @@ -22,7 +22,7 @@ stepwise for a concatenated changeset: S(α, A + B) = S(S(α, A), B) (1) **Definition:** Two changesets `A` and `B`, having the same base state, `α`, are -*equivalent*, written as `A ~ B`, if, and onlæy if they produce the same final +*equivalent*, written as `A ~ B`, if, and only if they produce the same final state, that is, if, and only if `S(α, A) = S(α, B)`. This does not mean that `A` and `B` are equal. diff --git a/evergreen/config.yml b/evergreen/config.yml index 6854932a537..510a189b476 100644 --- a/evergreen/config.yml +++ b/evergreen/config.yml @@ -149,6 +149,8 @@ functions: exit 1 fi + export UNITTEST_THREADS=1 + BENCHMARK=$(./evergreen/abspath.sh ./build/test/benchmark-${benchmark_name}/${cmake_build_type|Debug}/realm-benchmark-${benchmark_name}) echo "Going to run benchmark $BENCHMARK" @@ -525,6 +527,14 @@ tasks: vars: benchmark_name: crud +- name: benchmark-sync + exec_timeout_secs: 1800 + tags: [ "benchmark" ] + commands: + - func: "run benchmark" + vars: + benchmark_name: sync + - name: sync-tests tags: [ "test_suite", "for_pull_requests" ] exec_timeout_secs: 1800 diff --git a/src/realm/db.cpp b/src/realm/db.cpp index 6fba3c7b70c..cb01f3b7fed 100644 --- a/src/realm/db.cpp +++ b/src/realm/db.cpp @@ -1518,6 +1518,17 @@ void DB::close_internal(std::unique_lock lock, bool allow_ope } } +bool DB::other_writers_waiting_for_lock() const +{ + SharedInfo* info = m_file_map.get_addr(); + + uint32_t next_ticket = info->next_ticket.load(std::memory_order_relaxed); + uint32_t next_served = info->next_served.load(std::memory_order_relaxed); + // When holding the write lock, next_ticket = next_served + 1, hence, if the diference between 'next_ticket' and + // 'next_served' is greater than 1, there is at least one thread waiting to acquire the write lock. + return next_ticket > next_served + 1; +} + class DB::AsyncCommitHelper { public: AsyncCommitHelper(DB* db) diff --git a/src/realm/db.hpp b/src/realm/db.hpp index 6640bfac647..5f426b14321 100644 --- a/src/realm/db.hpp +++ b/src/realm/db.hpp @@ -408,6 +408,10 @@ class DB : public std::enable_shared_from_this { void claim_sync_agent(); void release_sync_agent(); + /// Returns true if there are threads waiting to acquire the write lock, false otherwise. + /// To be used only when already holding the lock. + bool other_writers_waiting_for_lock() const; + protected: explicit DB(const DBOptions& options); // Is this ever used? diff --git a/src/realm/object-store/sync/sync_session.cpp b/src/realm/object-store/sync/sync_session.cpp index 772cee9dfe5..6bd1caaeb9c 100644 --- a/src/realm/object-store/sync/sync_session.cpp +++ b/src/realm/object-store/sync/sync_session.cpp @@ -740,9 +740,10 @@ void SyncSession::create_sync_session() session_config.simulate_integration_error = sync_config.simulate_integration_error; if (sync_config.on_download_message_received_hook) { session_config.on_download_message_received_hook = - [hook = sync_config.on_download_message_received_hook, anchor = weak_from_this()]( - const sync::SyncProgress& progress, int64_t query_version, sync::DownloadBatchState batch_state) { - hook(anchor, progress, query_version, batch_state); + [hook = sync_config.on_download_message_received_hook, + anchor = weak_from_this()](const sync::SyncProgress& progress, int64_t query_version, + sync::DownloadBatchState batch_state, size_t num_changesets) { + hook(anchor, progress, query_version, batch_state, num_changesets); }; } if (sync_config.on_bootstrap_message_processed_hook) { @@ -753,6 +754,14 @@ void SyncSession::create_sync_session() return hook(anchor, progress, query_version, batch_state); }; } + if (sync_config.on_download_message_integrated_hook) { + session_config.on_download_message_integrated_hook = + [hook = sync_config.on_download_message_integrated_hook, + anchor = weak_from_this()](const sync::SyncProgress& progress, int64_t query_version, + sync::DownloadBatchState batch_state, size_t num_changesets) { + hook(anchor, progress, query_version, batch_state, num_changesets); + }; + } { std::string sync_route = m_sync_manager->sync_route(); diff --git a/src/realm/object-store/util/event_loop_dispatcher.hpp b/src/realm/object-store/util/event_loop_dispatcher.hpp index 0c59e0f2f19..a9a429b1bd3 100644 --- a/src/realm/object-store/util/event_loop_dispatcher.hpp +++ b/src/realm/object-store/util/event_loop_dispatcher.hpp @@ -63,6 +63,8 @@ class EventLoopDispatcher { } }; +} // namespace util + namespace _impl::ForEventLoopDispatcher { template struct ExtractSignatureImpl { @@ -106,6 +108,8 @@ template using ExtractSignature = typename ExtractSignatureImpl::signature; } // namespace _impl::ForEventLoopDispatcher +namespace util { + // Deduction guide for function pointers. template EventLoopDispatcher(void (*)(Args...)) -> EventLoopDispatcher; diff --git a/src/realm/sync/changeset.hpp b/src/realm/sync/changeset.hpp index 0482b88d1cc..e37a29a7f0b 100644 --- a/src/realm/sync/changeset.hpp +++ b/src/realm/sync/changeset.hpp @@ -178,6 +178,10 @@ struct Changeset { /// be part of refactoring the ChangesetIndex size_t transform_sequence = 0; + /// If the changeset was compacted during download, the size of the original + /// changeset. Only applies to changesets sent by the server. + std::size_t original_changeset_size = 0; + /// Compare for exact equality, including that interned strings have the /// same integer values, and there is the same number of interned strings, /// same topology of tombstones, etc. diff --git a/src/realm/sync/client.cpp b/src/realm/sync/client.cpp index 68da75ca16b..2d9bb8a33b3 100644 --- a/src/realm/sync/client.cpp +++ b/src/realm/sync/client.cpp @@ -244,8 +244,9 @@ class SessionWrapper final : public util::AtomicRefCountBase, public SyncTransac util::UniqueFunction m_progress_handler; util::UniqueFunction m_connection_state_change_listener; - std::function m_on_download_message_received_hook; + std::function m_on_download_message_received_hook; std::function m_on_bootstrap_message_processed_hook; + std::function on_download_message_integrated_hook; std::shared_ptr m_flx_subscription_store; int64_t m_flx_active_version = 0; @@ -731,12 +732,7 @@ void SessionImpl::on_resumed() bool SessionImpl::process_flx_bootstrap_message(const SyncProgress& progress, DownloadBatchState batch_state, int64_t query_version, const ReceivedChangesets& received_changesets) { - if (!m_is_flx_sync_session) { - return false; - } - - // If this is a steady state DOWNLOAD, no need for special handling. - if (batch_state == DownloadBatchState::LastInBatch && query_version == m_wrapper.m_flx_active_version) { + if (is_steady_state_download_message(batch_state, query_version)) { return false; } @@ -810,12 +806,15 @@ void SessionImpl::process_pending_flx_bootstrap() history.integrate_server_changesets( *pending_batch.progress, &downloadable_bytes, pending_batch.changesets, new_version, batch_state, logger, - [&](const TransactionRef& tr) { - bootstrap_store->pop_front_pending(tr, pending_batch.changesets.size()); + [&](const TransactionRef& tr, size_t count) { + REALM_ASSERT_3(count, <=, pending_batch.changesets.size()); + bootstrap_store->pop_front_pending(tr, count); }, get_transact_reporter()); progress = *pending_batch.progress; + download_message_integrated_hook(progress, query_version, batch_state, pending_batch.changesets.size()); + logger.info("Integrated %1 changesets from pending bootstrap for query version %2, producing client version " "%3. %4 changesets remaining in bootstrap", pending_batch.changesets.size(), pending_batch.query_version, new_version.realm_version, @@ -860,12 +859,40 @@ void SessionImpl::non_sync_flx_completion(int64_t version) } void SessionImpl::receive_download_message_hook(const SyncProgress& progress, int64_t query_version, - DownloadBatchState batch_state) + DownloadBatchState batch_state, size_t num_changesets) { if (REALM_LIKELY(!m_wrapper.m_on_download_message_received_hook)) { return; } - m_wrapper.m_on_download_message_received_hook(progress, query_version, batch_state); + m_wrapper.m_on_download_message_received_hook(progress, query_version, batch_state, num_changesets); +} + +void SessionImpl::download_message_integrated_hook(const SyncProgress& progress, int64_t query_version, + DownloadBatchState batch_state, size_t num_changesets) +{ + if (REALM_LIKELY(!m_wrapper.on_download_message_integrated_hook)) { + return; + } + + m_wrapper.on_download_message_integrated_hook(progress, query_version, batch_state, num_changesets); +} + +bool SessionImpl::is_steady_state_download_message(DownloadBatchState batch_state, int64_t query_version) +{ + if (batch_state == DownloadBatchState::SteadyState) { + return true; + } + + if (!m_is_flx_sync_session) { + return true; + } + + // If this is a steady state DOWNLOAD, no need for special handling. + if (batch_state == DownloadBatchState::LastInBatch && query_version == m_wrapper.m_flx_active_version) { + return true; + } + + return false; } // ################ SessionWrapper ################ @@ -891,6 +918,7 @@ SessionWrapper::SessionWrapper(ClientImpl& client, DBRef db, std::shared_ptr= m_flx_last_seen_version); REALM_ASSERT(new_version >= m_flx_active_version); + REALM_ASSERT(batch_state != DownloadBatchState::SteadyState); SubscriptionSet::State new_state = SubscriptionSet::State::Uncommitted; // Initialize to make compiler happy switch (batch_state) { + case DownloadBatchState::SteadyState: + // Cannot be called with this value. + REALM_UNREACHABLE(); case DownloadBatchState::LastInBatch: if (m_flx_active_version == new_version) { return; diff --git a/src/realm/sync/client.hpp b/src/realm/sync/client.hpp index af0396c834c..873f4908a2b 100644 --- a/src/realm/sync/client.hpp +++ b/src/realm/sync/client.hpp @@ -130,7 +130,7 @@ class BadServerUrl; // Exception /// their bound state), as long as they are associated with the same client /// object, or with two different client objects that do not overlap in /// time. This means, in particular, that it is an error to create two bound -/// session objects for the same local Realm file, it they are associated with +/// session objects for the same local Realm file, if they are associated with /// two different client objects that overlap in time, even if the session /// objects do not overlap in time (in their bound state). It is the /// responsibility of the application to ensure that these rules are adhered @@ -313,15 +313,18 @@ class Session { /// This feature exists exclusively for testing purposes at this time. bool simulate_integration_error = false; - // Will be called after a download message is received and validated by - // the client but befefore it's been transformed or applied. To be used in - // testing only. - std::function + /// Will be called after a download message is received and validated by + /// the client but befefore it's been transformed or applied. To be used in + /// testing only. + std::function on_download_message_received_hook; - // Will be called after each bootstrap message is added to the pending bootstrap store, - // but before processing a finalized bootstrap. For testing only. + /// Will be called after each bootstrap message is added to the pending bootstrap store, + /// but before processing a finalized bootstrap. For testing only. std::function on_bootstrap_message_processed_hook; + /// Will be called after a download message is integrated. For testing only. + std::function + on_download_message_integrated_hook; }; /// \brief Start a new session for the specified client-side Realm. diff --git a/src/realm/sync/config.hpp b/src/realm/sync/config.hpp index 884ed52e9e2..37d50f1e950 100644 --- a/src/realm/sync/config.hpp +++ b/src/realm/sync/config.hpp @@ -174,12 +174,17 @@ struct SyncConfig { // Will be called after a download message is received and validated by the client but befefore it's been // transformed or applied. To be used in testing only. - std::function, const sync::SyncProgress&, int64_t, sync::DownloadBatchState)> + std::function, const sync::SyncProgress&, int64_t, sync::DownloadBatchState, + size_t)> on_download_message_received_hook; // Will be called after each bootstrap message is added to the pending bootstrap store, but before // processing a finalized bootstrap. For testing only. std::function, const sync::SyncProgress&, int64_t, sync::DownloadBatchState)> on_bootstrap_message_processed_hook; + // Will be called after a download message is integrated. For testing only. + std::function, const sync::SyncProgress&, int64_t, sync::DownloadBatchState, + size_t)> + on_download_message_integrated_hook; bool simulate_integration_error = false; diff --git a/src/realm/sync/noinst/changeset_index.hpp b/src/realm/sync/noinst/changeset_index.hpp index 7c9df2f1968..30753782226 100644 --- a/src/realm/sync/noinst/changeset_index.hpp +++ b/src/realm/sync/noinst/changeset_index.hpp @@ -13,7 +13,7 @@ namespace _impl { /// The ChangesetIndex is responsible for keeping track of exactly which /// instructions touch which objects. It does this by recording ranges of -/// instructions in changesets, such that the merge algorithm can make do with +/// instructions in changesets, such that the merge algorithm can do with /// just merging the "relevant" instructions. Due to the semantics of link /// nullification, instruction ranges for objects that have ever been /// "connected" by a link instruction must be joined together. In other words, diff --git a/src/realm/sync/noinst/client_history_impl.cpp b/src/realm/sync/noinst/client_history_impl.cpp index 0b29bf4ca2e..436c0bf5186 100644 --- a/src/realm/sync/noinst/client_history_impl.cpp +++ b/src/realm/sync/noinst/client_history_impl.cpp @@ -324,7 +324,7 @@ void ClientHistory::find_uploadable_changesets(UploadCursor& upload_progress, ve const auto sync_history_size = arrays.changesets.size(); const auto sync_history_base_version = rt->get_version() - sync_history_size; - std::size_t accum_byte_size_soft_limit = 0x20000; // 128 KB + std::size_t accum_byte_size_soft_limit = 131072; // 128 KB std::size_t accum_byte_size_hard_limit = 16777216; // server-imposed limit std::size_t accum_byte_size = 0; @@ -376,47 +376,131 @@ void ClientHistory::find_uploadable_changesets(UploadCursor& upload_progress, ve } -void ClientHistory::integrate_server_changesets(const SyncProgress& progress, - const std::uint_fast64_t* downloadable_bytes, - util::Span incoming_changesets, - VersionInfo& version_info, DownloadBatchState batch_state, - util::Logger& logger, - util::UniqueFunction run_in_write_tr, - SyncTransactReporter* transact_reporter) +void ClientHistory::integrate_server_changesets( + const SyncProgress& progress, const std::uint_fast64_t* downloadable_bytes, + util::Span incoming_changesets, VersionInfo& version_info, DownloadBatchState batch_state, + util::Logger& logger, util::UniqueFunction run_in_write_tr, + SyncTransactReporter* transact_reporter) { REALM_ASSERT(incoming_changesets.size() != 0); - - std::uint_fast64_t downloaded_bytes_in_message = 0; std::vector changesets; changesets.resize(incoming_changesets.size()); // Throws + // Parse incoming changesets without holding the write lock. try { for (std::size_t i = 0; i < incoming_changesets.size(); ++i) { const RemoteChangeset& changeset = incoming_changesets[i]; - downloaded_bytes_in_message += changeset.original_changeset_size; parse_remote_changeset(changeset, changesets[i]); // Throws changesets[i].transform_sequence = i; } } - catch (const TransformError& e) { + catch (const BadChangesetError& e) { throw IntegrationException(ClientError::bad_changeset, util::format("Failed to parse received changeset: %1", e.what())); } - TransactionRef transact = m_db->start_write(); // Throws - VersionID old_version = transact->get_version_of_current_transaction(); - version_type local_version = old_version.version; - auto sync_file_id = transact->get_sync_file_id(); - REALM_ASSERT(sync_file_id != 0); + VersionID new_version{0, 0}; + auto num_changesets = incoming_changesets.size(); + util::Span changesets_to_integrate(changesets); + + // Ideally, this loop runs only once, but it can run up to `incoming_changesets.size()` times, depending on the + // number of times the sync client yields the write lock to allow the user to commit their changes. In each + // iteration, at least one changeset is transformed and committed. + while (!changesets_to_integrate.empty()) { + TransactionRef transact = m_db->start_write(); // Throws + VersionID old_version = transact->get_version_of_current_transaction(); + version_type local_version = old_version.version; + auto sync_file_id = transact->get_sync_file_id(); + REALM_ASSERT(sync_file_id != 0); + + ensure_updated(local_version); // Throws + prepare_for_write(); // Throws + + std::uint64_t downloaded_bytes_in_transaction = 0; + auto changesets_transformed_count = transform_and_apply_server_changesets( + changesets_to_integrate, transact, logger, downloaded_bytes_in_transaction); + + // downloaded_bytes always contains the total number of downloaded bytes + // from the Realm. downloaded_bytes must be persisted in the Realm, since + // the downloaded changesets are trimmed after use, and since it would be + // expensive to traverse the entire history. + Array& root = m_arrays->root; + auto downloaded_bytes = + std::uint64_t(root.get_as_ref_or_tagged(s_progress_downloaded_bytes_iip).get_as_int()); + downloaded_bytes += downloaded_bytes_in_transaction; + root.set(s_progress_downloaded_bytes_iip, RefOrTagged::make_tagged(downloaded_bytes)); // Throws + + const RemoteChangeset& last_changeset = incoming_changesets[changesets_transformed_count - 1]; + changesets_to_integrate = changesets_to_integrate.sub_span(changesets_transformed_count); + incoming_changesets = incoming_changesets.sub_span(changesets_transformed_count); + + // During the bootstrap phase in flexible sync, the server sends multiple download messages with the same + // synthetic server version that represents synthetic changesets generated from state on the server. + if (batch_state == DownloadBatchState::LastInBatch && changesets_to_integrate.empty()) { + update_sync_progress(progress, downloadable_bytes, transact); // Throws + } + // Always update progress for download messages from steady state. + else if (batch_state == DownloadBatchState::SteadyState) { + auto partial_progress = progress; + partial_progress.download.server_version = last_changeset.remote_version; + partial_progress.download.last_integrated_client_version = last_changeset.last_integrated_local_version; + update_sync_progress(partial_progress, downloadable_bytes, transact); // Throws + } + if (run_in_write_tr) { + run_in_write_tr(transact, changesets_transformed_count); + } - ensure_updated(local_version); // Throws - prepare_for_write(); // Throws + // The reason we can use the `origin_timestamp`, and the `origin_file_ident` + // from the last transformed changeset, and ignore all the other changesets, is + // that these values are actually irrelevant for changesets of remote origin + // stored in the client-side history (for now), except that + // `origin_file_ident` is required to be nonzero, to mark it as having been + // received from the server. + HistoryEntry entry; + entry.origin_timestamp = last_changeset.origin_timestamp; + entry.origin_file_ident = last_changeset.origin_file_ident; + entry.remote_version = last_changeset.remote_version; + add_sync_history_entry(entry); // Throws + + // Tell prepare_commit()/add_changeset() not to write a history entry for + // this transaction as we already did it. + REALM_ASSERT(!m_applying_server_changeset); + m_applying_server_changeset = true; + new_version = transact->commit_and_continue_as_read(); // Throws + + if (transact_reporter) { + transact_reporter->report_sync_transact(old_version, new_version); // Throws + } + + logger.debug("Integrated %1 changesets out of %2", changesets_transformed_count, num_changesets); + } + + REALM_ASSERT(new_version.version > 0); + version_info.realm_version = new_version.version; + version_info.sync_version = {new_version.version, 0}; +} + + +size_t ClientHistory::transform_and_apply_server_changesets(util::Span changesets_to_integrate, + TransactionRef transact, util::Logger& logger, + std::uint64_t& downloaded_bytes) +{ + REALM_ASSERT(transact->get_transact_stage() == DB::transact_Writing); + + if (!m_replication.apply_server_changes()) { + std::for_each(changesets_to_integrate.begin(), changesets_to_integrate.end(), [&](const Changeset c) { + downloaded_bytes += c.original_changeset_size; + }); + // Skip over all changesets if they don't need to be transformed and applied. + return changesets_to_integrate.size(); + } + + version_type local_version = transact->get_version_of_current_transaction().version; + auto sync_file_id = transact->get_sync_file_id(); - ChangesetEncoder::Buffer transformed_changeset; try { - for (std::size_t i = 0; i < incoming_changesets.size(); ++i) { - const RemoteChangeset& changeset = incoming_changesets[i]; - REALM_ASSERT(changeset.last_integrated_local_version <= local_version); + for (auto& changeset : changesets_to_integrate) { + REALM_ASSERT(changeset.last_integrated_remote_version <= local_version); REALM_ASSERT(changeset.origin_file_ident > 0 && changeset.origin_file_ident != sync_file_id); // It is possible that the synchronization history has been trimmed @@ -429,27 +513,27 @@ void ClientHistory::integrate_server_changesets(const SyncProgress& progress, // when this situation occurs. // // See trim_sync_history() for further details. - if (changesets[i].last_integrated_remote_version < m_sync_history_base_version) - changesets[i].last_integrated_remote_version = m_sync_history_base_version; + if (changeset.last_integrated_remote_version < m_sync_history_base_version) + changeset.last_integrated_remote_version = m_sync_history_base_version; } - if (m_replication.apply_server_changes()) { - Transformer& transformer = get_transformer(); // Throws - transformer.transform_remote_changesets(*this, sync_file_id, local_version, changesets, - &logger); // Throws - - // Changesets are applied to the Realm with replication temporarily - // disabled. The main reason for disabling replication and manually adding - // the transformed changesets to the history, is that the replication system - // (due to technical debt) is unable in some cases to produce a correct - // changeset while applying another one (i.e., it cannot carbon copy). - TempShortCircuitReplication tscr{m_replication}; + Transformer& transformer = get_transformer(); // Throws + constexpr std::size_t commit_byte_size_limit = 102400; // 100 KB + + auto changeset_applier = [&](const Changeset* transformed_changeset) -> bool { InstructionApplier applier{*transact}; - for (std::size_t i = 0; i < incoming_changesets.size(); ++i) { - encode_changeset(changesets[i], transformed_changeset); - applier.apply(changesets[i], &logger); // Throws + { + TempShortCircuitReplication tscr{m_replication}; + applier.apply(*transformed_changeset, &logger); // Throws } - } + downloaded_bytes += transformed_changeset->original_changeset_size; + + return !(m_db->other_writers_waiting_for_lock() && transact->get_commit_size() >= commit_byte_size_limit); + }; + auto changesets_transformed_count = + transformer.transform_remote_changesets(*this, sync_file_id, local_version, changesets_to_integrate, + std::move(changeset_applier), &logger); // Throws + return changesets_transformed_count; } catch (const BadChangesetError& e) { throw IntegrationException(ClientError::bad_changeset, @@ -459,52 +543,6 @@ void ClientHistory::integrate_server_changesets(const SyncProgress& progress, throw IntegrationException(ClientError::bad_changeset, util::format("Failed to transform received changeset: %1", e.what())); } - - // downloaded_bytes always contains the total number of downloaded bytes - // from the Realm. downloaded_bytes must be persisted in the Realm, since - // the downloaded changesets are trimmed after use, and since it would be - // expensive to traverse the entire history. - Array& root = m_arrays->root; - auto downloaded_bytes = - std::uint_fast64_t(root.get_as_ref_or_tagged(s_progress_downloaded_bytes_iip).get_as_int()); - downloaded_bytes += downloaded_bytes_in_message; - root.set(s_progress_downloaded_bytes_iip, RefOrTagged::make_tagged(downloaded_bytes)); // Throws - - // During the bootstrap phase in flexible sync, the server sends multiple download messages with the same - // synthetic server version that represents synthetic changesets generated from state on the server. - if (batch_state == DownloadBatchState::LastInBatch) { - update_sync_progress(progress, downloadable_bytes, transact); // Throws - } - if (run_in_write_tr) { - run_in_write_tr(transact); - } - - // The reason we can use the `origin_timestamp`, and the `origin_file_ident` - // from the last incoming changeset, and ignore all the other changesets, is - // that these values are actually irrelevant for changesets of remote origin - // stored in the client-side history (for now), except that - // `origin_file_ident` is required to be nonzero, to mark it as having been - // received from the server. - const Changeset& last_changeset = changesets.back(); - HistoryEntry entry; - entry.origin_timestamp = last_changeset.origin_timestamp; - entry.origin_file_ident = last_changeset.origin_file_ident; - entry.remote_version = last_changeset.version; - entry.changeset = BinaryData(transformed_changeset.data(), transformed_changeset.size()); - add_sync_history_entry(entry); // Throws - - // Tell prepare_commit()/add_changeset() not to write a history entry for - // this transaction as we already did it. - REALM_ASSERT(!m_applying_server_changeset); - m_applying_server_changeset = true; - auto new_version = transact->commit_and_continue_as_read(); // Throws - - if (transact_reporter) { - transact_reporter->report_sync_transact(old_version, new_version); // Throws - } - - version_info.realm_version = new_version.version; - version_info.sync_version = {new_version.version, 0}; } @@ -908,7 +946,7 @@ bool ClientHistory::no_pending_local_changes(version_type version) const void ClientHistory::do_trim_sync_history(std::size_t n) { - REALM_ASSERT(sync_history_size() == sync_history_size()); + REALM_ASSERT(m_arrays->changesets.size() == sync_history_size()); REALM_ASSERT(m_arrays->reciprocal_transforms.size() == sync_history_size()); REALM_ASSERT(m_arrays->remote_versions.size() == sync_history_size()); REALM_ASSERT(m_arrays->origin_file_idents.size() == sync_history_size()); diff --git a/src/realm/sync/noinst/client_history_impl.hpp b/src/realm/sync/noinst/client_history_impl.hpp index bc149ab22aa..671332a34a7 100644 --- a/src/realm/sync/noinst/client_history_impl.hpp +++ b/src/realm/sync/noinst/client_history_impl.hpp @@ -102,7 +102,7 @@ class ClientHistory final : public _impl::History, public TransformHistory { virtual void report_sync_transact(VersionID old_version, VersionID new_version) = 0; protected: - ~SyncTransactReporter() {} + ~SyncTransactReporter() = default; }; @@ -249,11 +249,12 @@ class ClientHistory final : public _impl::History, public TransformHistory { /// \param transact_reporter An optional callback which will be called with the /// version immediately processing the sync transaction and that of the sync /// transaction. - void integrate_server_changesets(const SyncProgress& progress, const std::uint_fast64_t* downloadable_bytes, - util::Span changesets, VersionInfo& new_version, - DownloadBatchState download_type, util::Logger&, - util::UniqueFunction run_in_write_tr = nullptr, - SyncTransactReporter* transact_reporter = nullptr); + void + integrate_server_changesets(const SyncProgress& progress, const std::uint_fast64_t* downloadable_bytes, + util::Span changesets, VersionInfo& new_version, + DownloadBatchState download_type, util::Logger&, + util::UniqueFunction run_in_write_tr = nullptr, + SyncTransactReporter* transact_reporter = nullptr); static void get_upload_download_bytes(DB*, std::uint_fast64_t&, std::uint_fast64_t&, std::uint_fast64_t&, std::uint_fast64_t&, std::uint_fast64_t&); @@ -415,6 +416,9 @@ class ClientHistory final : public _impl::History, public TransformHistory { std::uint_fast64_t sum_of_history_entry_sizes(version_type begin_version, version_type end_version) const noexcept; + size_t transform_and_apply_server_changesets(util::Span changesets_to_integrate, TransactionRef, + util::Logger&, std::uint64_t& downloaded_bytes); + void prepare_for_write(); Replication::version_type add_changeset(BinaryData changeset, BinaryData sync_changeset); void add_sync_history_entry(const HistoryEntry&); diff --git a/src/realm/sync/noinst/client_impl_base.cpp b/src/realm/sync/noinst/client_impl_base.cpp index 5c564be67b7..493e085f604 100644 --- a/src/realm/sync/noinst/client_impl_base.cpp +++ b/src/realm/sync/noinst/client_impl_base.cpp @@ -1387,7 +1387,7 @@ void Session::integrate_changesets(ClientReplication& repl, const SyncProgress& { auto& history = repl.get_history(); if (received_changesets.empty()) { - if (download_batch_state != DownloadBatchState::LastInBatch) { + if (download_batch_state == DownloadBatchState::MoreToCome) { throw IntegrationException(ClientError::bad_progress, "received empty download message that was not the last in batch"); } @@ -2081,6 +2081,10 @@ void Session::receive_download_message(const SyncProgress& progress, std::uint_f DownloadBatchState batch_state, int64_t query_version, const ReceivedChangesets& received_changesets) { + if (is_steady_state_download_message(batch_state, query_version)) { + batch_state = DownloadBatchState::SteadyState; + } + logger.debug("Received: DOWNLOAD(download_server_version=%1, download_client_version=%2, " "latest_server_version=%3, latest_server_version_salt=%4, " "upload_client_version=%5, upload_server_version=%6, downloadable_bytes=%7, " @@ -2088,7 +2092,7 @@ void Session::receive_download_message(const SyncProgress& progress, std::uint_f progress.download.server_version, progress.download.last_integrated_client_version, progress.latest_server_version.version, progress.latest_server_version.salt, progress.upload.client_version, progress.upload.last_integrated_server_version, downloadable_bytes, - batch_state == DownloadBatchState::LastInBatch, query_version, received_changesets.size()); // Throws + batch_state != DownloadBatchState::MoreToCome, query_version, received_changesets.size()); // Throws // Ignore the message if the deactivation process has been initiated, // because in that case, the associated Realm must not be accessed any @@ -2148,7 +2152,7 @@ void Session::receive_download_message(const SyncProgress& progress, std::uint_f } } - receive_download_message_hook(progress, query_version, batch_state); + receive_download_message_hook(progress, query_version, batch_state, received_changesets.size()); if (process_flx_bootstrap_message(progress, batch_state, query_version, received_changesets)) { clear_resumption_delay_state(); @@ -2157,6 +2161,8 @@ void Session::receive_download_message(const SyncProgress& progress, std::uint_f initiate_integrate_changesets(downloadable_bytes, batch_state, progress, received_changesets); // Throws + download_message_integrated_hook(progress, query_version, batch_state, received_changesets.size()); + // When we receive a DOWNLOAD message successfully, we can clear the backoff timer value used to reconnect // after a retryable session error. clear_resumption_delay_state(); diff --git a/src/realm/sync/noinst/client_impl_base.hpp b/src/realm/sync/noinst/client_impl_base.hpp index e0006e1792d..0ea5acee96f 100644 --- a/src/realm/sync/noinst/client_impl_base.hpp +++ b/src/realm/sync/noinst/client_impl_base.hpp @@ -1078,7 +1078,10 @@ class ClientImpl::Session { bool check_received_sync_progress(const SyncProgress&, int&) noexcept; void check_for_upload_completion(); void check_for_download_completion(); - void receive_download_message_hook(const SyncProgress&, int64_t, DownloadBatchState); + void receive_download_message_hook(const SyncProgress&, int64_t, DownloadBatchState, size_t); + void download_message_integrated_hook(const SyncProgress&, int64_t, DownloadBatchState, size_t); + + bool is_steady_state_download_message(DownloadBatchState batch_state, int64_t query_version); friend class Connection; }; diff --git a/src/realm/sync/noinst/server/server_history.cpp b/src/realm/sync/noinst/server/server_history.cpp index b7c90675a3a..32196179d5a 100644 --- a/src/realm/sync/noinst/server/server_history.cpp +++ b/src/realm/sync/noinst/server/server_history.cpp @@ -1180,8 +1180,6 @@ bool ServerHistory::integrate_remote_changesets(file_ident_type remote_file_iden if (num_changesets > 0) { recip_hist.ensure_instantiated(); // Throws - version_type lowest_last_integrated_local_version = changesets[0].last_integrated_local_version; - // Parse the changesets std::vector parsed_transformed_changesets; parsed_transformed_changesets.resize(num_changesets); @@ -1190,19 +1188,23 @@ bool ServerHistory::integrate_remote_changesets(file_ident_type remote_file_iden // Transform the changesets version_type current_server_version = get_server_version(); - bool may_have_causally_unrelated_changes = (current_server_version > lowest_last_integrated_local_version); - if (may_have_causally_unrelated_changes) { - // Merge with causally unrelated changesets, and resolve the - // conflicts if there are any. - TransformHistoryImpl transform_hist{remote_file_ident, *this, recip_hist}; - Transformer& transformer = m_context.get_transformer(); // Throws - transformer.transform_remote_changesets(transform_hist, m_local_file_ident, current_server_version, - parsed_transformed_changesets, &logger); // Throws - } - - // Apply the transformed changesets to the Realm state Group& group = *m_group; Transaction& transaction = dynamic_cast(group); + auto apply = [&](const Changeset* c) -> bool { + TempShortCircuitReplication tdr{*this}; // Short-circuit while integrating changes + InstructionApplier applier{transaction}; + applier.apply(*c, &logger); + reset(); // Reset the instruction encoder + return true; + }; + // Merge with causally unrelated changesets, and resolve the + // conflicts if there are any. + TransformHistoryImpl transform_hist{remote_file_ident, *this, recip_hist}; + Transformer& transformer = m_context.get_transformer(); // Throws + transformer.transform_remote_changesets(transform_hist, m_local_file_ident, current_server_version, + parsed_transformed_changesets, std::move(apply), + &logger); // Throws + for (std::size_t i = 0; i < num_changesets; ++i) { REALM_ASSERT(get_instruction_encoder().buffer().size() == 0); const Changeset& changeset = parsed_transformed_changesets[i]; @@ -1214,14 +1216,10 @@ bool ServerHistory::integrate_remote_changesets(file_ident_type remote_file_iden ChangesetEncoder::Buffer changeset_buffer; - TempShortCircuitReplication tdr{*this}; // Short-circuit while integrating changes - InstructionApplier applier{transaction}; - applier.apply(parsed_transformed_changesets[i], &logger); // Throws encode_changeset(parsed_transformed_changesets[i], changeset_buffer); // Throws entry.changeset = BinaryData{changeset_buffer.data(), changeset_buffer.size()}; add_sync_history_entry(entry); // Throws - reset(); // Reset the instruction encoder } } diff --git a/src/realm/sync/protocol.hpp b/src/realm/sync/protocol.hpp index e28d9f21e3c..4b8f772604a 100644 --- a/src/realm/sync/protocol.hpp +++ b/src/realm/sync/protocol.hpp @@ -134,6 +134,7 @@ struct DownloadCursor { enum class DownloadBatchState { MoreToCome, LastInBatch, + SteadyState, }; /// Checks that `dc.last_integrated_client_version` is zero if diff --git a/src/realm/sync/transform.cpp b/src/realm/sync/transform.cpp index d9b3e168266..5a17f1c665c 100644 --- a/src/realm/sync/transform.cpp +++ b/src/realm/sync/transform.cpp @@ -1359,12 +1359,12 @@ struct Merge parsed_changesets, util::Logger* logger) +size_t TransformerImpl::transform_remote_changesets(TransformHistory& history, file_ident_type local_file_ident, + version_type current_local_version, + util::Span parsed_changesets, + util::UniqueFunction changeset_applier, + util::Logger* logger) { REALM_ASSERT(local_file_ident != 0); std::vector our_changesets; + // p points to the beginning of a range of changesets that share the same + // "base", i.e. are based on the same local version. + auto p = parsed_changesets.begin(); + auto parsed_changesets_end = parsed_changesets.end(); + try { - // p points to the beginning of a range of changesets that share the same - // "base", i.e. are based on the same local version. - auto p = parsed_changesets.begin(); - auto parsed_changesets_end = parsed_changesets.end(); while (p != parsed_changesets_end) { // Find the range of incoming changesets that share the same // last_integrated_local_version, which means we can merge them in one go. @@ -2554,12 +2557,29 @@ void TransformerImpl::transform_remote_changesets(TransformHistory& history, fil begin_version = version; } + bool must_apply_all = false; + if (!our_changesets.empty()) { merge_changesets(local_file_ident, &*p, same_base_range_end - p, our_changesets.data(), our_changesets.size(), logger); // Throws + // We need to apply all transformed changesets if at least one reciprocal changeset was modified + // during OT. + must_apply_all = std::any_of(our_changesets.begin(), our_changesets.end(), [](const Changeset* c) { + return c->is_dirty(); + }); + } + + auto continue_applying = true; + for (; p != same_base_range_end && continue_applying; ++p) { + // It is safe to stop applying the changesets if: + // 1. There are no reciprocal changesets + // 2. No reciprocal changeset was modified + continue_applying = changeset_applier(p) || must_apply_all; + } + if (!continue_applying) { + break; } - p = same_base_range_end; our_changesets.clear(); // deliberately not releasing memory } } @@ -2578,6 +2598,8 @@ void TransformerImpl::transform_remote_changesets(TransformHistory& history, fil // NOTE: Any exception thrown during flushing *MUST* lead to rollback of // the current transaction. flush_reciprocal_transform_cache(history); // Throws + + return p - parsed_changesets.begin(); } @@ -2643,16 +2665,13 @@ void parse_remote_changeset(const Transformer::RemoteChangeset& remote_changeset REALM_ASSERT(remote_changeset.remote_version != 0); ChunkedBinaryInputStream remote_in{remote_changeset.data}; - try { - parse_changeset(remote_in, parsed_changeset); // Throws - } - catch (sync::BadChangesetError& e) { - throw TransformError(e.what()); - } + parse_changeset(remote_in, parsed_changeset); // Throws + parsed_changeset.version = remote_changeset.remote_version; parsed_changeset.last_integrated_remote_version = remote_changeset.last_integrated_local_version; parsed_changeset.origin_timestamp = remote_changeset.origin_timestamp; parsed_changeset.origin_file_ident = remote_changeset.origin_file_ident; + parsed_changeset.original_changeset_size = remote_changeset.original_changeset_size; } } // namespace sync diff --git a/src/realm/sync/transform.hpp b/src/realm/sync/transform.hpp index 9749bad3d74..96789b2794a 100644 --- a/src/realm/sync/transform.hpp +++ b/src/realm/sync/transform.hpp @@ -127,6 +127,8 @@ class Transformer { public: class RemoteChangeset; + using iterator = util::Span::iterator; + /// Produce operationally transformed versions of the specified changesets, /// which are assumed to be received from a particular remote peer, P, /// represented by the specified transform history. Note that P is not @@ -136,7 +138,7 @@ class Transformer { /// changesets and all causally unrelated changesets in the local history. A /// changeset in the local history is causally unrelated if, and only if it /// occurs after the local changeset that produced - /// `remote_changeset.last_integrated_local_version` and is not a produced + /// `remote_changeset.last_integrated_local_version` and it is not produced /// by integration of a changeset received from P. This assumes that /// `remote_changeset.last_integrated_local_version` is set to the local /// version produced by the last local changeset, that was integrated by P @@ -164,14 +166,20 @@ class Transformer { /// changeset is of local origin. The specified identifier must never be /// zero. /// + /// \param changeset_applier Called to to apply each transformed changeset. + /// Returns true if it can continue applying the changests, false otherwise. + /// + /// \return The number of changesets that have been transformed and applied. + /// /// \throw TransformError Thrown if operational transformation fails due to /// a problem with the specified changeset. /// /// FIXME: Consider using std::error_code instead of throwing /// TransformError. - virtual void transform_remote_changesets(TransformHistory&, file_ident_type local_file_ident, - version_type current_local_version, util::Span changesets, - util::Logger* = nullptr) = 0; + virtual size_t transform_remote_changesets(TransformHistory&, file_ident_type local_file_ident, + version_type current_local_version, util::Span changesets, + util::UniqueFunction changeset_applier, + util::Logger* = nullptr) = 0; virtual ~Transformer() noexcept {} }; @@ -194,8 +202,8 @@ class TransformerImpl : public sync::Transformer { TransformerImpl(); - void transform_remote_changesets(TransformHistory&, file_ident_type, version_type, util::Span, - util::Logger*) override; + size_t transform_remote_changesets(TransformHistory&, file_ident_type, version_type, util::Span, + util::UniqueFunction, util::Logger*) override; struct Side; struct MajorSide; diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 948d4e9ea6f..8337cba1132 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -10,6 +10,7 @@ endif() add_subdirectory(benchmark-common-tasks) add_subdirectory(benchmark-crud) add_subdirectory(benchmark-larger) +add_subdirectory(benchmark-sync) # FIXME: Add other benchmarks set(CORE_TEST_SOURCES @@ -215,11 +216,6 @@ if(REALM_ENABLE_SYNC) client/statistics.hpp ) - set(BENCH_TRANSFORM_SOURCES - bench-sync/bench_transform.cpp - test_all.cpp - ) - file(GLOB TEST_RESOURCES RELATIVE ${CMAKE_CURRENT_BINARY_DIR} *.json *.pem ../certificate-authority/certs/* @@ -244,8 +240,4 @@ if(REALM_ENABLE_SYNC) # This enables symbols in backtraces target_link_libraries(SyncTests "-rdynamic") endif() - - add_executable(BenchTransform EXCLUDE_FROM_ALL ${BENCH_TRANSFORM_SOURCES}) - set_target_properties(BenchTransform PROPERTIES OUTPUT_NAME "bench-transform") - target_link_libraries(BenchTransform TestUtil Sync) endif() diff --git a/test/bench-sync/bench_transform.cpp b/test/bench-sync/bench_transform.cpp deleted file mode 100644 index eee55255f0a..00000000000 --- a/test/bench-sync/bench_transform.cpp +++ /dev/null @@ -1,454 +0,0 @@ -#include "../util/benchmark_results.hpp" -#include "../util/dump_changesets.hpp" -#include "../util/timer.hpp" -#include "../util/test_path.hpp" -#include "../util/unit_test.hpp" -#include "../test_all.hpp" - -#if REALM_ENABLE_ENCRYPTION -#include "../util/crypt_key.hpp" -#endif // REALM_ENABLE_ENCRYPTION - -#include -#include - -#include "../peer.hpp" - -using namespace realm; -using namespace realm::sync; -using namespace realm::test_util; -using namespace realm::test_util::unit_test; - -static constexpr auto s_bench_test_dump_dir = "BENCHTEST_DUMP_TRANSFORM"; - -namespace bench { - -// Two peers have 1000 transactions each with a handful of instructions in -// each (25% transactions contain MoveLastOver). One peer receives and merges -// all transactions from the other (but does not apply them to their -// database). -template -void transform_transactions(TestContext& test_context, BenchmarkResults& results) -{ - std::string ident = test_context.test_details.test_name; - std::string ident_preface = test_context.test_details.test_name + "_Setup"; - const size_t num_iterations = 3; - - for (size_t i = 0; i < num_iterations; ++i) { - // We dump the changesets generated by the performance tests when a directory is specified. - // This generates a performance testing corpus for the Golang implementation. - auto changeset_dump_dir_gen = get_changeset_dump_dir_generator(test_context, s_bench_test_dump_dir); - - auto server = Peer::create_server(test_context, changeset_dump_dir_gen.get()); - auto origin = Peer::create_client(test_context, 2, changeset_dump_dir_gen.get()); - auto client = Peer::create_client(test_context, 3, changeset_dump_dir_gen.get()); - - // Produce some mostly realistic transactions on both sides. - auto make_transactions = [](Peer& peer) { - ColKey col_ndx; - { - peer.start_transaction(); - TableRef t = sync::create_table(*peer.group, "class_t"); - col_ndx = t->add_column(type_Int, "i"); - peer.commit(); - } - - for (size_t j = 0; j < num_transactions - 1; ++j) { - peer.start_transaction(); - TableRef t = peer.table("class_t"); - t->create_object().set(col_ndx, 123); - - // Let 25% of commits contain a MoveLastOver - if (j % 4 == 0) { - t->remove_object(t->begin()); - } - peer.commit(); - } - }; - - // Timer t_preface{Timer::type_RealTime}; - make_transactions(*origin); - make_transactions(*client); - // results.submit(ident_preface.c_str(), t_preface.get_elapsed_time()); - - // Upload everything to the server (fast, no conflicts) - size_t outstanding = server->count_outstanding_changesets_from(*origin); - for (size_t j = 0; j < outstanding; ++j) { - server->integrate_next_changeset_from(*origin); - } - - outstanding = client->count_outstanding_changesets_from(*server); - REALM_ASSERT(outstanding != 0); - Timer t{Timer::type_RealTime}; - // FIXME: client to server is artificially slow, because we do not yet - // have a batched UPLOAD ability. - // - // for (size_t j = 0; j < outstanding; ++j) { - // server->integrate_next_changeset_from(*client); - // } - client->integrate_next_changesets_from(*server, outstanding); - results.submit(ident.c_str(), t.get_elapsed_time()); - } - - // results.finish(ident_preface, ident_preface); - results.finish(ident, ident); -} - -// Two peers have 1 transaction each with 1000 instructions (8.3% of -// instructions are MoveLastOver). One peer receives and merges the large -// transaction from the other (but does not apply it to their database). -template -void transform_instructions(TestContext& test_context, BenchmarkResults& results) -{ - std::string ident = test_context.test_details.test_name; - std::string ident_preface = test_context.test_details.test_name + "_Setup"; - - for (size_t i = 0; i < 3; ++i) { - // We dump the changesets generated by the performance tests when a directory is specified. - // This generates a performance testing corpus for the Golang implementation. - auto changeset_dump_dir_gen = get_changeset_dump_dir_generator(test_context, s_bench_test_dump_dir); - - auto server = Peer::create_server(test_context, changeset_dump_dir_gen.get()); - auto client = Peer::create_client(test_context, 2, changeset_dump_dir_gen.get()); - - // Produce some mostly realistic transactions on both sides. - auto make_instructions = [](Peer& peer) { - peer.start_transaction(); - TableRef t = peer.group->add_table("class_t"); - ColKey col_ndx = t->add_column(type_Int, "i"); - - for (size_t j = 0; j < num_iterations; ++j) { - Obj obj = t->create_object(); - obj.set(col_ndx, 123); - - // Let 25% of commits contain a MoveLastOver - if (j % 4 == 0) { - t->begin()->remove(); - } - } - - return peer.commit(); - }; - - // Timer t_preface{Timer::type_RealTime}; - make_instructions(*server); - make_instructions(*client); - // results.submit(ident_preface.c_str(), t_preface.get_elapsed_time()); - - size_t outstanding = server->count_outstanding_changesets_from(*client); - REALM_ASSERT(outstanding != 0); - Timer t{Timer::type_RealTime}; - for (size_t j = 0; j < outstanding; ++j) { - server->integrate_next_changeset_from(*client); - } - results.submit(ident.c_str(), t.get_elapsed_time()); - } - - // results.finish(ident_preface, ident_preface); - results.finish(ident, ident); -} - -template -void connected_objects(TestContext& test_context, BenchmarkResults& results) -{ - std::string ident = test_context.test_details.test_name; - std::string ident_preface = test_context.test_details.test_name + "_Setup"; - - for (size_t i = 0; i < 3; ++i) { - // We dump the changesets generated by the performance tests when a directory is specified. - // This generates a performance testing corpus for the Golang implementation. - auto changeset_dump_dir_gen = get_changeset_dump_dir_generator(test_context, s_bench_test_dump_dir); - - auto server = Peer::create_server(test_context, changeset_dump_dir_gen.get()); - auto client = Peer::create_client(test_context, 2, changeset_dump_dir_gen.get()); - - auto make_instructions = [](Peer& peer) { - peer.start_transaction(); - TableRef t = sync::create_table_with_primary_key(*peer.group, "class_t", type_String, "pk"); - auto col_key = t->add_column(*t, "l"); - - // Everything links to this object! - auto first_key = t->create_object_with_primary_key("Hello").get_key(); - - for (size_t j = 0; j < num_iterations; ++j) { - std::stringstream ss; - ss << j; - std::string pk = ss.str(); - t->create_object_with_primary_key(pk).set(col_key, first_key); - } - - return peer.commit(); - }; - - // Timer t_preface{Time::type_RealTime}; - make_instructions(*server); - make_instructions(*client); - // results.submit(ident_preface.c_str(), t_preface.get_elapsed_time()); - - size_t outstanding = server->count_outstanding_changesets_from(*client); - REALM_ASSERT(outstanding != 0); - Timer t{Timer::type_RealTime}; - for (size_t j = 0; j < outstanding; ++j) { - server->integrate_next_changeset_from(*client); - } - results.submit(ident.c_str(), t.get_elapsed_time()); - } - - results.finish(ident, ident); -} - -} // namespace bench - -const int max_lead_text_width = 40; - -#define RUN_ALL_BENCHMARKS 0 - -TEST(BenchMerge1000x1000Instructions) -{ - std::string results_file_stem = test_util::get_test_path_prefix() + "instructions_1000x1000"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_instructions<1000>(test_context, results); -} - -TEST(BenchMerge2000x2000Instructions) -{ - std::string results_file_stem = test_util::get_test_path_prefix() + "instructions_2000x2000"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_instructions<2000>(test_context, results); -} - -TEST_IF(BenchMerge3000x3000Instructions, RUN_ALL_BENCHMARKS) -{ - std::string results_file_stem = test_util::get_test_path_prefix() + "instructions_3000x3000"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_instructions<3000>(test_context, results); -} - -TEST(BenchMerge4000x4000Instructions) -{ - std::string results_file_stem = test_util::get_test_path_prefix() + "instructions_4000x4000"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_instructions<4000>(test_context, results); -} - -TEST_IF(BenchMerge5000x5000Instructions, RUN_ALL_BENCHMARKS) -{ - std::string results_file_stem = test_util::get_test_path_prefix() + "instructions_5000x5000"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_instructions<5000>(test_context, results); -} - -TEST(BenchMerge8000x8000Instructions) -{ - std::string results_file_stem = test_util::get_test_path_prefix() + "instructions_8000x8000"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_instructions<8000>(test_context, results); -} - -TEST_IF(BenchMerge10000x10000Instructions, RUN_ALL_BENCHMARKS) -{ - std::string results_file_stem = test_util::get_test_path_prefix() + "instructions_10000x10000"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_instructions<10000>(test_context, results); -} - -TEST_IF(BenchMerge11000x11000Instructions, RUN_ALL_BENCHMARKS) -{ - std::string results_file_stem = test_util::get_test_path_prefix() + "instructions_11000x11000"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_instructions<11000>(test_context, results); -} - -TEST_IF(BenchMerge12000x12000Instructions, RUN_ALL_BENCHMARKS) -{ - std::string results_file_stem = test_util::get_test_path_prefix() + "instructions_12000x12000"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_instructions<12000>(test_context, results); -} - -TEST_IF(BenchMerge13000x13000Instructions, RUN_ALL_BENCHMARKS) -{ - std::string results_file_stem = test_util::get_test_path_prefix() + "instructions_13000x13000"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_instructions<13000>(test_context, results); -} - -TEST_IF(BenchMerge14000x14000Instructions, RUN_ALL_BENCHMARKS) -{ - std::string results_file_stem = test_util::get_test_path_prefix() + "instructions_14000x14000"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_instructions<14000>(test_context, results); -} - -TEST_IF(BenchMerge15000x15000Instructions, RUN_ALL_BENCHMARKS) -{ - std::string results_file_stem = test_util::get_test_path_prefix() + "instructions_15000x15000"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_instructions<15000>(test_context, results); -} - -TEST(BenchMerge16000x16000Instructions) -{ - std::string results_file_stem = test_util::get_test_path_prefix() + "instructions_16000x16000"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_instructions<16000>(test_context, results); -} - -TEST_IF(BenchMerge17000x17000Instructions, RUN_ALL_BENCHMARKS) -{ - std::string results_file_stem = test_util::get_test_path_prefix() + "instructions_17000x17000"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_instructions<17000>(test_context, results); -} - -TEST_IF(BenchMerge18000x18000Instructions, RUN_ALL_BENCHMARKS) -{ - std::string results_file_stem = test_util::get_test_path_prefix() + "instructions_18000x18000"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_instructions<18000>(test_context, results); -} - -TEST_IF(BenchMerge19000x19000Instructions, RUN_ALL_BENCHMARKS) -{ - std::string results_file_stem = test_util::get_test_path_prefix() + "instructions_19000x19000"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_instructions<19000>(test_context, results); -} - -TEST_IF(BenchMerge20000x20000Instructions, RUN_ALL_BENCHMARKS) -{ - std::string results_file_stem = test_util::get_test_path_prefix() + "instructions_20000x20000"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_instructions<20000>(test_context, results); -} - - -TEST(BenchMerge100x100Transactions) -{ - std::string results_file_stem = test_util::get_test_path_prefix() + "transactions_100x100"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_transactions<100>(test_context, results); -} - -TEST(BenchMerge500x500Transactions) -{ - std::string results_file_stem = test_util::get_test_path_prefix() + "transactions_500x500"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_transactions<500>(test_context, results); -} - -TEST(BenchMerge1000x1000Transactions) -{ - std::string results_file_stem = test_util::get_test_path_prefix() + "transactions_1000x1000"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_transactions<1000>(test_context, results); -} - -TEST(BenchMerge2000x2000Transactions) -{ - std::string results_file_stem = test_util::get_test_path_prefix() + "transactions_2000x2000"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_transactions<2000>(test_context, results); -} - -TEST_IF(BenchMerge3000x3000Transactions, RUN_ALL_BENCHMARKS) -{ - std::string results_file_stem = test_util::get_test_path_prefix() + "transactions_3000x3000"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_transactions<3000>(test_context, results); -} - - -TEST(BenchMerge4000x4000Transactions) -{ - std::string results_file_stem = test_util::get_test_path_prefix() + "transactions_4000x4000"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_transactions<4000>(test_context, results); -} - -TEST_IF(BenchMerge5000x5000Transactions, RUN_ALL_BENCHMARKS) -{ - std::string results_file_stem = test_util::get_test_path_prefix() + "transactions_5000x5000"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_transactions<5000>(test_context, results); -} - -TEST_IF(BenchMerge6000x6000Transactions, RUN_ALL_BENCHMARKS) -{ - std::string results_file_stem = test_util::get_test_path_prefix() + "transactions_6000x6000"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_transactions<6000>(test_context, results); -} - -TEST_IF(BenchMerge7000x7000Transactions, RUN_ALL_BENCHMARKS) -{ - std::string results_file_stem = test_util::get_test_path_prefix() + "transactions_7000x7000"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_transactions<7000>(test_context, results); -} - - -TEST(BenchMerge8000x8000Transactions) -{ - std::string results_file_stem = test_util::get_test_path_prefix() + "transactions_8000x8000"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_transactions<8000>(test_context, results); -} - -TEST_IF(BenchMerge9000x9000Transactions, RUN_ALL_BENCHMARKS) -{ - std::string results_file_stem = test_util::get_test_path_prefix() + "transactions_9000x9000"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_transactions<9000>(test_context, results); -} - -TEST(BenchMerge16000x16000Transactions) -{ - std::string results_file_stem = test_util::get_test_path_prefix() + "transactions_16000x16000"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::transform_transactions<16000>(test_context, results); -} - -TEST(BenchMergeManyConnectedObjects) -{ - std::string results_file_stem = test_util::get_test_path_prefix() + "connected_objects"; - BenchmarkResults results(max_lead_text_width, results_file_stem.c_str()); - - bench::connected_objects<8000>(test_context, results); -} - -#if !REALM_IOS -int main(int argc, char** argv) -{ - return test_all(argc, argv, nullptr); -} -#endif // REALM_IOS diff --git a/test/benchmark-sync/CMakeLists.txt b/test/benchmark-sync/CMakeLists.txt new file mode 100644 index 00000000000..089222aea27 --- /dev/null +++ b/test/benchmark-sync/CMakeLists.txt @@ -0,0 +1,5 @@ +if(REALM_ENABLE_SYNC) + add_executable(realm-benchmark-sync bench_transform.cpp ../test_all.cpp) + add_dependencies(benchmarks realm-benchmark-sync) + target_link_libraries(realm-benchmark-sync TestUtil Sync SyncServer) +endif() \ No newline at end of file diff --git a/test/bench-sync/access_token b/test/benchmark-sync/access_token similarity index 100% rename from test/bench-sync/access_token rename to test/benchmark-sync/access_token diff --git a/test/benchmark-sync/bench_transform.cpp b/test/benchmark-sync/bench_transform.cpp new file mode 100644 index 00000000000..ad4f533b7e6 --- /dev/null +++ b/test/benchmark-sync/bench_transform.cpp @@ -0,0 +1,332 @@ +#include "../util/benchmark_results.hpp" +#include "../util/timer.hpp" +#include "../util/test_path.hpp" +#include "../util/unit_test.hpp" +#include "../test_all.hpp" +#include "../sync_fixtures.hpp" + +using namespace realm; +using namespace realm::test_util::unit_test; +using namespace realm::fixtures; + +namespace bench { + +static std::unique_ptr results; + +#define TEST_CLIENT_DB(name) \ + SHARED_GROUP_TEST_PATH(name##_path); \ + auto name = DB::create(make_client_replication(), name##_path); + +// Two peers have 1000 transactions each with a handful of instructions in +// each (25% transactions contain MoveLastOver). One peer receives and merges +// all transactions from the other (but does not apply them to their +// database). +template +void transform_transactions(TestContext& test_context) +{ + std::string ident = test_context.test_details.test_name; + const size_t num_iterations = 3; + + for (size_t i = 0; i < num_iterations; ++i) { + TEST_CLIENT_DB(db_1); + TEST_CLIENT_DB(db_2); + + // Produce some mostly realistic transactions on both sides. + auto make_transactions = [](DBRef& db) { + ColKey col_ndx; + { + WriteTransaction wt(db); + TableRef t = wt.add_table("class_t"); + col_ndx = t->add_column(type_String, "s"); + wt.commit(); + } + + for (size_t j = 0; j < num_transactions - 1; ++j) { + WriteTransaction wt(db); + TableRef t = wt.get_table("class_t"); + t->create_object().set(col_ndx, std::string(500, char('a' + j % 26))); + + // Let 25% of commits contain a MoveLastOver + if (j % 4 == 0) { + t->remove_object(t->begin()); + } + wt.commit(); + } + }; + + make_transactions(db_1); + make_transactions(db_2); + + TEST_DIR(dir); + + MultiClientServerFixture::Config config; + config.server_public_key_path = ""; + MultiClientServerFixture fixture(2, 1, dir, test_context, config); + Timer t{Timer::type_RealTime}; + + Session::Config session_config; + session_config.on_download_message_received_hook = + [&](const sync::SyncProgress&, int64_t, sync::DownloadBatchState batch_state, size_t num_changesets) { + CHECK(batch_state == sync::DownloadBatchState::SteadyState); + if (num_changesets == 0) + return; + t.reset(); + }; + session_config.on_download_message_integrated_hook = + [&](const sync::SyncProgress&, int64_t, sync::DownloadBatchState batch_state, size_t num_changesets) { + CHECK(batch_state == sync::DownloadBatchState::SteadyState); + if (num_changesets == 0) + return; + results->submit(ident.c_str(), t.get_elapsed_time()); + }; + + Session session_1 = fixture.make_session(0, db_1, std::move(session_config)); + fixture.bind_session(session_1, 0, "/test"); + Session session_2 = fixture.make_session(1, db_2); + fixture.bind_session(session_2, 0, "/test"); + + // Start server and upload changes of second client. + fixture.start_server(0); + fixture.start_client(1); + session_2.wait_for_upload_complete_or_client_stopped(); + session_2.wait_for_download_complete_or_client_stopped(); + fixture.stop_client(1); + + // Upload changes of first client and wait to integrate changes from second client. + fixture.start_client(0); + session_1.wait_for_upload_complete_or_client_stopped(); + session_1.wait_for_download_complete_or_client_stopped(); + } + + results->finish(ident, ident, "runtime_secs"); +} + +// Two peers have 1 transaction each with 1000 instructions (8.3% of +// instructions are MoveLastOver). One peer receives and merges the large +// transaction from the other (but does not apply it to their database). +template +void transform_instructions(TestContext& test_context) +{ + std::string ident = test_context.test_details.test_name; + + for (size_t i = 0; i < 3; ++i) { + TEST_CLIENT_DB(db_1); + TEST_CLIENT_DB(db_2); + + // Produce some mostly realistic transactions on both sides. + auto make_instructions = [](DBRef& db) { + WriteTransaction wt(db); + TableRef t = wt.add_table("class_t"); + ColKey col_ndx = t->add_column(type_Int, "i"); + + for (size_t j = 0; j < num_iterations; ++j) { + t->create_object().set(col_ndx, 123); + + // Let 25% of commits contain a MoveLastOver + if (j % 4 == 0) { + t->begin()->remove(); + } + } + wt.commit(); + }; + + make_instructions(db_1); + make_instructions(db_2); + + TEST_DIR(dir); + + MultiClientServerFixture::Config config; + config.server_public_key_path = ""; + MultiClientServerFixture fixture(2, 1, dir, test_context, config); + Timer t{Timer::type_RealTime}; + + Session::Config session_config; + session_config.on_download_message_received_hook = + [&](const sync::SyncProgress&, int64_t, sync::DownloadBatchState batch_state, size_t num_changesets) { + CHECK(batch_state == sync::DownloadBatchState::SteadyState); + if (num_changesets == 0) + return; + t.reset(); + }; + session_config.on_download_message_integrated_hook = + [&](const sync::SyncProgress&, int64_t, sync::DownloadBatchState batch_state, size_t num_changesets) { + CHECK(batch_state == sync::DownloadBatchState::SteadyState); + if (num_changesets == 0) + return; + results->submit(ident.c_str(), t.get_elapsed_time()); + }; + Session session_1 = fixture.make_session(0, db_1, std::move(session_config)); + fixture.bind_session(session_1, 0, "/test"); + Session session_2 = fixture.make_session(1, db_2); + fixture.bind_session(session_2, 0, "/test"); + + // Start server and upload changes of second client. + fixture.start_server(0); + fixture.start_client(1); + session_2.wait_for_upload_complete_or_client_stopped(); + session_2.wait_for_download_complete_or_client_stopped(); + fixture.stop_client(1); + + // Upload changes of first client and wait to integrate changes from second client. + fixture.start_client(0); + session_1.wait_for_upload_complete_or_client_stopped(); + session_1.wait_for_download_complete_or_client_stopped(); + } + + results->finish(ident, ident, "runtime_secs"); +} + +template +void connected_objects(TestContext& test_context) +{ + std::string ident = test_context.test_details.test_name; + + for (size_t i = 0; i < 3; ++i) { + TEST_CLIENT_DB(db_1); + TEST_CLIENT_DB(db_2); + + // Produce some mostly realistic transactions on both sides. + auto make_instructions = [](DBRef& db) { + WriteTransaction wt(db); + TableRef t = wt.get_group().add_table_with_primary_key("class_t", type_String, "pk"); + ColKey col_key = t->add_column(*t, "l"); + + // Everything links to this object! + auto first_key = t->create_object_with_primary_key("Hello").get_key(); + + for (size_t j = 0; j < num_iterations; ++j) { + std::stringstream ss; + ss << j; + std::string pk = ss.str(); + t->create_object_with_primary_key(pk).set(col_key, first_key); + } + wt.commit(); + }; + + make_instructions(db_1); + make_instructions(db_2); + + TEST_DIR(dir); + + MultiClientServerFixture::Config config; + config.server_public_key_path = ""; + MultiClientServerFixture fixture(2, 1, dir, test_context, config); + Timer t{Timer::type_RealTime}; + + Session::Config session_config; + session_config.on_download_message_received_hook = + [&](const sync::SyncProgress&, int64_t, sync::DownloadBatchState batch_state, size_t num_changesets) { + CHECK(batch_state == sync::DownloadBatchState::SteadyState); + if (num_changesets == 0) + return; + t.reset(); + }; + session_config.on_download_message_integrated_hook = + [&](const sync::SyncProgress&, int64_t, sync::DownloadBatchState batch_state, size_t num_changesets) { + CHECK(batch_state == sync::DownloadBatchState::SteadyState); + if (num_changesets == 0) + return; + results->submit(ident.c_str(), t.get_elapsed_time()); + }; + Session session_1 = fixture.make_session(0, db_1, std::move(session_config)); + fixture.bind_session(session_1, 0, "/test"); + Session session_2 = fixture.make_session(1, db_2); + fixture.bind_session(session_2, 0, "/test"); + + // Start server and upload changes of second client. + fixture.start_server(0); + fixture.start_client(1); + session_2.wait_for_upload_complete_or_client_stopped(); + session_2.wait_for_download_complete_or_client_stopped(); + fixture.stop_client(1); + + // Upload changes of first client and wait to integrate changes from second client. + fixture.start_client(0); + session_1.wait_for_upload_complete_or_client_stopped(); + session_1.wait_for_download_complete_or_client_stopped(); + } + + results->finish(ident, ident, "runtime_secs"); +} + +} // namespace bench + +const int max_lead_text_width = 40; + +TEST(BenchMerge1000x1000Instructions) +{ + bench::transform_instructions<1000>(test_context); +} + +TEST(BenchMerge2000x2000Instructions) +{ + bench::transform_instructions<2000>(test_context); +} + +TEST(BenchMerge4000x4000Instructions) +{ + bench::transform_instructions<4000>(test_context); +} + +TEST(BenchMerge8000x8000Instructions) +{ + bench::transform_instructions<8000>(test_context); +} + +TEST(BenchMerge16000x16000Instructions) +{ + bench::transform_instructions<16000>(test_context); +} + +TEST(BenchMerge100x100Transactions) +{ + bench::transform_transactions<100>(test_context); +} + +TEST(BenchMerge500x500Transactions) +{ + bench::transform_transactions<500>(test_context); +} + +TEST(BenchMerge1000x1000Transactions) +{ + bench::transform_transactions<1000>(test_context); +} + +TEST(BenchMerge2000x2000Transactions) +{ + bench::transform_transactions<2000>(test_context); +} + +TEST(BenchMerge4000x4000Transactions) +{ + bench::transform_transactions<4000>(test_context); +} + +TEST(BenchMerge8000x8000Transactions) +{ + bench::transform_transactions<8000>(test_context); +} + +TEST(BenchMerge16000x16000Transactions) +{ + bench::transform_transactions<16000>(test_context); +} + +TEST(BenchMergeManyConnectedObjects) +{ + bench::connected_objects<1000>(test_context); +} + +#if !REALM_IOS +int main() +{ + std::string results_file_stem = realm::test_util::get_test_path_prefix() + "results"; + bench::results = + std::make_unique(max_lead_text_width, "benchmark-sync", results_file_stem.c_str()); + auto exit_status = test_all(); + // Save to file when deallocated. + bench::results.reset(); + return exit_status; +} +#endif // REALM_IOS diff --git a/test/bench-sync/load_test.cpp b/test/benchmark-sync/load_test.cpp similarity index 100% rename from test/bench-sync/load_test.cpp rename to test/benchmark-sync/load_test.cpp diff --git a/test/bench-sync/load_test_clients_listen_start.sh b/test/benchmark-sync/load_test_clients_listen_start.sh similarity index 100% rename from test/bench-sync/load_test_clients_listen_start.sh rename to test/benchmark-sync/load_test_clients_listen_start.sh diff --git a/test/object-store/realm.cpp b/test/object-store/realm.cpp index 3a04d5f9fa8..1b3a9ac2f28 100644 --- a/test/object-store/realm.cpp +++ b/test/object-store/realm.cpp @@ -1186,12 +1186,10 @@ TEST_CASE("SharedRealm: async writes") { auto write = db->start_write(); sema.add_stone(); - // We want to wait until the main thread is waiting for the - // lock, which we can't do deterministically. If this sleep - // is too short the test will still pass and it'll just fail - // to test the intended code path. - std::chrono::milliseconds wait_time{500}; - std::this_thread::sleep_for(wait_time); + // Wait until the main thread is waiting for the lock. + while (!db->other_writers_waiting_for_lock()) { + millisleep(1); + } write->close(); }); diff --git a/test/object-store/sync/flx_sync.cpp b/test/object-store/sync/flx_sync.cpp index f11c05deb74..5bb9fdb78d8 100644 --- a/test/object-store/sync/flx_sync.cpp +++ b/test/object-store/sync/flx_sync.cpp @@ -1037,7 +1037,8 @@ TEST_CASE("flx: interrupted bootstrap restarts/recovers on reconnect", "[sync][f config.sync_config->on_download_message_received_hook = [promise = std::move(shared_promise)]( std::weak_ptr weak_session, const sync::SyncProgress&, int64_t query_version, - sync::DownloadBatchState batch_state) mutable { + sync::DownloadBatchState batch_state, + size_t) mutable { auto session = weak_session.lock(); if (!session) { return; @@ -1693,13 +1694,13 @@ TEST_CASE("flx: bootstrap batching prevents orphan documents", "[sync][flx][app] interrupted_realm_config.sync_config->on_download_message_received_hook = [&, promise = std::move(shared_saw_valid_state_promise)](std::weak_ptr weak_session, const sync::SyncProgress&, int64_t query_version, - sync::DownloadBatchState batch_state) { + sync::DownloadBatchState batch_state, size_t) { auto session = weak_session.lock(); if (!session) { return; } - if (query_version != 1 || batch_state != sync::DownloadBatchState::LastInBatch) { + if (query_version != 1 || batch_state == sync::DownloadBatchState::MoreToCome) { return; } diff --git a/test/peer.hpp b/test/peer.hpp index 2ca4e3d8c9f..19149bbca80 100644 --- a/test/peer.hpp +++ b/test/peer.hpp @@ -486,16 +486,20 @@ inline auto ShortCircuitHistory::integrate_remote_changesets(file_ident_type rem } TransformHistoryImpl transform_hist{*this, remote_file_ident}; - m_transformer->transform_remote_changesets(transform_hist, m_local_file_ident, local_version, changesets, logger); + auto apply = [&](const Changeset* c) -> bool { + sync::InstructionApplier applier{*transact}; + applier.apply(*c, logger); - sync::ChangesetEncoder::Buffer assembled_transformed_changeset; + return true; + }; + m_transformer->transform_remote_changesets(transform_hist, m_local_file_ident, local_version, changesets, + std::move(apply), logger); - for (size_t i = 0; i < num_changesets; ++i) { - sync::InstructionApplier applier{*transact}; - applier.apply(changesets[i], logger); + transact->verify(); - transact->verify(); + sync::ChangesetEncoder::Buffer assembled_transformed_changeset; + for (size_t i = 0; i < num_changesets; ++i) { sync::encode_changeset(changesets[i], assembled_transformed_changeset); } diff --git a/test/sync_fixtures.hpp b/test/sync_fixtures.hpp index ff148edf7cb..a8eab11e9cc 100644 --- a/test/sync_fixtures.hpp +++ b/test/sync_fixtures.hpp @@ -617,6 +617,42 @@ class MultiClientServerFixture { }); } + // Use either the methods below or `start()`. + void start_server(int index) + { + REALM_ASSERT(index >= 0 && index < m_num_servers); + m_server_threads[index].start([this, index] { + run_server(index); + }); + } + + void start_client(int index) + { + REALM_ASSERT(index >= 0 && index < m_num_clients); + m_client_threads[index].start([this, index] { + run_client(index); + }); + } + + void stop_server(int index) + { + REALM_ASSERT(index >= 0 && index < m_num_servers); + m_servers[index]->stop(); + unit_test::TestContext& test_context = m_test_context; + if (m_server_threads[index].joinable()) + CHECK(!m_server_threads[index].join()); + CHECK_LESS_EQUAL(m_servers[index]->errors_seen(), m_allow_server_errors[index]); + } + + void stop_client(int index) + { + REALM_ASSERT(index >= 0 && index < m_num_clients); + m_clients[index]->stop(); + unit_test::TestContext& test_context = m_test_context; + if (m_client_threads[index].joinable()) + CHECK(!m_client_threads[index].join()); + } + void stop() { for (int i = 0; i < m_num_clients; ++i) diff --git a/test/test_sync.cpp b/test/test_sync.cpp index e6e9ebbabd0..a3853de2a3b 100644 --- a/test/test_sync.cpp +++ b/test/test_sync.cpp @@ -4304,6 +4304,88 @@ TEST(Sync_MergeLargeChangesets) CHECK_EQUAL(table->size(), 2 * number_of_rows); } + +TEST(Sync_MergeMultipleChangesets) +{ + constexpr int number_of_changesets = 100; + constexpr int number_of_instructions = 10; + + TEST_CLIENT_DB(db_1); + TEST_CLIENT_DB(db_2); + + { + WriteTransaction wt(db_1); + TableRef table = wt.add_table("class_table name"); + table->add_column(type_Int, "integer column"); + wt.commit(); + } + + { + WriteTransaction wt(db_2); + TableRef table = wt.add_table("class_table name"); + table->add_column(type_Int, "integer column"); + wt.commit(); + } + + { + for (int i = 0; i < number_of_changesets; ++i) { + WriteTransaction wt(db_1); + TableRef table = wt.get_table("class_table name"); + for (int j = 0; j < number_of_instructions; ++j) { + auto obj = table->create_object(); + obj.set("integer column", 2 * j); + } + wt.commit(); + } + } + + { + for (int i = 0; i < number_of_changesets; ++i) { + WriteTransaction wt(db_2); + TableRef table = wt.get_table("class_table name"); + for (int j = 0; j < number_of_instructions; ++j) { + auto obj = table->create_object(); + obj.set("integer column", 2 * j + 1); + } + wt.commit(); + } + } + + { + TEST_DIR(dir); + MultiClientServerFixture fixture(2, 1, dir, test_context); + + Session session_1 = fixture.make_session(0, db_1); + fixture.bind_session(session_1, 0, "/test"); + Session session_2 = fixture.make_session(1, db_2); + fixture.bind_session(session_2, 0, "/test"); + + // Start server and upload changes of first client. + fixture.start_server(0); + fixture.start_client(0); + session_1.wait_for_upload_complete_or_client_stopped(); + session_1.wait_for_download_complete_or_client_stopped(); + // Stop first client. + fixture.stop_client(0); + + // Start the second client and upload their changes. + // Wait to integrate changes from the first client. + fixture.start_client(1); + session_2.wait_for_upload_complete_or_client_stopped(); + session_2.wait_for_download_complete_or_client_stopped(); + } + + ReadTransaction read_1(db_1); + ReadTransaction read_2(db_2); + const Group& group1 = read_1; + const Group& group2 = read_2; + ConstTableRef table1 = group1.get_table("class_table name"); + ConstTableRef table2 = group2.get_table("class_table name"); + CHECK_EQUAL(table1->size(), number_of_changesets * number_of_instructions); + CHECK_EQUAL(table2->size(), 2 * number_of_changesets * number_of_instructions); +} + + #endif // REALM_PLATFORM_WIN32 @@ -6610,7 +6692,7 @@ TEST(Sync_NonIncreasingServerVersions) VersionInfo version_info; util::StderrLogger logger; history.integrate_server_changesets(progress, &downloadable_bytes, server_changesets_encoded, version_info, - DownloadBatchState::LastInBatch, logger); + DownloadBatchState::SteadyState, logger); } TEST(Sync_InvalidChangesetFromServer)