diff --git a/CHANGELOG.md b/CHANGELOG.md index dccc15b29ca..7ef9a963368 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,8 +5,7 @@ * None. ### Fixed -* ([#????](https://github.com/realm/realm-core/issues/????), since v?.?.?) -* None. +* A non-streaming progress notifier would not immediately call its callback after registration. Instead you would have to wait for a download message to be received to get your first update - if you were already caught up when you registered the notifier you could end up waiting a long time for the server to deliver a download that would call/expire your notifier ([#7627](https://github.com/realm/realm-core/issues/7627), since v14.6.0). ### Breaking changes * None. diff --git a/src/realm/object-store/sync/sync_session.cpp b/src/realm/object-store/sync/sync_session.cpp index 697a1cad122..9ecafa81693 100644 --- a/src/realm/object-store/sync/sync_session.cpp +++ b/src/realm/object-store/sync/sync_session.cpp @@ -821,10 +821,10 @@ void SyncSession::cancel_pending_waits(util::CheckedUniqueLock lock, Status erro void SyncSession::handle_progress_update(uint64_t downloaded, uint64_t downloadable, uint64_t uploaded, uint64_t uploadable, uint64_t snapshot_version, double download_estimate, - double upload_estimate) + double upload_estimate, int64_t query_version) { m_progress_notifier.update(downloaded, downloadable, uploaded, uploadable, snapshot_version, download_estimate, - upload_estimate); + upload_estimate, query_version); } static sync::Session::Config::ClientReset make_client_reset_config(const RealmConfig& base_config, @@ -962,10 +962,10 @@ void SyncSession::create_sync_session() m_session->set_progress_handler([weak_self](uint_fast64_t downloaded, uint_fast64_t downloadable, uint_fast64_t uploaded, uint_fast64_t uploadable, uint_fast64_t snapshot_version, double download_estimate, - double upload_estimate) { + double upload_estimate, int64_t query_version) { if (auto self = weak_self.lock()) { self->handle_progress_update(downloaded, downloadable, uploaded, uploadable, snapshot_version, - download_estimate, upload_estimate); + download_estimate, upload_estimate, query_version); } }); @@ -1267,7 +1267,11 @@ void SyncSession::wait_for_download_completion(util::UniqueFunction&& notifier, ProgressDirection direction, bool is_streaming) { - return m_progress_notifier.register_callback(std::move(notifier), direction, is_streaming); + int64_t pending_query_version = 0; + if (auto sub_store = get_flx_subscription_store()) { + pending_query_version = sub_store->get_version_info().latest; + } + return m_progress_notifier.register_callback(std::move(notifier), direction, is_streaming, pending_query_version); } void SyncSession::unregister_progress_notifier(uint64_t token) @@ -1519,7 +1523,8 @@ void SyncSession::did_drop_external_reference() } uint64_t SyncProgressNotifier::register_callback(std::function notifier, - NotifierType direction, bool is_streaming) + NotifierType direction, bool is_streaming, + int64_t pending_query_version) { util::UniqueFunction invocation; uint64_t token_value = 0; @@ -1527,14 +1532,14 @@ uint64_t SyncProgressNotifier::register_callback(std::function lock(m_mutex); token_value = m_progress_notifier_token++; NotifierPackage package{std::move(notifier), m_local_transaction_version, is_streaming, - direction == NotifierType::download}; + direction == NotifierType::download, pending_query_version}; if (!m_current_progress) { // Simply register the package, since we have no data yet. m_packages.emplace(token_value, std::move(package)); return token_value; } bool skip_registration = false; - invocation = package.create_invocation(*m_current_progress, skip_registration, true); + invocation = package.create_invocation(*m_current_progress, skip_registration); if (skip_registration) { token_value = 0; } @@ -1553,13 +1558,14 @@ void SyncProgressNotifier::unregister_callback(uint64_t token) } void SyncProgressNotifier::update(uint64_t downloaded, uint64_t downloadable, uint64_t uploaded, uint64_t uploadable, - uint64_t snapshot_version, double download_estimate, double upload_estimate) + uint64_t snapshot_version, double download_estimate, double upload_estimate, + int64_t query_version) { std::vector> invocations; { std::lock_guard lock(m_mutex); - m_current_progress = Progress{uploadable, downloadable, uploaded, downloaded, - upload_estimate, download_estimate, snapshot_version}; + m_current_progress = Progress{uploadable, downloadable, uploaded, downloaded, + upload_estimate, download_estimate, snapshot_version, query_version}; for (auto it = m_packages.begin(); it != m_packages.end();) { bool should_delete = false; @@ -1579,49 +1585,50 @@ void SyncProgressNotifier::set_local_version(uint64_t snapshot_version) } util::UniqueFunction -SyncProgressNotifier::NotifierPackage::create_invocation(Progress const& current_progress, bool& is_expired, - bool initial_registration) +SyncProgressNotifier::NotifierPackage::create_invocation(Progress const& current_progress, bool& is_expired) { - uint64_t transferred = is_download ? current_progress.downloaded : current_progress.uploaded; + uint64_t transfered = is_download ? current_progress.downloaded : current_progress.uploaded; uint64_t transferable = is_download ? current_progress.downloadable : current_progress.uploadable; - double progress_estimate = is_download ? current_progress.download_estimate : current_progress.upload_estimate; - - // If the sync client has not yet processed all of the local - // transactions then the uploadable data is incorrect and we should - // not invoke the callback - if (!is_download && snapshot_version > current_progress.snapshot_version) - return [] {}; - - // for download only invoke the callback on registration if is in active data transfer, - // otherwise delay notifying until an update with the new transfer signaled - if (is_download && !started_notifying && progress_estimate >= 1) { - if (initial_registration) { - initial_transferred = transferred; - return [] {}; - } - else if (initial_transferred == transferred) + double estimate = is_download ? current_progress.download_estimate : current_progress.upload_estimate; + + if (!is_streaming) { + // If the sync client has not yet processed all of the local + // transactions then the uploadable data is incorrect and we should + // not invoke the callback + if (!is_download && snapshot_version > current_progress.snapshot_version) return [] {}; - } - started_notifying = true; + // If this is a non-streaming download progress update and this notifier was + // created for a later query version (e.g. we're currently downloading + // subscription set version zero, but subscription set version 1 existed + // when the notifier was registered), then we want to skip this callback. + if (is_download && current_progress.query_version < pending_query_version) { + return [] {}; + } - // only capture and adjust transferable bytes for upload non-streaming to provide - // the progress of upload for the callback registered right after the commit - if (!is_streaming && !is_download) { + // The initial download size we get from the server is the uncompacted + // size, and so the download may complete before we actually receive + // that much data. When that happens, transferrable will drop and we + // need to use the new value instead of the captured one. if (!captured_transferable || *captured_transferable > transferable) captured_transferable = transferable; transferable = *captured_transferable; - } - // A notifier is expired for upload if at least as many bytes have been transferred - // as were originally considered transferable based on local committed version - // on callback registration, or when simply 1.0 progress is reached for download - // since the amount of bytes is not precisely known until the end - if (!is_streaming) - is_expired = is_download ? progress_estimate >= 1 : transferred >= transferable; + // Since we can adjust the transferrable downwards the estimate for uploads + // won't be correct since the sync client's view of the estimate is based on + // the total number of uploadable bytes available rather than the number of + // bytes this NotifierPackage was waiting to upload. + if (!is_download) { + estimate = transferable > 0 ? std::min(transfered / double(transferable), 1.0) : 0.0; + } + } + // A notifier is expired if at least as many bytes have been transferred + // as were originally considered transferrable. + is_expired = + !is_streaming && (transfered >= transferable && (!is_download || !pending_query_version || estimate >= 1.0)); return [=, notifier = notifier] { - notifier(transferred, transferable, progress_estimate); + notifier(transfered, transferable, estimate); }; } diff --git a/src/realm/object-store/sync/sync_session.hpp b/src/realm/object-store/sync/sync_session.hpp index a4cd13f09d8..0a148f3bdd1 100644 --- a/src/realm/object-store/sync/sync_session.hpp +++ b/src/realm/object-store/sync/sync_session.hpp @@ -54,12 +54,13 @@ class SyncProgressNotifier { using ProgressNotifierCallback = void(uint64_t transferred_bytes, uint64_t transferrable_bytes, double progress_estimate); - uint64_t register_callback(std::function, NotifierType direction, bool is_streaming); + uint64_t register_callback(std::function, NotifierType direction, bool is_streaming, + int64_t pending_query_version); void unregister_callback(uint64_t); void set_local_version(uint64_t); void update(uint64_t downloaded, uint64_t downloadable, uint64_t uploaded, uint64_t uploadable, - uint64_t snapshot_version, double download_estimate = 1.0, double upload_estimate = 1.0); + uint64_t snapshot_version, double download_estimate, double upload_estimate, int64_t query_version); private: mutable std::mutex m_mutex; @@ -73,6 +74,7 @@ class SyncProgressNotifier { double upload_estimate; double download_estimate; uint64_t snapshot_version; + int64_t query_version; }; // A PODS encapsulating some information for progress notifier callbacks a binding @@ -82,11 +84,9 @@ class SyncProgressNotifier { uint64_t snapshot_version; bool is_streaming; bool is_download; - bool started_notifying = false; - uint64_t initial_transferred = 0; + int64_t pending_query_version = 0; std::optional captured_transferable; - util::UniqueFunction create_invocation(const Progress&, bool& is_expired, - bool initial_registration = false); + util::UniqueFunction create_invocation(const Progress&, bool& is_expired); }; // A counter used as a token to identify progress notifier callbacks registered on this session. @@ -174,7 +174,7 @@ class SyncSession : public std::enable_shared_from_this { // Note that bindings should dispatch the callback onto a separate thread or queue // in order to avoid blocking the sync client. uint64_t register_progress_notifier(std::function&&, ProgressDirection, - bool is_streaming); + bool is_streaming) REQUIRES(!m_state_mutex); // Unregister a previously registered notifier. If the token is invalid, // this method does nothing. @@ -422,7 +422,7 @@ class SyncSession : public std::enable_shared_from_this { void cancel_pending_waits(util::CheckedUniqueLock, Status) RELEASE(m_state_mutex); enum class ShouldBackup { yes, no }; void update_error_and_mark_file_for_deletion(SyncError&, ShouldBackup) REQUIRES(m_state_mutex, !m_config_mutex); - void handle_progress_update(uint64_t, uint64_t, uint64_t, uint64_t, uint64_t, double, double); + void handle_progress_update(uint64_t, uint64_t, uint64_t, uint64_t, uint64_t, double, double, int64_t); void handle_new_flx_sync_query(int64_t version); void nonsync_transact_notify(VersionID::version_type) REQUIRES(!m_state_mutex); diff --git a/src/realm/sync/client.cpp b/src/realm/sync/client.cpp index b82fe2dcdb7..7fbd97a2875 100644 --- a/src/realm/sync/client.cpp +++ b/src/realm/sync/client.cpp @@ -980,11 +980,11 @@ void SessionImpl::process_pending_flx_bootstrap() std::chrono::duration_cast(duration).count(), pending_batch.remaining_changesets); } - on_changesets_integrated(new_version.realm_version, progress, changesets_processed > 0); REALM_ASSERT_3(query_version, !=, -1); on_flx_sync_progress(query_version, DownloadBatchState::LastInBatch); + on_changesets_integrated(new_version.realm_version, progress, changesets_processed > 0); auto action = call_debug_hook(SyncClientHookEvent::BootstrapProcessed, progress, query_version, DownloadBatchState::LastInBatch, changesets_processed); // NoAction/EarlyReturn are both valid no-op actions to take here. @@ -1958,14 +1958,15 @@ void SessionWrapper::report_progress(bool is_download, bool only_if_new_uploadab ss << std::fixed << std::setprecision(4) << d; return ss.str(); }; - m_sess->logger.debug("Progress handler called, downloaded = %1, downloadable = %2, estimate = %3, " - "uploaded = %4, uploadable = %5, estimate = %6, snapshot version = %7", - p.downloaded, p.downloadable, to_str(download_estimate), p.uploaded, p.uploadable, - to_str(upload_estimate), p.snapshot); + m_sess->logger.debug( + "Progress handler called, downloaded = %1, downloadable = %2, estimate = %3, " + "uploaded = %4, uploadable = %5, estimate = %6, snapshot version = %7, query_version = %8", + p.downloaded, p.downloadable, to_str(download_estimate), p.uploaded, p.uploadable, + to_str(upload_estimate), p.snapshot, m_flx_active_version); } m_progress_handler(p.downloaded, p.downloadable, p.uploaded, p.uploadable, p.snapshot, download_estimate, - upload_estimate); + upload_estimate, m_flx_last_seen_version); } util::Future SessionWrapper::send_test_command(std::string body) diff --git a/src/realm/sync/client.hpp b/src/realm/sync/client.hpp index b7be53e0bdb..bff50c91d72 100644 --- a/src/realm/sync/client.hpp +++ b/src/realm/sync/client.hpp @@ -162,7 +162,7 @@ class Session { using ProgressHandler = void(std::uint_fast64_t downloaded_bytes, std::uint_fast64_t downloadable_bytes, std::uint_fast64_t uploaded_bytes, std::uint_fast64_t uploadable_bytes, std::uint_fast64_t snapshot_version, double download_estimate, - double upload_estimate); + double upload_estimate, int64_t query_version); using WaitOperCompletionHandler = util::UniqueFunction; using SSLVerifyCallback = bool(const std::string& server_address, port_type server_port, const char* pem_data, size_t pem_size, int preverify_ok, int depth); diff --git a/test/object-store/sync/session/progress_notifications.cpp b/test/object-store/sync/session/progress_notifications.cpp index 05e1eb44d76..dbda808501e 100644 --- a/test/object-store/sync/session/progress_notifications.cpp +++ b/test/object-store/sync/session/progress_notifications.cpp @@ -36,342 +36,336 @@ using namespace realm::app; #include using namespace Catch::Matchers; -using namespace realm; +#include +using namespace realm; using NotifierType = SyncSession::ProgressDirection; -struct TestSyncProgressNotifier : _impl::SyncProgressNotifier { - void update(uint64_t downloaded, uint64_t downloadable, uint64_t uploaded, uint64_t uploadable, uint64_t snapshot) +struct ProgressEntry { + uint64_t transferred = 0; + uint64_t transferrable = 0; + double estimate = 0.0; + + inline bool operator==(const ProgressEntry& other) const noexcept + { + return transferred == other.transferred && transferrable == other.transferrable && estimate == other.estimate; + } +}; + +static std::string estimate_to_string(double est) +{ + std::ostringstream ss; + ss << std::setprecision(4) << est; + return ss.str(); +} + +static std::ostream& operator<<(std::ostream& os, const ProgressEntry& value) +{ + return os << util::format("{ transferred: %1, transferrable: %2, estimate: %3 }", value.transferred, + value.transferrable, estimate_to_string(value.estimate)); +} + + +struct WaitableProgress : public util::AtomicRefCountBase { + WaitableProgress(const std::shared_ptr& base_logger, std::string context) + : logger(std::move(context), base_logger) + { + } + + std::function make_cb() { - last_downloaded = downloaded; - last_downloadable = downloadable; - last_uploaded = uploaded; - last_uploadable = uploadable; - using Base = _impl::SyncProgressNotifier; - double download_estimate = downloadable > 0 ? double(downloaded) / downloadable : 1.0; - double upload_estimate = uploadable > 0 ? double(uploaded) / uploadable : 1.0; - Base::update(downloaded, downloadable, uploaded, uploadable, snapshot, download_estimate, upload_estimate); + auto self = util::bind_ptr(this); + return [self](uint64_t transferred, uint64_t transferrable, double estimate) { + self->logger.debug("Progress callback called xferred: %1, xferrable: %2, estimate: %3", transferred, + transferrable, estimate_to_string(estimate)); + std::lock_guard lk(self->mutex); + self->entries.push_back(ProgressEntry{transferred, transferrable, estimate}); + self->cv.notify_one(); + }; } - void update_download(uint64_t tranferred, uint64_t transferable, uint64_t snapshot) + bool empty() { - update(tranferred, transferable, last_uploaded, last_uploadable, snapshot); + std::lock_guard lk(mutex); + return entries.empty(); } - void update_upload(uint64_t tranferred, uint64_t transferable, uint64_t snapshot) + std::vector wait_for_full_sync() { - update(last_downloaded, last_downloadable, tranferred, transferable, snapshot); + std::unique_lock lk(mutex); + if (!cv.wait_for(lk, std::chrono::seconds(30), [&] { + return !entries.empty() && entries.back().transferred >= entries.back().transferrable && + entries.back().estimate >= 1.0; + })) { + CAPTURE(entries); + FAIL("Failed while waiting for progress to complete"); + return {}; + } + + std::vector ret; + std::swap(ret, entries); + return ret; } - uint64_t last_downloaded = 0, last_downloadable = 0; - uint64_t last_uploaded = 0, last_uploadable = 0; + util::PrefixLogger logger; + std::mutex mutex; + std::condition_variable cv; + std::vector entries; }; -TEST_CASE("progress notification", "[sync][session][progress]") { - TestSyncProgressNotifier progress; +struct TestInputValue { + struct IsRegistration {}; + explicit TestInputValue(IsRegistration) + : is_registration(true) + { + } + + TestInputValue(int64_t query_version, double cur_estimate, uint64_t transferred, uint64_t transferrable) + : query_version(query_version) + , cur_estimate(cur_estimate) + , transferred(transferred) + , transferrable(transferrable) + { + } + + int64_t query_version = 0; + double cur_estimate = 0; uint64_t transferred = 0; uint64_t transferrable = 0; - double progress_estimate = 0; - bool callback_was_called = false; - - auto default_callback = [&](uint64_t xferred, uint64_t xferable, double p) { - transferred = xferred; - transferrable = xferable; - progress_estimate = p; - callback_was_called = true; - }; + bool is_registration = false; +}; - auto register_default_callback = [&](NotifierType type, bool is_streaming = false) { - return progress.register_callback(default_callback, type, is_streaming); - }; - auto register_default_upload_callback = [&](bool is_streaming = false) { - return register_default_callback(NotifierType::upload, is_streaming); - }; - auto register_default_download_callback = [&](bool is_streaming = false) { - return register_default_callback(NotifierType::download, is_streaming); - }; +struct TestValues { + std::vector input_values; + std::vector expected_values; + int64_t registered_at_query_version; +}; + +TEST_CASE("progress notification", "[sync][session][progress]") { + using NotifierType = SyncSession::ProgressDirection; + _impl::SyncProgressNotifier progress; SECTION("callback is not called prior to first update") { - register_default_upload_callback(); - register_default_download_callback(); + bool callback_was_called = false; + progress.register_callback( + [&](auto, auto, double) { + callback_was_called = true; + }, + NotifierType::upload, false, 0); + progress.register_callback( + [&](auto, auto, double) { + callback_was_called = true; + }, + NotifierType::download, false, 0); REQUIRE_FALSE(callback_was_called); } - SECTION("register callback after a progress update has already occurred") { + SECTION("callback is invoked immediately when a progress update has already occurred") { progress.set_local_version(1); - progress.update(0, 0, 0, 0, 1); - REQUIRE_FALSE(callback_was_called); + progress.update(0, 0, 0, 0, 1, 0.0, 0.0, 0); + bool callback_was_called = false; SECTION("for upload notifications, with no data transfer ongoing") { - register_default_upload_callback(); - REQUIRE(callback_was_called); - } - - SECTION("for upload notifications, with data transfer ongoing") { - progress.update_upload(1, 2, 1); - REQUIRE_FALSE(callback_was_called); - register_default_upload_callback(); + double estimate = 0.0; + progress.register_callback( + [&](auto, auto, double ep) { + callback_was_called = true; + estimate = ep; + }, + NotifierType::upload, false, 0); REQUIRE(callback_was_called); + REQUIRE(estimate == 0.0); } SECTION("for download notifications, with no data transfer ongoing") { - register_default_download_callback(); - REQUIRE_FALSE(callback_was_called); - } - - SECTION("for download notifications, with data transfer ongoing") { - progress.update_download(1, 2, 1); - REQUIRE_FALSE(callback_was_called); - register_default_download_callback(); + double estimate = 0.0; + progress.register_callback( + [&](auto, auto, double ep) { + callback_was_called = true; + estimate = ep; + }, + NotifierType::download, false, 0); + REQUIRE(estimate == 0.0); REQUIRE(callback_was_called); - REQUIRE(transferred == 1); - REQUIRE(transferrable == 2); - } - - SECTION("for download notifications, when new data transfer starts") { - register_default_download_callback(); - REQUIRE_FALSE(callback_was_called); - - // upload progress shouldn't have any effect on this - progress.update_upload(1, 1, 1); - CHECK_FALSE(callback_was_called); - callback_was_called = false; - - progress.update_download(1, 2, 1); - CHECK(callback_was_called); - - callback_was_called = false; - progress.update_download(2, 2, 1); - CHECK(callback_was_called); - - callback_was_called = false; - progress.update_upload(2, 2, 1); - CHECK_FALSE(callback_was_called); } SECTION("can register another notifier while in the initial notification without deadlock") { int counter = 0; progress.register_callback( - [&](auto, auto, auto) { + [&](auto, auto, double) { counter++; progress.register_callback( - [&](auto, auto, auto) { + [&](auto, auto, double) { counter++; }, - NotifierType::upload, false); + NotifierType::upload, false, 0); }, - NotifierType::upload, false); + NotifierType::download, false, 0); REQUIRE(counter == 2); - - progress.register_callback( - [&](auto, auto, auto) { - counter++; - progress.register_callback( - [&](auto, auto, auto) { - counter++; - }, - NotifierType::download, false); - }, - NotifierType::upload, false); - REQUIRE(counter == 3); - - progress.update(1, 1, 1, 1, 1); - REQUIRE(counter == 4); - - progress.register_callback( - [&](auto, auto, auto) { - counter++; - progress.register_callback( - [&](auto, auto, auto) { - counter++; - }, - NotifierType::upload, false); - }, - NotifierType::download, false); - REQUIRE(counter == 4); - - progress.update(2, 2, 1, 1, 1); - REQUIRE(counter == 6); } } - SECTION("callback is invoked after each update") { - bool is_streaming = GENERATE(false, true); - progress.update(0, 0, 0, 0, 1); + SECTION("callback is invoked after each update for streaming notifiers") { + progress.update(0, 0, 0, 0, 1, 0.0, 0.0, 0); + bool callback_was_called = false; + uint64_t transferred = 0; + uint64_t transferrable = 0; uint64_t current_transferred = 0; uint64_t current_transferrable = 0; + double estimate = 0.0; SECTION("for upload notifications") { - register_default_upload_callback(is_streaming); + progress.register_callback( + [&](auto xferred, auto xferable, double ep) { + transferred = xferred; + transferrable = xferable; + callback_was_called = true; + estimate = ep; + }, + NotifierType::upload, true, 0); REQUIRE(callback_was_called); // Now manually call the notifier handler a few times. callback_was_called = false; current_transferred = 60; current_transferrable = 912; - progress.update_upload(current_transferred, current_transferrable, 1); - - // non-streaming callback returns immediatelly on registration if there is no data to upload - const uint64_t final_transferred = current_transferrable; - if (!is_streaming) - register_default_upload_callback(is_streaming); - + double current_estimate = current_transferred / double(current_transferrable); + progress.update(25, 26, current_transferred, current_transferrable, 1, 25 / double(26), current_estimate, + 0); CHECK(callback_was_called); CHECK(transferred == current_transferred); CHECK(transferrable == current_transferrable); + CHECK(estimate == current_estimate); // Second callback callback_was_called = false; current_transferred = 79; current_transferrable = 1021; - progress.update_upload(current_transferred, current_transferrable, 1); + current_estimate = current_transferred / double(current_transferrable); + progress.update(68, 191, current_transferred, current_transferrable, 1, 68 / double(191), + current_estimate, 0); CHECK(callback_was_called); CHECK(transferred == current_transferred); - CHECK(transferrable == (is_streaming ? current_transferrable : final_transferred)); + CHECK(transferrable == current_transferrable); + CHECK(estimate == current_estimate); // Third callback callback_was_called = false; current_transferred = 150; current_transferrable = 1228; - progress.update_upload(current_transferred, current_transferrable, 1); + current_estimate = current_transferred / double(current_transferrable); + progress.update(199, 591, current_transferred, current_transferrable, 1, 199 / double(591), + current_estimate, 0); CHECK(callback_was_called); CHECK(transferred == current_transferred); - CHECK(transferrable == (is_streaming ? current_transferrable : final_transferred)); - - // final callback for non-streaming callback based on bytes on registration - callback_was_called = false; - current_transferred = final_transferred; - progress.update_upload(current_transferred, current_transferrable, 1); - CHECK(callback_was_called); - CHECK(transferred == current_transferred); - CHECK(transferrable == (is_streaming ? current_transferrable : final_transferred)); - - callback_was_called = false; - current_transferred = final_transferred + 100; - progress.update_upload(current_transferred, current_transferrable, 1); - if (!is_streaming) { - CHECK_FALSE(callback_was_called); - } - else { - CHECK(callback_was_called); - CHECK(transferred == current_transferred); - CHECK(transferrable == (is_streaming ? current_transferrable : final_transferred)); - } + CHECK(transferrable == current_transferrable); + CHECK(estimate == current_estimate); } SECTION("for download notifications") { - register_default_download_callback(is_streaming); - REQUIRE(!callback_was_called); + progress.register_callback( + [&](auto xferred, auto xferable, double pe) { + transferred = xferred; + transferrable = xferable; + estimate = pe; + callback_was_called = true; + }, + NotifierType::download, true, 0); + REQUIRE(callback_was_called); // Now manually call the notifier handler a few times. callback_was_called = false; current_transferred = 60; current_transferrable = 912; - progress.update_download(current_transferred, current_transferrable, 1); + progress.update(current_transferred, current_transferrable, 25, 26, 1, + current_transferred / double(current_transferrable), 1.0, 0); CHECK(callback_was_called); CHECK(transferred == current_transferred); CHECK(transferrable == current_transferrable); + CHECK(estimate == current_transferred / double(current_transferrable)); // Second callback callback_was_called = false; current_transferred = 79; current_transferrable = 1021; - progress.update_download(current_transferred, current_transferrable, 1); + progress.update(current_transferred, current_transferrable, 68, 191, 1, + current_transferred / double(current_transferrable), 1.0, 0); CHECK(callback_was_called); CHECK(transferred == current_transferred); CHECK(transferrable == current_transferrable); + CHECK(estimate == current_transferred / double(current_transferrable)); // Third callback callback_was_called = false; current_transferred = 150; current_transferrable = 1228; - progress.update_download(current_transferred, current_transferrable, 1); - CHECK(callback_was_called); - CHECK(transferred == current_transferred); - CHECK(transferrable == current_transferrable); - - // final callback for non-streaming callback based on bytes on registration - callback_was_called = false; - current_transferred = 1544; - current_transferrable = 1544; - progress.update_download(current_transferred, current_transferrable, 1); + progress.update(current_transferred, current_transferrable, 199, 591, 1, + current_transferred / double(current_transferrable), 1.0, 0); CHECK(callback_was_called); CHECK(transferred == current_transferred); CHECK(transferrable == current_transferrable); - - // only streaming callback type should keep reporting values after that - callback_was_called = false; - current_transferred = 2324; - current_transferrable = 4544; - progress.update_download(current_transferred, current_transferrable, 1); - if (!is_streaming) { - CHECK_FALSE(callback_was_called); - } - else { - CHECK(callback_was_called); - CHECK(transferred == current_transferred); - CHECK(transferrable == current_transferrable); - } } SECTION("token unregistration works") { - uint64_t token = register_default_download_callback(is_streaming); - REQUIRE_FALSE(callback_was_called); + uint64_t token = progress.register_callback( + [&](auto xferred, auto xferable, double) { + transferred = xferred; + transferrable = xferable; + callback_was_called = true; + }, + NotifierType::download, true, 0); + REQUIRE(callback_was_called); // Now manually call the notifier handler a few times. + callback_was_called = false; current_transferred = 60; current_transferrable = 912; - progress.update(current_transferred, current_transferrable, 25, 26, 1); + double current_estimate = current_transferred / double(current_transferrable); + progress.update(current_transferred, current_transferrable, 25, 26, 1, current_estimate, 25 / double(26), + 0); CHECK(callback_was_called); CHECK(transferred == current_transferred); CHECK(transferrable == current_transferrable); - // Unregister download + // Unregister progress.unregister_callback(token); // Second callback: should not actually do anything. callback_was_called = false; - progress.update(150, 1228, 199, 591, 1); - CHECK_FALSE(callback_was_called); - - token = register_default_upload_callback(is_streaming); - progress.update(250, 1228, 350, 591, 1); - CHECK(callback_was_called); - - // Unregister upload - progress.unregister_callback(token); - callback_was_called = false; - progress.update(750, 1228, 450, 591, 1); - CHECK_FALSE(callback_was_called); + current_transferred = 150; + current_transferrable = 1228; + current_estimate = current_transferred / double(current_transferrable); + progress.update(current_transferred, current_transferrable, 199, 591, 1, current_estimate, + 199 / double(591), 0); + CHECK(!callback_was_called); } SECTION("for multiple notifiers") { - register_default_download_callback(is_streaming); - REQUIRE_FALSE(callback_was_called); - - progress.update(20, 100, 10, 150, 1); + progress.register_callback( + [&](auto xferred, auto xferable, double ep) { + transferred = xferred; + transferrable = xferable; + estimate = ep; + callback_was_called = true; + }, + NotifierType::download, true, 0); REQUIRE(callback_was_called); - CHECK(transferred == 20); - CHECK(transferrable == 100); // Register a second notifier. bool callback_was_called_2 = false; uint64_t transferred_2 = 0; uint64_t transferrable_2 = 0; - double progress_estimate_2 = 0; + double upload_estimate = 0.0; progress.register_callback( - [&](auto xferred, auto xferable, auto p) { + [&](auto xferred, auto xferable, double ep) { transferred_2 = xferred; transferrable_2 = xferable; - progress_estimate_2 = p; callback_was_called_2 = true; + upload_estimate = ep; }, - NotifierType::upload, is_streaming); + NotifierType::upload, true, 0); REQUIRE(callback_was_called_2); - CHECK(transferred_2 == 10); - CHECK(transferrable_2 == 150); - const uint64_t final_uploaded = transferrable_2; // Now manually call the notifier handler a few times. callback_was_called = false; @@ -380,13 +374,18 @@ TEST_CASE("progress notification", "[sync][session][progress]") { uint64_t current_uploadable = 201; uint64_t current_downloaded = 68; uint64_t current_downloadable = 182; - progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1); + auto current_down_estimate = current_downloaded / double(current_downloadable); + auto current_up_estimate = current_uploaded / double(current_uploadable); + progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, + current_down_estimate, current_up_estimate, 0); CHECK(callback_was_called); CHECK(transferred == current_downloaded); CHECK(transferrable == current_downloadable); + CHECK(estimate == current_down_estimate); CHECK(callback_was_called_2); CHECK(transferred_2 == current_uploaded); - CHECK(transferrable_2 == (is_streaming ? current_uploadable : final_uploaded)); + CHECK(transferrable_2 == current_uploadable); + CHECK(upload_estimate == current_up_estimate); // Second callback callback_was_called = false; @@ -395,65 +394,209 @@ TEST_CASE("progress notification", "[sync][session][progress]") { current_uploadable = 329; current_downloaded = 76; current_downloadable = 191; - progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1); + current_down_estimate = current_downloaded / double(current_downloadable); + current_up_estimate = current_uploaded / double(current_uploadable); + progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, + current_down_estimate, current_up_estimate, 0); CHECK(callback_was_called); CHECK(transferred == current_downloaded); CHECK(transferrable == current_downloadable); + CHECK(estimate == current_down_estimate); CHECK(callback_was_called_2); CHECK(transferred_2 == current_uploaded); - CHECK(transferrable_2 == (is_streaming ? current_uploadable : final_uploaded)); + CHECK(transferrable_2 == current_uploadable); + CHECK(current_up_estimate == upload_estimate); } } - SECTION("upload notifications are not sent until all local changesets have been processed") { - progress.set_local_version(4); + SECTION("properly runs for non-streaming notifiers") { + bool callback_was_called = false; + uint64_t transferred = 0; + uint64_t transferrable = 0; + uint64_t current_transferred = 0; + uint64_t current_transferrable = 0; + double upload_estimate = 0; + double download_estimate = 0; - register_default_upload_callback(); - REQUIRE_FALSE(callback_was_called); + SECTION("for upload notifications") { + // Prime the progress updater + current_transferred = 60; + current_transferrable = 501; + const uint64_t original_transferrable = current_transferrable; + double current_estimate = current_transferred / double(current_transferrable); + progress.update(21, 26, current_transferred, current_transferrable, 1, 21 / double(26), current_estimate, + 0); - uint64_t current_transferred = 66; - uint64_t current_transferrable = 582; - progress.update_upload(current_transferred, current_transferrable, 3); - REQUIRE_FALSE(callback_was_called); + progress.register_callback( + [&](auto xferred, auto xferable, double ep) { + transferred = xferred; + transferrable = xferable; + upload_estimate = ep; + callback_was_called = true; + }, + NotifierType::upload, false, 0); + REQUIRE(callback_was_called); - current_transferred = 77; - current_transferrable = 1021; - progress.update(0, 0, current_transferred, current_transferrable, 4); - REQUIRE(callback_was_called); - CHECK(transferred == current_transferred); - // should not have captured transferrable from the first update - CHECK(transferrable == current_transferrable); - } + // Now manually call the notifier handler a few times. + callback_was_called = false; + current_transferred = 66; + current_transferrable = 582; + current_estimate = current_transferred / double(current_transferrable); + progress.update(21, 26, current_transferred, current_transferrable, 1, 21 / double(26), current_estimate, + 0); + CHECK(callback_was_called); + CHECK(transferred == current_transferred); + CHECK(transferrable == original_transferrable); + CHECK(upload_estimate == current_transferred / double(original_transferrable)); - SECTION("download notifications are not sent until a DOWNLOAD message has been received") { - register_default_download_callback(); + // Second callback + callback_was_called = false; + current_transferred = original_transferrable + 100; + current_transferrable = 1021; + current_estimate = current_transferred / double(current_transferrable); + progress.update(68, 191, current_transferred, current_transferrable, 1, 68 / double(191), + current_estimate, 0); + CHECK(callback_was_called); + CHECK(transferred == current_transferred); + CHECK(transferrable == original_transferrable); + CHECK(upload_estimate == 1.0); - // Next we get a DOWNLOAD message telling us there's more to download - progress.update_download(100, 100, 1); - REQUIRE(callback_was_called); - REQUIRE(transferrable == 100); - REQUIRE(transferred == 100); + // The notifier should be unregistered at this point, and not fire. + callback_was_called = false; + current_transferred = original_transferrable + 250; + current_transferrable = 1228; + current_estimate = current_transferred / double(current_transferrable); + progress.update(199, 591, current_transferred, current_transferrable, 1, 199 / double(591), + current_estimate, 0); + CHECK(!callback_was_called); + } - // download was finished, next DOWNLOAD shouldn't notify - callback_was_called = false; - progress.update_download(200, 200, 1); - REQUIRE_FALSE(callback_was_called); + SECTION("upload notifications are not sent until all local changesets have been processed") { + progress.set_local_version(4); - // After the download has completed, new notifications only trigger after the update - transferred = 0; - transferrable = 0; - callback_was_called = false; + progress.register_callback( + [&](auto xferred, auto xferable, double) { + transferred = xferred; + transferrable = xferable; + callback_was_called = true; + }, + NotifierType::upload, false, 0); + REQUIRE_FALSE(callback_was_called); - register_default_download_callback(); - REQUIRE_FALSE(callback_was_called); + current_transferred = 66; + current_transferrable = 582; + double current_estimate = current_transferred / double(current_transferrable); + progress.update(0, 0, current_transferred, current_transferrable, 3, 1.0, current_estimate, 0); + REQUIRE_FALSE(callback_was_called); - progress.update_download(250, 300, 1); - REQUIRE(callback_was_called); - REQUIRE(transferred == 250); - REQUIRE(transferrable == 300); - } + current_transferred = 77; + current_transferrable = 1021; + current_estimate = current_transferred / double(current_transferrable); + progress.update(0, 0, current_transferred, current_transferrable, 4, 1.0, current_estimate, 0); + REQUIRE(callback_was_called); + CHECK(transferred == current_transferred); + // should not have captured transferrable from the first update + CHECK(transferrable == current_transferrable); + CHECK(current_estimate == current_estimate); + } + + SECTION("for download notifications") { + // Prime the progress updater + current_transferred = 60; + current_transferrable = 501; + double current_estimate = current_transferred / double(current_transferrable); + const uint64_t original_transferrable = current_transferrable; + progress.update(current_transferred, current_transferrable, 21, 26, 1, current_estimate, 21 / double(26), + 0); + + progress.register_callback( + [&](auto xferred, auto xferable, double ep) { + transferred = xferred; + transferrable = xferable; + download_estimate = ep; + callback_was_called = true; + }, + NotifierType::download, false, 0); + REQUIRE(callback_was_called); + + // Now manually call the notifier handler a few times. + callback_was_called = false; + current_transferred = 66; + current_transferrable = 582; + current_estimate = current_transferred / double(current_transferrable); + progress.update(current_transferred, current_transferrable, 25, 26, 1, current_estimate, 25 / double(26), + 0); + CHECK(callback_was_called); + CHECK(transferred == current_transferred); + CHECK(transferrable == original_transferrable); + CHECK(download_estimate == current_estimate); + + // Second callback + callback_was_called = false; + current_transferred = original_transferrable + 100; + current_transferrable = 1021; + current_estimate = current_transferred / double(current_transferrable); + progress.update(current_transferred, current_transferrable, 68, 191, 1, current_estimate, + 68 / double(191), 0); + CHECK(callback_was_called); + CHECK(transferred == current_transferred); + CHECK(transferrable == original_transferrable); + CHECK(download_estimate == current_estimate); + + // The notifier should be unregistered at this point, and not fire. + callback_was_called = false; + current_transferred = original_transferrable + 250; + current_transferrable = 1228; + current_estimate = current_transferred / double(current_transferrable); + progress.update(current_transferred, current_transferrable, 199, 591, 1, current_estimate, + 199 / double(591), 0); + CHECK(!callback_was_called); + } + + SECTION("token unregistration works") { + // Prime the progress updater + current_transferred = 60; + current_transferrable = 501; + double current_estimate = current_transferred / double(current_transferrable); + const uint64_t original_transferrable = current_transferrable; + progress.update(21, 26, current_transferred, current_transferrable, 1, 21 / double(26), current_estimate, + 0); + + uint64_t token = progress.register_callback( + [&](auto xferred, auto xferable, double ep) { + transferred = xferred; + transferrable = xferable; + upload_estimate = ep; + callback_was_called = true; + }, + NotifierType::upload, false, 0); + REQUIRE(callback_was_called); + + // Now manually call the notifier handler a few times. + callback_was_called = false; + current_transferred = 66; + current_transferrable = 912; + current_estimate = current_transferred / double(current_transferrable); + progress.update(25, 26, current_transferred, current_transferrable, 1, 25 / double(26), current_estimate, + 0); + CHECK(callback_was_called); + CHECK(transferred == current_transferred); + CHECK(transferrable == original_transferrable); + CHECK(upload_estimate == std::min(1.0, current_transferred / double(original_transferrable))); + + // Unregister + progress.unregister_callback(token); + + // Second callback: should not actually do anything. + callback_was_called = false; + current_transferred = 67; + current_transferrable = 1228; + current_estimate = current_transferred / double(current_transferrable); + progress.update(199, 591, current_transferred, current_transferrable, 1, 199 / double(591), + current_estimate, 0); + CHECK(!callback_was_called); + } - SECTION("properly runs for non-streaming notifiers") { SECTION("for multiple notifiers, different directions") { // Prime the progress updater uint64_t current_uploaded = 16; @@ -461,24 +604,34 @@ TEST_CASE("progress notification", "[sync][session][progress]") { uint64_t current_downloaded = 68; uint64_t current_downloadable = 182; const uint64_t original_uploadable = current_uploadable; - progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1); + const uint64_t original_downloadable = current_downloadable; + double current_upload_estimate = current_uploaded / double(current_uploadable); + double current_download_estimate = current_downloaded / double(current_downloadable); + progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, + current_download_estimate, current_upload_estimate, 0); - register_default_upload_callback(); + progress.register_callback( + [&](auto xferred, auto xferable, double ep) { + transferred = xferred; + transferrable = xferable; + upload_estimate = ep; + callback_was_called = true; + }, + NotifierType::upload, false, 0); REQUIRE(callback_was_called); // Register a second notifier. bool callback_was_called_2 = false; uint64_t downloaded = 0; uint64_t downloadable = 0; - double download_progress = 0; progress.register_callback( - [&](auto xferred, auto xferable, auto p) { + [&](auto xferred, auto xferable, double ep) { downloaded = xferred; downloadable = xferable; - download_progress = p; + download_estimate = ep; callback_was_called_2 = true; }, - NotifierType::download, false); + NotifierType::download, false, 0); REQUIRE(callback_was_called_2); // Now manually call the notifier handler a few times. @@ -488,13 +641,18 @@ TEST_CASE("progress notification", "[sync][session][progress]") { current_uploadable = 310; current_downloaded = 171; current_downloadable = 185; - progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1); + current_upload_estimate = current_uploaded / double(current_uploadable); + current_download_estimate = current_downloaded / double(current_downloadable); + progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, + current_download_estimate, current_upload_estimate, 0); CHECK(callback_was_called); CHECK(transferred == current_uploaded); CHECK(transferrable == original_uploadable); CHECK(callback_was_called_2); CHECK(downloaded == current_downloaded); - CHECK(downloadable == current_downloadable); + CHECK(downloadable == original_downloadable); + CHECK(upload_estimate == std::min(1.0, current_uploaded / double(original_uploadable))); + CHECK(download_estimate == current_download_estimate); // Second callback, last one for the upload notifier callback_was_called = false; @@ -503,149 +661,371 @@ TEST_CASE("progress notification", "[sync][session][progress]") { current_uploadable = 310; current_downloaded = 174; current_downloadable = 190; - progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1); + current_upload_estimate = current_uploaded / double(current_uploadable); + current_download_estimate = current_downloaded / double(current_downloadable); + progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, + current_download_estimate, current_upload_estimate, 0); CHECK(callback_was_called); CHECK(transferred == current_uploaded); CHECK(transferrable == original_uploadable); CHECK(callback_was_called_2); CHECK(downloaded == current_downloaded); - CHECK(downloadable == current_downloadable); + CHECK(downloadable == original_downloadable); + CHECK(upload_estimate == std::min(1.0, current_uploaded / double(original_uploadable))); + CHECK(download_estimate == current_download_estimate); // Third callback, last one for the download notifier callback_was_called = false; callback_was_called_2 = false; current_uploaded = 218; current_uploadable = 310; - current_downloaded = 196; + current_downloaded = 182; current_downloadable = 196; - progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1); + current_upload_estimate = current_uploaded / double(current_uploadable); + current_download_estimate = current_downloaded / double(current_downloadable); + progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, + current_download_estimate, current_upload_estimate, 0); CHECK(!callback_was_called); CHECK(callback_was_called_2); CHECK(downloaded == current_downloaded); - CHECK(downloadable == current_downloadable); + CHECK(downloadable == original_downloadable); + CHECK(upload_estimate == std::min(1.0, current_uploaded / double(original_uploadable))); + CHECK(download_estimate == current_download_estimate); // Fourth callback, last one for the download notifier callback_was_called_2 = false; current_uploaded = 220; current_uploadable = 410; - current_downloaded = 196; + current_downloaded = 192; current_downloadable = 591; - progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1); + current_upload_estimate = current_uploaded / double(current_uploadable); + current_download_estimate = current_downloaded / double(current_downloadable); + progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, + current_download_estimate, current_upload_estimate, 0); CHECK(!callback_was_called); CHECK(!callback_was_called_2); } SECTION("for multiple notifiers, same direction") { // Prime the progress updater + uint64_t current_uploaded = 16; + uint64_t current_uploadable = 201; uint64_t current_downloaded = 68; uint64_t current_downloadable = 182; - progress.update_download(current_downloaded, current_downloadable, 1); + double current_upload_estimate = current_uploaded / double(current_uploadable); + double current_download_estimate = current_downloaded / double(current_downloadable); - register_default_download_callback(); + const uint64_t original_downloadable = current_downloadable; + progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, + current_download_estimate, current_upload_estimate, 0); + + progress.register_callback( + [&](auto xferred, auto xferable, double ep) { + transferred = xferred; + transferrable = xferable; + download_estimate = ep; + callback_was_called = true; + }, + NotifierType::download, false, 0); REQUIRE(callback_was_called); // Now manually call the notifier handler a few times. callback_was_called = false; + current_uploaded = 36; + current_uploadable = 310; current_downloaded = 171; current_downloadable = 185; - progress.update_download(current_downloaded, current_downloadable, 1); + current_upload_estimate = current_uploaded / double(current_uploadable); + current_download_estimate = current_downloaded / double(current_downloadable); + + progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, + current_download_estimate, current_upload_estimate, 0); CHECK(callback_was_called); CHECK(transferred == current_downloaded); - CHECK(transferrable == current_downloadable); + CHECK(transferrable == original_downloadable); // Register a second notifier. bool callback_was_called_2 = false; uint64_t downloaded = 0; uint64_t downloadable = 0; - double download_progress = 0; + const uint64_t original_downloadable_2 = current_downloadable; progress.register_callback( - [&](auto xferred, auto xferable, auto p) { + [&](auto xferred, auto xferable, double ep) { downloaded = xferred; downloadable = xferable; - download_progress = p; + download_estimate = ep; callback_was_called_2 = true; }, - NotifierType::download, false); - // Wait for the initial callback, since the download is ongoing + NotifierType::download, false, 0); REQUIRE(callback_was_called_2); - CHECK(downloaded == current_downloaded); - CHECK(downloadable == current_downloadable); - // next callback, last one for the whole current download + // Second callback, last one for first notifier callback_was_called = false; callback_was_called_2 = false; - current_downloaded = 190; + current_uploaded = 36; + current_uploadable = 310; + current_downloaded = 182; current_downloadable = 190; - progress.update_download(current_downloaded, current_downloadable, 1); + current_upload_estimate = current_uploaded / double(current_uploadable); + current_download_estimate = current_downloaded / double(current_downloadable); + progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, + current_download_estimate, current_upload_estimate, 0); CHECK(callback_was_called); CHECK(transferred == current_downloaded); - CHECK(transferrable == current_downloadable); + CHECK(transferrable == original_downloadable); CHECK(callback_was_called_2); CHECK(downloaded == current_downloaded); - CHECK(downloadable == current_downloadable); + CHECK(downloadable == original_downloadable_2); + CHECK(download_estimate == current_download_estimate); - // after that all callbacks should be deregistered automatically and not trigger + // Third callback, last one for second notifier callback_was_called = false; callback_was_called_2 = false; - progress.update_download(201, 289, 1); + current_uploaded = 36; + current_uploadable = 310; + current_downloaded = 189; + current_downloadable = 250; + current_upload_estimate = current_uploaded / double(current_uploadable); + current_download_estimate = current_downloaded / double(current_downloadable); + progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, + current_download_estimate, current_upload_estimate, 0); CHECK(!callback_was_called); + CHECK(callback_was_called_2); + CHECK(downloaded == current_downloaded); + CHECK(downloadable == original_downloadable_2); + CHECK(download_estimate == current_download_estimate); + + // Fourth callback + callback_was_called_2 = false; + current_uploaded = 36; + current_uploadable = 310; + current_downloaded = 201; + current_downloadable = 289; + current_upload_estimate = current_uploaded / double(current_uploadable); + current_download_estimate = current_downloaded / double(current_downloadable); + progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, + current_download_estimate, current_upload_estimate, 0); CHECK(!callback_was_called_2); } - } - SECTION("download notifiers handle transferrable changing") { - bool is_streaming = GENERATE(false, true); - - // Prime the progress updater - uint64_t current_transferred = 60; - uint64_t current_transferrable = 501; - const uint64_t original_transferrable = current_transferrable; - progress.update_download(current_transferred, current_transferrable, 1); - - register_default_download_callback(is_streaming); - REQUIRE(callback_was_called); - - // Download some data but also drop the total. transferrable should - // update because it decreased. - callback_was_called = false; - current_transferred = 160; - current_transferrable = 451; - progress.update_download(current_transferred, current_transferrable, 1); - CHECK(callback_was_called); - CHECK(transferred == current_transferred); - CHECK(transferrable == current_transferrable); - - // Increasing current_transferrable will increase transferrable - callback_was_called = false; - current_transferrable = 1000; - progress.update_download(current_transferred, current_transferrable, 1); - CHECK(callback_was_called); - CHECK(transferred == current_transferred); - CHECK(transferrable == current_transferrable); - - // Transferrable dropping to be equal to transferred should notify - // and then expire the notifier - callback_was_called = false; - current_transferred = 200; - current_transferrable = current_transferred; - progress.update_download(current_transferred, current_transferrable, 1); - CHECK(callback_was_called); - CHECK(transferred == current_transferred); - CHECK(transferrable == current_transferred); - - // The non-streaming notifier should be unregistered at this point, and not fire. - callback_was_called = false; - current_transferred = original_transferrable + 250; - current_transferrable = 1228; - progress.update_download(current_transferred, current_transferrable, 1); - if (!is_streaming) { - CHECK(!callback_was_called); - } - else { + SECTION("download notifiers handle transferrable decreasing") { + // Prime the progress updater + current_transferred = 60; + current_transferrable = 501; + const uint64_t original_transferrable = current_transferrable; + double current_estimate = current_transferred / double(current_transferrable); + progress.update(current_transferred, current_transferrable, 21, 26, 1, current_estimate, 21 / double(26), + 0); + + progress.register_callback( + [&](auto xferred, auto xferable, double ep) { + transferred = xferred; + transferrable = xferable; + callback_was_called = true; + download_estimate = ep; + }, + NotifierType::download, false, 0); + REQUIRE(callback_was_called); + + // Download some data but also drop the total. transferrable should + // update because it decreased. + callback_was_called = false; + current_transferred = 160; + current_transferrable = 451; + current_estimate = current_transferred / double(current_transferrable); + progress.update(current_transferred, current_transferrable, 25, 26, 1, current_estimate, 26 / double(26), + 0); CHECK(callback_was_called); CHECK(transferred == current_transferred); CHECK(transferrable == current_transferrable); + CHECK(current_estimate == download_estimate); + + // Increasing current_transferrable should not increase transferrable + const uint64_t previous_transferrable = current_transferrable; + callback_was_called = false; + current_transferrable = 1000; + current_estimate = current_transferred / double(current_transferrable); + progress.update(current_transferred, current_transferrable, 68, 191, 1, current_estimate, + 68 / double(191), 0); + CHECK(callback_was_called); + CHECK(transferred == current_transferred); + CHECK(transferrable == previous_transferrable); + CHECK(download_estimate == current_estimate); + + // Transferrable dropping to be equal to transferred should notify + // and then expire the notifier + callback_was_called = false; + current_transferred = 200; + current_transferrable = current_transferred; + current_estimate = current_transferred / double(current_transferrable); + progress.update(current_transferred, current_transferrable, 191, 192, 1, current_estimate, + 191 / double(192), 0); + CHECK(callback_was_called); + CHECK(transferred == current_transferred); + CHECK(transferrable == current_transferred); + CHECK(current_estimate == download_estimate); + + // The notifier should be unregistered at this point, and not fire. + callback_was_called = false; + current_transferred = original_transferrable + 250; + current_transferrable = 1228; + current_estimate = current_transferred / double(current_transferrable); + + progress.update(current_transferred, current_transferrable, 199, 591, 1, current_estimate, + 199 / double(591), 0); + CHECK(!callback_was_called); + } + } + + SECTION("flx streaming notifiers") { + // clang-format off + TestValues test_values = GENERATE( + // resgisters at the begining and should see all entries. + TestValues{{ + TestInputValue{TestInputValue::IsRegistration{}}, + TestInputValue{0, 0, 0, 0}, + TestInputValue{0, 1, 200, 200}, + TestInputValue{1, 0.2, 300, 600}, + TestInputValue{1, 0.4, 400, 600}, + TestInputValue{1, 0.8, 600, 700}, + TestInputValue{1, 1, 700, 700}, + TestInputValue{2, 0.3, 800, 1000}, + TestInputValue{2, 0.6, 900, 1000}, + TestInputValue{2, 1, 1000, 1000}, + }, { + ProgressEntry{0, 0, 0}, + ProgressEntry{200, 200, 1}, + ProgressEntry{300, 600, 0.2}, + ProgressEntry{400, 600, 0.4}, + ProgressEntry{600, 700, 0.8}, + ProgressEntry{700, 700, 1}, + ProgressEntry{800, 1000, 0.3}, + ProgressEntry{900, 1000, 0.6}, + ProgressEntry{1000, 1000, 1}, + }, 1}, + TestValues{{ + TestInputValue{1, 0.2, 300, 600}, + TestInputValue{1, 0.4, 400, 600}, + TestInputValue{TestInputValue::IsRegistration{}}, + TestInputValue{1, 0.8, 600, 700}, + TestInputValue{1, 1, 700, 700}, + }, { + ProgressEntry{400, 600, 0.4}, + ProgressEntry{600, 700, 0.8}, + ProgressEntry{700, 700, 1.0}, + }, 1}, + // registers for a query version that's already up-to-date - should get an immediate update + // with a progress estimate of 1 and whatever the current transferred/transferrable numbers are + TestValues{{ + TestInputValue{2, 0.5, 800, 900}, + TestInputValue{2, 1, 900, 900}, + TestInputValue{TestInputValue::IsRegistration{}}, + TestInputValue{2, 1, 1000, 1000} + }, { + ProgressEntry{900, 900, 1}, + ProgressEntry{1000, 1000, 1}, + }, 1} + ); + // clang-format on + + auto logger = util::Logger::get_default_logger(); + auto progress_output = util::make_bind(logger, "flx non-streaming download"); + + uint64_t snapshot = 1; + for (const auto& input_val : test_values.input_values) { + if (input_val.is_registration) { + progress.register_callback(progress_output->make_cb(), NotifierType::download, true, + test_values.registered_at_query_version); + continue; + } + progress.update(input_val.transferred, input_val.transferrable, 0, 0, ++snapshot, input_val.cur_estimate, + 0.0, input_val.query_version); } + + const auto output_values = progress_output->wait_for_full_sync(); + + REQUIRE_THAT(output_values, Catch::Matchers::Equals(test_values.expected_values)); + } + + SECTION("flx non-streaming notifiers") { + // clang-format off + TestValues test_values = GENERATE( + // registers for query version 1 on an empty realm - should see the full progression + // of query version 1 and nothing else. + TestValues{{ + TestInputValue{TestInputValue::IsRegistration{}}, + TestInputValue{0, 0, 0, 0}, + TestInputValue{0, 1, 200, 200}, + TestInputValue{1, 0.2, 300, 600}, + TestInputValue{1, 0.4, 400, 600}, + TestInputValue{1, 0.8, 600, 700}, + TestInputValue{1, 1, 700, 700}, + TestInputValue{2, 0.3, 800, 1000}, + TestInputValue{2, 0.6, 900, 1000}, + TestInputValue{2, 1, 1000, 1000}, + }, { + ProgressEntry{300, 600, 0.2}, + ProgressEntry{400, 600, 0.4}, + ProgressEntry{600, 600, 0.8}, + ProgressEntry{700, 600, 1.0}, + }, 1}, + // registers a notifier in the middle of syncing the target query version + TestValues{{ + TestInputValue{1, 0.2, 300, 600}, + TestInputValue{1, 0.4, 400, 600}, + TestInputValue{TestInputValue::IsRegistration{}}, + TestInputValue{1, 0.8, 600, 700}, + TestInputValue{1, 1, 700, 700}, + // There's also a progress notification for a regular steady state + // download message that gets ignored because we're already up-to-date + TestInputValue{1, 1, 800, 800}, + }, { + ProgressEntry{400, 600, 0.4}, + ProgressEntry{600, 600, 0.8}, + ProgressEntry{700, 600, 1.0}, + }, 1}, + // registers for a notifier for a later query version - should only see notifications + // for downloads greater than the requested query version + TestValues{{ + + TestInputValue{TestInputValue::IsRegistration{}}, + TestInputValue{1, 0.8, 700, 700}, + TestInputValue{1, 1, 700, 700}, + TestInputValue{3, 0.5, 800, 900}, + TestInputValue{3, 1, 900, 900}, + }, { + ProgressEntry{800, 900, 0.5}, + ProgressEntry{900, 900, 1}, + }, 2}, + // registers for a query version that's already up-to-date - should get an immediate update + // with a progress estimate of 1 and whatever the current transferred/transferrable numbers are + TestValues{{ + TestInputValue{2, 0.5, 800, 900}, + TestInputValue{2, 1, 900, 900}, + TestInputValue{TestInputValue::IsRegistration{}}, + }, { + ProgressEntry{900, 900, 1}, + }, 1} + ); + // clang-format on + + auto logger = util::Logger::get_default_logger(); + auto progress_output = util::make_bind(logger, "flx non-streaming download"); + + uint64_t snapshot = 1; + for (const auto& input_val : test_values.input_values) { + if (input_val.is_registration) { + progress.register_callback(progress_output->make_cb(), NotifierType::download, false, + test_values.registered_at_query_version); + continue; + } + progress.update(input_val.transferred, input_val.transferrable, 0, 0, ++snapshot, input_val.cur_estimate, + 0.0, input_val.query_version); + } + + const auto output_values = progress_output->wait_for_full_sync(); + + REQUIRE_THAT(output_values, Catch::Matchers::Equals(test_values.expected_values)); } } @@ -657,7 +1037,7 @@ struct TestSetup { return r->read_group().get_table("class_" + table_name); } - size_t add_objects(SharedRealm& r, int num = 5) + size_t add_objects(SharedRealm& r, int num) { CppContext ctx(r); for (int i = 0; i < num; ++i) { @@ -736,308 +1116,184 @@ struct FLX : TestSetup { FLXSyncTestHarness harness; }; -struct ProgressNotification { - uint64_t xferred, xferable; - double estimate; -}; - -struct ReportedProgress : std::vector> { - void add(size_t idx, uint64_t xferred, uint64_t xferable, double estimate) +struct ProgressIncreasesMatcher : Catch::Matchers::MatcherGenericBase { + enum MatchMode { ByteCountOnly, All }; + ProgressIncreasesMatcher() = default; + explicit ProgressIncreasesMatcher(MatchMode mode) + : m_mode(mode) { - std::lock_guard lock(mutex); - (*this)[idx].emplace_back(ProgressNotification{xferred, xferable, estimate}); } - std::string dump(size_t begin = 0, size_t end = -1) + bool match(std::vector const& entries) const { - auto&& self = *this; - std::ostringstream out; - for (size_t i = begin, e = std::min(end, self.size()); i < e; ++i) { - out << (i > begin ? "\n" : "") << i << " [" << self[i].size() << "]: "; - for (auto&& p : self[i]) - out << "(" << p.xferred << ", " << p.xferable << ", " << std::setprecision(4) << p.estimate << "), "; + if (entries.size() < 1) { + return false; + } + + auto last = std::ref(entries.front()); + for (size_t i = 1; i < entries.size(); ++i) { + ProgressEntry const& cur = entries[i]; + if (cur.transferred < last.get().transferred) { + return false; + } + if (m_mode == All && cur.estimate < last.get().estimate) { + return false; + } + last = cur; } - return out.str(); + return true; } - void clear() + std::string describe() const override { - std::lock_guard lock(mutex); - // cout << dump() << endl; // uncomment to dump full progress values on every test stage - for (auto&& values : *this) - values.clear(); + return "progress notifications all increase"; } - std::mutex mutex; +private: + MatchMode m_mode = All; }; -#define VERIFY_PROGRESS_EMPTY(progress, begin, end) \ - { \ - std::lock_guard lock(progress.mutex); \ - for (size_t i = begin; i < end; ++i) { \ - INFO(util::format("i = %1, %2", i, progress.dump(i, i + 1))); \ - auto&& values = progress[i]; \ - CHECK(values.size() == 0); \ - } \ - } - -#define VERIFY_PROGRESS_CONSISTENCY_ONE(progress, i, expected_download_stages, is_download, is_streaming) \ - { \ - INFO(i); \ - REQUIRE(expected_download_stages > 0); \ - REQUIRE(i < progress.size()); \ - auto&& values = progress[i]; \ - \ - CHECK(values.size() > 0); \ - if (values.front().estimate < 1) \ - CHECK(values.size() >= 3); \ - int progress_stages = expected_download_stages; \ - \ - for (size_t j = 0; j < values.size(); ++j) { \ - auto&& p = values[j]; \ - INFO(util::format("Fail index i: %1, j: %2 | Reported progress:\n%3", i, j, progress.dump())); \ - \ - CHECK(0 <= p.xferred); \ - CHECK(p.xferred <= p.xferable); \ - CHECK(0 <= p.estimate); \ - CHECK(p.estimate <= 1.0); \ - \ - if (j <= 0) \ - continue; \ - \ - auto&& prev = values[j - 1]; \ - CHECK(prev.xferred <= p.xferred); \ - \ - /* downloadable may fluctuate by design: \ - * pbs: downloadable from the DOWNLOAD message is added to downloaded so far \ - * always after the changeset integration, commit is always a bit smaller, \ - * hence downloadable always gets a bit smaller than previous value \ - * flx: downloadable is always as good as an estimate from the server, fluctuates both ways */ \ - if (!is_download) \ - CHECK(prev.xferable <= p.xferable); \ - \ - if (is_download && is_streaming && prev.estimate > p.estimate) { \ - CHECK(prev.estimate == 1.0); \ - CHECK(progress_stages >= 1); \ - --progress_stages; \ - } \ - else { \ - CHECK(prev.estimate <= p.estimate); \ - } \ - \ - bool is_last = j == values.size() - 1; \ - if (is_last) { \ - auto&& last = values.back(); \ - CHECK(last.estimate == 1.0); \ - CHECK(last.xferred == last.xferable); \ - } \ - } \ - } - -/* - * This test runs a few scenarios for synchronizing changes between two separate realm files for the same app, - * and verifies high-level consistency in reported progress notification's values. - * - * It doesn't try to check for particular reported values: these are checked in sync impl tests, - * and specific combinations of updates verified directly in SyncProgressNotifier tests. - * - * First, test adds a few objects into one realm, verifies that the progress is reported until upload completion. - * Then it checks how this exact changes are downloaded into the second realm file (this essentially checks - * how progress is reported with bootstrap store for flx). - * - * Next subtests, are here to check how continuous sync reports progress. It reuses the same two realm files - * with synchronized objects in them both. Test adds more objects into the second realm to sync more changes - * the other way around: from second realm to the first one, and check if also upload progress correct for - * the second realm, and download progress for the first realm after its initial upload. - * - first by reusing the same realm instance for the second realm - * - second by closing and reopening second realm file with new SharedRealm instance - * - * Separately, AsyncOpenTask is checked twice: with initial empty third realm file, and with subsequent second opening - * with more changes to download from the server. The progress reported through task interface should behave in the - * same way as with cases tested above. - */ -TEMPLATE_TEST_CASE("sync progress notifications", "[sync][baas][progress]", PBS, FLX) +TEMPLATE_TEST_CASE("progress notifications fire immediately when fully caught up", "[baas][progress][sync]", PBS, FLX) { - TestType setup; - size_t expected_count = 0; - -#define VERIFY_REALM(realm_1, realm_2, expected) \ - { \ - REQUIRE(expected > 0); \ - REQUIRE(realm_1); \ - REQUIRE(realm_2); \ - REQUIRE(realm_1 != realm_2); \ - auto table1 = setup.get_table(realm_1); \ - auto table2 = setup.get_table(realm_2); \ - REQUIRE(table1); \ - REQUIRE(table2); \ - REQUIRE(table1->size() == expected); \ - REQUIRE(table2->size() == expected); \ - } - - // register set of 4 callbacks to put values in predefined places in reported progress list: - // idx 0: non-streaming/download, 1: non-streaming/upload, 2: streaming/download, 3: streaming/upload - auto add_callbacks = [&](SharedRealm& realm, ReportedProgress& progress) { - size_t idx = progress.size(); - progress.resize(idx + 4); - for (auto&& stream : {false, true}) - for (auto&& direction : {NotifierType::download, NotifierType::upload}) - realm->sync_session()->register_progress_notifier( - [&, i = idx++](uint64_t xferred, uint64_t xferable, double estimate) { - progress.add(i, xferred, xferable, estimate); - }, - direction, stream); + TestType pbs_setup; + auto logger = util::Logger::get_default_logger(); + + auto validate_noop_entry = [&](const std::vector& entries, std::string context) { + UNSCOPED_INFO("validating noop non-streaming entry " << context); + REQUIRE(entries.size() == 1); + const auto& entry = entries.front(); + REQUIRE(entry.transferred >= entry.transferrable); + REQUIRE(entry.estimate >= 1.0); }; -#define VERIFY_PROGRESS_CONSISTENCY(progress, begin, end, sync_direction_is_download) \ - { \ - std::lock_guard lock(progress.mutex); \ - REQUIRE(begin < end); \ - REQUIRE(end <= progress.size()); \ - \ - for (size_t i = begin; i < end; ++i) { \ - /* from add_callbacks: odd sequence number: upload, even: download */ \ - bool is_download = i % 2 == 0; \ - /* first two lists are for non-streaming, next streaming callbacks */ \ - bool is_streaming = i % 4 > 1; \ - \ - /* since the test checks only one direction at a time: from one realm to the other, \ - * allow empty reported progress only for the other direction, \ - * this is the case when the session is simply restarted after initial sync */ \ - if (progress[i].empty()) { \ - CHECK(sync_direction_is_download != is_download); \ - continue; \ - } \ - \ - VERIFY_PROGRESS_CONSISTENCY_ONE(progress, i, 1, is_download, is_streaming); \ - } \ - } - - auto wait_for_sync = [](SharedRealm& realm) { - realm->sync_session()->resume(); - wait_for_upload(*realm); - wait_for_download(*realm); - realm->sync_session()->pause(); - realm->refresh(); - }; + SECTION("empty async open results in progress notification") { + auto config = pbs_setup.make_config(); + auto async_open_task = Realm::get_synchronized_realm(config); + auto async_open_progress = util::make_bind(logger, "async open non-streaming progress "); + async_open_task->register_download_progress_notifier(async_open_progress->make_cb()); + auto [promise, future] = util::make_promise_future(); + async_open_task->start( + [promise = std::move(promise)](ThreadSafeReference ref, std::exception_ptr ouch) mutable { + if (ouch) { + try { + std::rethrow_exception(ouch); + } + catch (...) { + promise.set_error(exception_to_status()); + } + } + else { + promise.emplace_value(std::move(ref)); + } + }); - auto config_1 = setup.make_config(); - auto realm_1 = Realm::get_shared_realm(config_1); - realm_1->sync_session()->pause(); + auto realm = Realm::get_shared_realm(std::move(future).get()); + auto noop_download_progress = util::make_bind(logger, "non-streaming download "); + auto noop_token = realm->sync_session()->register_progress_notifier( + noop_download_progress->make_cb(), SyncSession::ProgressDirection::download, false); + // The registration token for a non-streaming notifier that was expired at registration time + // is zero because it's invoked immediately and never registered for further notifications. + CHECK(noop_token == 0); + + auto async_open_entries = async_open_progress->wait_for_full_sync(); + REQUIRE_THAT(async_open_entries, ProgressIncreasesMatcher{}); + validate_noop_entry(noop_download_progress->wait_for_full_sync(), "noop_download_progress"); + } - expected_count = setup.add_objects(realm_1); - ReportedProgress progress_1; - add_callbacks(realm_1, progress_1); + SECTION("synchronous open then waiting for download then noop notification") { + { + auto fill_data_config = pbs_setup.make_config(); + auto fill_data_realm = Realm::get_shared_realm(fill_data_config); + pbs_setup.add_objects(fill_data_realm, 5); + wait_for_upload(*fill_data_realm); + } - wait_for_sync(realm_1); - VERIFY_PROGRESS_CONSISTENCY(progress_1, 0, 4, false); - progress_1.clear(); + auto config = pbs_setup.make_config(); + auto realm = Realm::get_shared_realm(config); + auto initial_progress = util::make_bind(logger, "streaming initial progress "); + realm->sync_session()->register_progress_notifier(initial_progress->make_cb(), NotifierType::download, true); - SECTION("progress from second realm") { - auto config2 = setup.make_config(); - auto realm_2 = Realm::get_shared_realm(config2); + auto initial_entries = initial_progress->wait_for_full_sync(); + REQUIRE(!initial_entries.empty()); + REQUIRE_THAT(initial_entries, ProgressIncreasesMatcher{}); - ReportedProgress progress_2; - add_callbacks(realm_2, progress_2); - wait_for_sync(realm_2); - VERIFY_REALM(realm_1, realm_2, expected_count); + auto noop_download_progress = util::make_bind(logger, "non-streaming noop download "); + auto noop_token = realm->sync_session()->register_progress_notifier( + noop_download_progress->make_cb(), SyncSession::ProgressDirection::download, false); + // The registration token for a non-streaming notifier that was expired at registration time + // is zero because it's invoked immediately and never registered for further notifications. + CHECK(noop_token == 0); - VERIFY_PROGRESS_CONSISTENCY(progress_2, 0, 4, true); - progress_2.clear(); + validate_noop_entry(noop_download_progress->wait_for_full_sync(), "noop_download_progress"); + } - VERIFY_PROGRESS_EMPTY(progress_1, 0, progress_1.size()); + SECTION("uploads") { + auto config = pbs_setup.make_config(); + auto realm = Realm::get_shared_realm(config); + auto initial_progress = util::make_bind(logger, "non-streaming initial progress "); - SECTION("continuous sync with existing instances") { - expected_count = setup.add_objects(realm_2); - add_callbacks(realm_2, progress_2); - wait_for_sync(realm_2); + pbs_setup.add_objects(realm, 5); - add_callbacks(realm_1, progress_1); - wait_for_sync(realm_1); - VERIFY_REALM(realm_1, realm_2, expected_count); + auto token = realm->sync_session()->register_progress_notifier(initial_progress->make_cb(), + NotifierType::upload, false); + auto initial_entries = initial_progress->wait_for_full_sync(); + REQUIRE(!initial_entries.empty()); + REQUIRE_THAT(initial_entries, ProgressIncreasesMatcher{}); + realm->sync_session()->unregister_progress_notifier(token); - // initially registered non-streaming callbacks should stay empty - VERIFY_PROGRESS_EMPTY(progress_1, 0, 2); - VERIFY_PROGRESS_EMPTY(progress_2, 0, 2); - // old streaming and newly registered should be reported - VERIFY_PROGRESS_CONSISTENCY(progress_1, 2, 8, true); - VERIFY_PROGRESS_CONSISTENCY(progress_2, 2, 8, false); - } + // it's possible that we've reached full synchronization in the progress notifier, but because + // of the way non-streaming notifiers work, the transferable may be higher for the next + // non-streaming notifier than for the one that just finished. So we explicitly wait for + // all uploads to complete to check that registering a noop notifier here is actually a noop. + wait_for_upload(*realm); - SECTION("reopen and sync existing realm") { - realm_2.reset(); - expected_count = setup.add_objects(realm_1); - wait_for_sync(realm_1); + auto noop_upload_progress = util::make_bind(logger, "non-streaming upload "); + auto noop_token = realm->sync_session()->register_progress_notifier( + noop_upload_progress->make_cb(), SyncSession::ProgressDirection::upload, false); + // The registration token for a non-streaming notifier that was expired at registration time + // is zero because it's invoked immediately and never registered for further notifications. + CHECK(noop_token == 0); - realm_2 = Realm::get_shared_realm(config2); - add_callbacks(realm_2, progress_2); - wait_for_sync(realm_2); - VERIFY_REALM(realm_1, realm_2, expected_count); + validate_noop_entry(noop_upload_progress->wait_for_full_sync(), "noop_upload_progress"); + } +} - VERIFY_PROGRESS_EMPTY(progress_1, 0, 2); - VERIFY_PROGRESS_CONSISTENCY(progress_1, 2, 4, false); - VERIFY_PROGRESS_EMPTY(progress_2, 0, 4); - VERIFY_PROGRESS_CONSISTENCY(progress_2, 4, 8, true); - } - progress_1.clear(); - progress_2.clear(); - } +TEMPLATE_TEST_CASE("sync progress: upload progress", "[sync][baas][progress]", PBS, FLX) +{ + TestType setup; - SECTION("progress through async open task on a new realm") { - auto config_3 = setup.make_config(); - ReportedProgress progress; + auto realm = Realm::get_shared_realm(setup.make_config()); + auto sync_session = realm->sync_session(); + auto logger = util::Logger::get_default_logger(); + auto non_streaming_progress = util::make_bind(logger, "non-streaming upload "); + auto streaming_progress = util::make_bind(logger, "streaming upload "); - // FIXME hits no_sessions assert in SyncManager due to issue with libuv scheduler and notifications - config_3.scheduler = util::Scheduler::make_dummy(); - config_3.automatic_change_notifications = false; + // There is a race between creating the objects and registering the non-streaming notifier + // since + sync_session->pause(); - // 0: open and sync fresh realm - should be equal to the realm_1 - // 1: add more objects to sync through realm_1 and try async open again - for (int i = 0; i < 2; ++i) { - auto task = Realm::get_synchronized_realm(config_3); - REQUIRE(task); + setup.add_objects(realm, 10); + sync_session->register_progress_notifier(non_streaming_progress->make_cb(), NotifierType::upload, false); + sync_session->register_progress_notifier(streaming_progress->make_cb(), NotifierType::upload, true); - auto progress_index = progress.size(); - progress.resize(progress.size() + 1); + sync_session->resume(); + wait_for_upload(*realm); - task->register_download_progress_notifier([&](uint64_t xferred, uint64_t xferable, double estimate) { - progress.add(progress_index, xferred, xferable, estimate); - }); + auto streaming_entries = streaming_progress->wait_for_full_sync(); + auto non_streaming_entries = non_streaming_progress->wait_for_full_sync(); - std::atomic finished = false; - ThreadSafeReference ref; - std::exception_ptr err = nullptr; - task->start([&](ThreadSafeReference r, std::exception_ptr e) { - ref = std::move(r); - err = e; - finished = true; - }); + REQUIRE(!streaming_entries.empty()); + REQUIRE(!non_streaming_entries.empty()); + REQUIRE_THAT(non_streaming_entries, ProgressIncreasesMatcher{}); + REQUIRE_THAT(streaming_entries, ProgressIncreasesMatcher{ProgressIncreasesMatcher::ByteCountOnly}); - util::EventLoop::main().run_until([&] { - return finished.load(); - }); + setup.add_objects(realm, 5); + wait_for_upload(*realm); - CHECK_FALSE(err); - REQUIRE(ref); - auto realm_3 = Realm::get_shared_realm(std::move(ref), util::Scheduler::make_dummy()); - VERIFY_REALM(realm_1, realm_3, expected_count); - realm_3.reset(); - - VERIFY_PROGRESS_CONSISTENCY_ONE(progress, progress_index, 1, true, false); - VERIFY_PROGRESS_EMPTY(progress, 0, progress_index); // previous (from i = 0) should be empty - progress.clear(); - - // add more objects through realm_1 and reopen existing realm on second iteration - if (i == 0) { - expected_count = setup.add_objects(realm_1); - add_callbacks(realm_1, progress_1); - wait_for_sync(realm_1); - VERIFY_PROGRESS_EMPTY(progress_1, 0, 2); - VERIFY_PROGRESS_CONSISTENCY(progress_1, 2, 8, false); - progress_1.clear(); - } - } - } + streaming_entries = streaming_progress->wait_for_full_sync(); + REQUIRE_THAT(streaming_entries, ProgressIncreasesMatcher{ProgressIncreasesMatcher::ByteCountOnly}); + REQUIRE(non_streaming_progress->empty()); } + #endif diff --git a/test/test_sync.cpp b/test/test_sync.cpp index fcf0810d309..6f23c2e79b0 100644 --- a/test/test_sync.cpp +++ b/test/test_sync.cpp @@ -3004,7 +3004,7 @@ TEST(Sync_UploadDownloadProgress_1) Session session = fixture.make_session(db, "/test"); auto progress_handler = [&](uint_fast64_t downloaded, uint_fast64_t downloadable, uint_fast64_t uploaded, - uint_fast64_t uploadable, uint_fast64_t snapshot, double, double) { + uint_fast64_t uploadable, uint_fast64_t snapshot, double, double, int64_t) { downloaded_bytes = downloaded; downloadable_bytes = downloadable; uploaded_bytes = uploaded; @@ -3074,7 +3074,7 @@ TEST(Sync_UploadDownloadProgress_1) int number_of_handler_calls = 0; auto progress_handler = [&](uint_fast64_t downloaded, uint_fast64_t downloadable, uint_fast64_t uploaded, - uint_fast64_t uploadable, uint_fast64_t snapshot, double, double) { + uint_fast64_t uploadable, uint_fast64_t snapshot, double, double, int64_t) { CHECK_EQUAL(downloaded, downloaded_bytes); CHECK_EQUAL(downloadable, downloaded_bytes); CHECK_EQUAL(uploaded, uploaded_bytes); @@ -3129,7 +3129,7 @@ TEST(Sync_UploadDownloadProgress_2) auto progress_handler_1 = [&](uint_fast64_t downloaded_bytes, uint_fast64_t downloadable_bytes, uint_fast64_t uploaded_bytes, uint_fast64_t uploadable_bytes, - uint_fast64_t snapshot_version, double, double) { + uint_fast64_t snapshot_version, double, double, int64_t) { downloaded_bytes_1 = downloaded_bytes; downloadable_bytes_1 = downloadable_bytes; uploaded_bytes_1 = uploaded_bytes; @@ -3147,7 +3147,7 @@ TEST(Sync_UploadDownloadProgress_2) auto progress_handler_2 = [&](uint_fast64_t downloaded_bytes, uint_fast64_t downloadable_bytes, uint_fast64_t uploaded_bytes, uint_fast64_t uploadable_bytes, - uint_fast64_t snapshot_version, double, double) { + uint_fast64_t snapshot_version, double, double, int64_t) { downloaded_bytes_2 = downloaded_bytes; downloadable_bytes_2 = downloadable_bytes; uploaded_bytes_2 = uploaded_bytes; @@ -3353,7 +3353,7 @@ TEST(Sync_UploadDownloadProgress_3) auto progress_handler = [&, entry = int(0), promise = util::CopyablePromiseHolder(std::move(signal_pf.promise))]( uint_fast64_t downloaded_bytes, uint_fast64_t downloadable_bytes, uint_fast64_t uploaded_bytes, uint_fast64_t uploadable_bytes, - uint_fast64_t snapshot_version, double, double) mutable { + uint_fast64_t snapshot_version, double, double, int64_t) mutable { downloaded_bytes_1 = downloaded_bytes; downloadable_bytes_1 = downloadable_bytes; uploaded_bytes_1 = uploaded_bytes; @@ -3465,7 +3465,7 @@ TEST(Sync_UploadDownloadProgress_4) auto progress_handler_1 = [&](uint_fast64_t downloaded_bytes, uint_fast64_t downloadable_bytes, uint_fast64_t uploaded_bytes, uint_fast64_t uploadable_bytes, - uint_fast64_t snapshot_version, double, double) { + uint_fast64_t snapshot_version, double, double, int64_t) { CHECK_EQUAL(downloaded_bytes, 0); CHECK_EQUAL(downloadable_bytes, 0); CHECK_NOT_EQUAL(uploadable_bytes, 0); @@ -3511,7 +3511,7 @@ TEST(Sync_UploadDownloadProgress_4) auto progress_handler_2 = [&](uint_fast64_t downloaded_bytes, uint_fast64_t downloadable_bytes, uint_fast64_t uploaded_bytes, uint_fast64_t uploadable_bytes, - uint_fast64_t snapshot_version, double, double) { + uint_fast64_t snapshot_version, double, double, int64_t) { CHECK_EQUAL(uploaded_bytes, 0); CHECK_EQUAL(uploadable_bytes, 0); @@ -3565,7 +3565,7 @@ TEST(Sync_UploadDownloadProgress_5) auto progress_handler = [&](uint_fast64_t downloaded_bytes, uint_fast64_t downloadable_bytes, uint_fast64_t uploaded_bytes, uint_fast64_t uploadable_bytes, - uint_fast64_t snapshot_version, double, double) mutable { + uint_fast64_t snapshot_version, double, double, int64_t) mutable { CHECK_EQUAL(downloaded_bytes, 0); CHECK_EQUAL(downloadable_bytes, 0); CHECK_EQUAL(uploaded_bytes, 0); @@ -3644,7 +3644,7 @@ TEST(Sync_UploadDownloadProgress_6) auto progress_handler = [&](uint_fast64_t downloaded_bytes, uint_fast64_t downloadable_bytes, uint_fast64_t uploaded_bytes, uint_fast64_t uploadable_bytes, - uint_fast64_t snapshot_version, double, double) { + uint_fast64_t snapshot_version, double, double, int64_t) { CHECK_EQUAL(downloaded_bytes, 0); CHECK_EQUAL(downloadable_bytes, 0); CHECK_EQUAL(uploaded_bytes, 0); @@ -3742,7 +3742,7 @@ TEST(Sync_UploadProgress_EmptyCommits) std::atomic entry = 0; session.set_progress_handler( - [&](uint_fast64_t, uint_fast64_t, uint_fast64_t, uint_fast64_t, uint_fast64_t, double, double) { + [&](uint_fast64_t, uint_fast64_t, uint_fast64_t, uint_fast64_t, uint_fast64_t, double, double, int64_t) { ++entry; }); session.bind(); @@ -4040,7 +4040,7 @@ TEST_IF(Sync_MergeLargeBinary, !(REALM_ARCHITECTURE_X86_32)) auto progress_handler_1 = [&](std::uint_fast64_t downloaded_bytes, std::uint_fast64_t downloadable_bytes, std::uint_fast64_t uploaded_bytes, std::uint_fast64_t uploadable_bytes, - std::uint_fast64_t, double, double) { + std::uint_fast64_t, double, double, int64_t) { downloaded_bytes_1 = downloaded_bytes; downloadable_bytes_1 = downloadable_bytes; uploaded_bytes_1 = uploaded_bytes; @@ -4054,7 +4054,7 @@ TEST_IF(Sync_MergeLargeBinary, !(REALM_ARCHITECTURE_X86_32)) auto progress_handler_2 = [&](uint_fast64_t downloaded_bytes, uint_fast64_t downloadable_bytes, uint_fast64_t uploaded_bytes, uint_fast64_t uploadable_bytes, uint_fast64_t, double, - double) { + double, int64_t) { downloaded_bytes_2 = downloaded_bytes; downloadable_bytes_2 = downloadable_bytes; uploaded_bytes_2 = uploaded_bytes; @@ -4194,7 +4194,7 @@ TEST(Sync_MergeLargeBinaryReducedMemory) auto progress_handler_1 = [&](uint_fast64_t downloaded_bytes, uint_fast64_t downloadable_bytes, uint_fast64_t uploaded_bytes, uint_fast64_t uploadable_bytes, - uint_fast64_t /* snapshot_version */, double, double) { + uint_fast64_t /* snapshot_version */, double, double, int64_t) { downloaded_bytes_1 = downloaded_bytes; downloadable_bytes_1 = downloadable_bytes; uploaded_bytes_1 = uploaded_bytes; @@ -4208,7 +4208,7 @@ TEST(Sync_MergeLargeBinaryReducedMemory) auto progress_handler_2 = [&](uint_fast64_t downloaded_bytes, uint_fast64_t downloadable_bytes, uint_fast64_t uploaded_bytes, uint_fast64_t uploadable_bytes, - uint_fast64_t /* snapshot_version */, double, double) { + uint_fast64_t /* snapshot_version */, double, double, int64_t) { downloaded_bytes_2 = downloaded_bytes; downloadable_bytes_2 = downloadable_bytes; uploaded_bytes_2 = uploaded_bytes; @@ -4629,7 +4629,7 @@ TEST(Sync_BatchedUploadMessages) auto progress_handler = [&](uint_fast64_t downloaded_bytes, uint_fast64_t downloadable_bytes, uint_fast64_t uploaded_bytes, uint_fast64_t uploadable_bytes, - uint_fast64_t snapshot_version, double, double) { + uint_fast64_t snapshot_version, double, double, int64_t) { CHECK_GREATER(uploadable_bytes, 1000); // This is the important check. If the changesets were not batched, @@ -4681,7 +4681,7 @@ TEST(Sync_UploadLogCompactionEnabled) auto progress_handler = [&](uint_fast64_t downloaded_bytes, uint_fast64_t downloadable_bytes, uint_fast64_t uploaded_bytes, uint_fast64_t uploadable_bytes, - uint_fast64_t snapshot_version, double, double) { + uint_fast64_t snapshot_version, double, double, int64_t) { CHECK_EQUAL(downloaded_bytes, downloadable_bytes); CHECK_EQUAL(0, uploaded_bytes); CHECK_EQUAL(0, uploadable_bytes); @@ -4739,7 +4739,7 @@ TEST(Sync_UploadLogCompactionDisabled) auto progress_handler = [&](std::uint_fast64_t downloaded_bytes, std::uint_fast64_t downloadable_bytes, std::uint_fast64_t uploaded_bytes, std::uint_fast64_t uploadable_bytes, - std::uint_fast64_t snapshot_version, double, double) { + std::uint_fast64_t snapshot_version, double, double, int64_t) { CHECK_EQUAL(downloaded_bytes, downloadable_bytes); CHECK_EQUAL(0, uploaded_bytes); CHECK_EQUAL(0, uploadable_bytes);