diff --git a/CHANGELOG.md b/CHANGELOG.md index f5f82426498..2342a564f8e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ ### Fixed * SyncManager had some inconsistent locking which could result in data races and/or deadlocks, mostly in ways that would never be hit outside of tests doing very strange things (since v10.0.0). * Reduce the peak memory usage of changeset uploading by eliminating an extra copy of each changeset which was held in memory. +* Streaming download notifiers reported incorrect values for transferrable bytes (since 11.5.2). ### Breaking changes * None. diff --git a/src/realm/object-store/sync/sync_session.cpp b/src/realm/object-store/sync/sync_session.cpp index 383a378b6cb..bfdf9e3f323 100644 --- a/src/realm/object-store/sync/sync_session.cpp +++ b/src/realm/object-store/sync/sync_session.cpp @@ -977,40 +977,24 @@ void SyncProgressNotifier::set_local_version(uint64_t snapshot_version) std::function SyncProgressNotifier::NotifierPackage::create_invocation(Progress const& current_progress, bool& is_expired) { - uint64_t transferrable; - if (is_streaming) { - transferrable = is_download ? current_progress.downloadable : current_progress.uploadable; - } - else if (captured_transferrable) { - if (is_download && (current_progress.downloadable == 0)) { - captured_transferrable = 0; - } - transferrable = *captured_transferrable; - } - else { - if (is_download) { - // In protocol versions 25 and earlier, downloadable_bytes was the total - // size of the history. From protocol version 26, downloadable_bytes - // represent the non-downloaded bytes on the server. Since the user supplied - // progress handler interprets downloadable_bytes as the total size of - // downloadable bytes, this number must be calculated. We could change the - // meaning of downloadable_bytes for the progress handler, but that would be - // a breaking change. Note that protocol version 25 (and earlier) is no - // longer supported by clients. - captured_transferrable = current_progress.downloaded + current_progress.downloadable; - } - else { - // If the sync client has not yet processed all of the local - // transactions then the uploadable data is incorrect and we should - // not invoke the callback - if (snapshot_version > current_progress.snapshot_version) - return [] {}; - captured_transferrable = current_progress.uploadable; - } + uint64_t transferred = is_download ? current_progress.downloaded : current_progress.uploaded; + uint64_t transferrable = is_download ? current_progress.downloadable : current_progress.uploadable; + if (!is_streaming) { + // If the sync client has not yet processed all of the local + // transactions then the uploadable data is incorrect and we should + // not invoke the callback + if (!is_download && snapshot_version > current_progress.snapshot_version) + return [] {}; + + // The initial download size we get from the server is the uncompacted + // size, and so the download may complete before we actually receive + // that much data. When that happens, transferrable will drop and we + // need to use the new value instead of the captured one. + if (!captured_transferrable || *captured_transferrable > transferrable) + captured_transferrable = transferrable; transferrable = *captured_transferrable; } - uint64_t transferred = is_download ? current_progress.downloaded : current_progress.uploaded; // A notifier is expired if at least as many bytes have been transferred // as were originally considered transferrable. is_expired = !is_streaming && transferred >= transferrable; diff --git a/src/realm/sync/client.cpp b/src/realm/sync/client.cpp index e1735f31264..1c675bfcad0 100644 --- a/src/realm/sync/client.cpp +++ b/src/realm/sync/client.cpp @@ -1226,17 +1226,22 @@ void SessionWrapper::report_progress() ClientHistory::get_upload_download_bytes(m_db.get(), downloaded_bytes, downloadable_bytes, uploaded_bytes, uploadable_bytes, snapshot_version); + // uploadable_bytes is uploaded + remaining to upload, while downloadable_bytes + // is only the remaining to download. This is confusing, so make them use + // the same units. + std::uint_fast64_t total_bytes = downloaded_bytes + downloadable_bytes; + m_sess->logger.debug("Progress handler called, downloaded = %1, " "downloadable(total) = %2, uploaded = %3, " "uploadable = %4, reliable_download_progress = %5, " "snapshot version = %6", - downloaded_bytes, downloadable_bytes, uploaded_bytes, uploadable_bytes, + downloaded_bytes, total_bytes, uploaded_bytes, uploadable_bytes, m_reliable_download_progress, snapshot_version); - // Ignore progress messages from before we first receive a DOWNLOAD message. - // FIXME: This could be a bool instead of std::uint_fast64_t - std::uint_fast64_t download_version = (m_reliable_download_progress ? 1 : 0); - m_progress_handler(downloaded_bytes, downloadable_bytes, uploaded_bytes, uploadable_bytes, download_version, + // FIXME: Why is this boolean status communicated to the application as + // a 64-bit integer? Also, the name `progress_version` is confusing. + std::uint_fast64_t progress_version = (m_reliable_download_progress ? 1 : 0); + m_progress_handler(downloaded_bytes, total_bytes, uploaded_bytes, uploadable_bytes, progress_version, snapshot_version); } diff --git a/src/realm/sync/client.hpp b/src/realm/sync/client.hpp index 70f92bc40df..9a95f70d9fa 100644 --- a/src/realm/sync/client.hpp +++ b/src/realm/sync/client.hpp @@ -167,7 +167,7 @@ class Session { using SyncTransactCallback = void(VersionID old_version, VersionID new_version); using ProgressHandler = void(std::uint_fast64_t downloaded_bytes, std::uint_fast64_t downloadable_bytes, std::uint_fast64_t uploaded_bytes, std::uint_fast64_t uploadable_bytes, - std::uint_fast64_t download_version, std::uint_fast64_t snapshot_version); + std::uint_fast64_t progress_version, std::uint_fast64_t snapshot_version); using WaitOperCompletionHandler = std::function; using SSLVerifyCallback = bool(const std::string& server_address, port_type server_port, const char* pem_data, size_t pem_size, int preverify_ok, int depth); diff --git a/test/object-store/sync/session/progress_notifications.cpp b/test/object-store/sync/session/progress_notifications.cpp index 5148327b9a9..5f5889e6958 100644 --- a/test/object-store/sync/session/progress_notifications.cpp +++ b/test/object-store/sync/session/progress_notifications.cpp @@ -331,7 +331,7 @@ TEST_CASE("progress notification", "[sync]") { // Prime the progress updater current_transferred = 60; current_transferrable = 501; - uint64_t original_transferrable = current_transferrable; + const uint64_t original_transferrable = current_transferrable; progress.update(current_transferred, current_transferrable, 21, 26, 1, 1); progress.register_callback( @@ -346,7 +346,6 @@ TEST_CASE("progress notification", "[sync]") { // Now manually call the notifier handler a few times. callback_was_called = false; - original_transferrable = current_transferred + current_transferrable; current_transferred = 66; current_transferrable = 582; progress.update(current_transferred, current_transferrable, 25, 26, 1, 1); @@ -394,7 +393,7 @@ TEST_CASE("progress notification", "[sync]") { // Next we get a DOWNLOAD message telling us there's more to download progress.update(current_transferred, current_transferrable, 0, 0, 1, 1); REQUIRE(callback_was_called); - REQUIRE(transferrable == (current_transferred + current_transferrable)); + REQUIRE(current_transferrable == transferrable); REQUIRE(current_transferred == transferred); current_transferred = 200; @@ -414,7 +413,7 @@ TEST_CASE("progress notification", "[sync]") { NotifierType::download, false); REQUIRE(callback_was_called); - REQUIRE(transferrable == (current_transferred + current_transferrable)); + REQUIRE(current_transferrable == transferrable); REQUIRE(current_transferred == transferred); } @@ -462,7 +461,7 @@ TEST_CASE("progress notification", "[sync]") { uint64_t current_downloaded = 68; uint64_t current_downloadable = 182; const uint64_t original_uploadable = current_uploadable; - uint64_t original_downloadable = current_downloadable; + const uint64_t original_downloadable = current_downloadable; progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, 1); progress.register_callback( @@ -490,7 +489,6 @@ TEST_CASE("progress notification", "[sync]") { // Now manually call the notifier handler a few times. callback_was_called = false; callback_was_called_2 = false; - original_downloadable = current_downloaded + current_downloadable; current_uploaded = 36; current_uploadable = 310; current_downloaded = 171; @@ -524,12 +522,12 @@ TEST_CASE("progress notification", "[sync]") { current_uploaded = 218; current_uploadable = 310; current_downloaded = 182; - current_downloadable = 0; + current_downloadable = 196; progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, 1); CHECK(!callback_was_called); CHECK(callback_was_called_2); CHECK(downloaded == current_downloaded); - CHECK(downloadable == 0); + CHECK(downloadable == original_downloadable); // Fourth callback, last one for the download notifier callback_was_called_2 = false; @@ -546,9 +544,9 @@ TEST_CASE("progress notification", "[sync]") { // Prime the progress updater uint64_t current_uploaded = 16; uint64_t current_uploadable = 201; - uint64_t current_downloaded = 0; - uint64_t current_downloadable = 250; - uint64_t original_downloadable = current_downloadable; + uint64_t current_downloaded = 68; + uint64_t current_downloadable = 182; + const uint64_t original_downloadable = current_downloadable; progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, 1); progress.register_callback( @@ -561,12 +559,11 @@ TEST_CASE("progress notification", "[sync]") { REQUIRE(callback_was_called); // Now manually call the notifier handler a few times. - original_downloadable = current_downloaded + current_downloadable; callback_was_called = false; current_uploaded = 36; current_uploadable = 310; current_downloaded = 171; - current_downloadable = 79; + current_downloadable = 185; progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, 1); CHECK(callback_was_called); CHECK(transferred == current_downloaded); @@ -576,6 +573,7 @@ TEST_CASE("progress notification", "[sync]") { bool callback_was_called_2 = false; uint64_t downloaded = 0; uint64_t downloadable = 0; + const uint64_t original_downloadable_2 = current_downloadable; progress.register_callback( [&](auto xferred, auto xferable) { downloaded = xferred; @@ -591,27 +589,91 @@ TEST_CASE("progress notification", "[sync]") { callback_was_called_2 = false; current_uploaded = 36; current_uploadable = 310; - current_downloaded = 250; - current_downloadable = 50; + current_downloaded = 182; + current_downloadable = 190; progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, 1); CHECK(callback_was_called); CHECK(transferred == current_downloaded); - CHECK(transferrable == 250); + CHECK(transferrable == original_downloadable); CHECK(callback_was_called_2); CHECK(downloaded == current_downloaded); - CHECK(downloadable == 250); + CHECK(downloadable == original_downloadable_2); // Third callback, last one for second notifier callback_was_called = false; callback_was_called_2 = false; current_uploaded = 36; current_uploadable = 310; - current_downloaded = 250; - current_downloadable = 0; + current_downloaded = 189; + current_downloadable = 250; progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, 1); CHECK(!callback_was_called); - CHECK(!callback_was_called_2); + CHECK(callback_was_called_2); CHECK(downloaded == current_downloaded); + CHECK(downloadable == original_downloadable_2); + + // Fourth callback + callback_was_called_2 = false; + current_uploaded = 36; + current_uploadable = 310; + current_downloaded = 201; + current_downloadable = 289; + progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, 1); + CHECK(!callback_was_called_2); + } + + SECTION("download notifiers handle transferrable decreasing") { + // Prime the progress updater + current_transferred = 60; + current_transferrable = 501; + const uint64_t original_transferrable = current_transferrable; + progress.update(current_transferred, current_transferrable, 21, 26, 1, 1); + + progress.register_callback( + [&](auto xferred, auto xferable) { + transferred = xferred; + transferrable = xferable; + callback_was_called = true; + }, + NotifierType::download, false); + // Wait for the initial callback. + REQUIRE(callback_was_called); + + // Download some data but also drop the total. transferrable should + // update because it decreased. + callback_was_called = false; + current_transferred = 160; + current_transferrable = 451; + progress.update(current_transferred, current_transferrable, 25, 26, 1, 1); + CHECK(callback_was_called); + CHECK(transferred == current_transferred); + CHECK(transferrable == current_transferrable); + + // Increasing current_transferrable should not increase transferrable + const uint64_t previous_transferrable = current_transferrable; + callback_was_called = false; + current_transferrable = 1000; + progress.update(current_transferred, current_transferrable, 68, 191, 1, 1); + CHECK(callback_was_called); + CHECK(transferred == current_transferred); + CHECK(transferrable == previous_transferrable); + + // Transferrable dropping to be equal to transferred should notify + // and then expire the notifier + callback_was_called = false; + current_transferred = 200; + current_transferrable = current_transferred; + progress.update(current_transferred, current_transferrable, 191, 192, 1, 1); + CHECK(callback_was_called); + CHECK(transferred == current_transferred); + CHECK(transferrable == current_transferred); + + // The notifier should be unregistered at this point, and not fire. + callback_was_called = false; + current_transferred = original_transferrable + 250; + current_transferrable = 1228; + progress.update(current_transferred, current_transferrable, 199, 591, 1, 1); + CHECK(!callback_was_called); } } } diff --git a/test/test_sync.cpp b/test/test_sync.cpp index 0b5fe8a9aa5..06971d5948d 100644 --- a/test/test_sync.cpp +++ b/test/test_sync.cpp @@ -4214,6 +4214,7 @@ TEST(Sync_UploadDownloadProgress_2) CHECK_EQUAL(downloadable_bytes_1, 0); CHECK_NOT_EQUAL(downloaded_bytes_2, 0); + CHECK_NOT_EQUAL(downloadable_bytes_2, 0); CHECK_NOT_EQUAL(uploaded_bytes_1, 0); CHECK_NOT_EQUAL(uploadable_bytes_1, 0); @@ -4245,8 +4246,10 @@ TEST(Sync_UploadDownloadProgress_2) session_2.wait_for_download_complete_or_client_stopped(); CHECK_NOT_EQUAL(downloaded_bytes_1, 0); + CHECK_NOT_EQUAL(downloadable_bytes_1, 0); CHECK_NOT_EQUAL(downloaded_bytes_2, 0); + CHECK_NOT_EQUAL(downloadable_bytes_2, 0); CHECK_NOT_EQUAL(uploaded_bytes_1, 0); CHECK_NOT_EQUAL(uploadable_bytes_1, 0); @@ -4272,12 +4275,12 @@ TEST(Sync_UploadDownloadProgress_2) session_1.wait_for_download_complete_or_client_stopped(); session_2.wait_for_download_complete_or_client_stopped(); - CHECK_EQUAL(downloadable_bytes_1, 0); + CHECK_EQUAL(downloaded_bytes_1, downloadable_bytes_1); // uncertainty due to merge CHECK_NOT_EQUAL(downloaded_bytes_1, 0); - CHECK_EQUAL(downloadable_bytes_2, 0); + CHECK_EQUAL(downloaded_bytes_2, downloadable_bytes_2); CHECK_NOT_EQUAL(downloaded_bytes_2, 0); CHECK_NOT_EQUAL(uploaded_bytes_1, 0); @@ -4558,7 +4561,7 @@ TEST(Sync_UploadDownloadProgress_4) else if (entry_2 == 2) { CHECK_GREATER(progress_version, 0); CHECK_NOT_EQUAL(downloaded_bytes, 0); - CHECK_EQUAL(downloadable_bytes, 0); + CHECK_NOT_EQUAL(downloadable_bytes, 0); CHECK_EQUAL(snapshot_version, 4); } @@ -4999,11 +5002,11 @@ TEST_IF(Sync_MergeLargeBinary, !(REALM_ARCHITECTURE_X86_32)) CHECK((cb.size() == binary_sizes[3] && cb[0] == 'd') || (cb.size() == binary_sizes[7] && cb[0] == 'h')); } - CHECK_EQUAL(downloadable_bytes_1, 0); + CHECK_EQUAL(downloadable_bytes_1, downloaded_bytes_1); CHECK_EQUAL(uploadable_bytes_1, uploaded_bytes_1); CHECK_NOT_EQUAL(uploaded_bytes_1, 0); - CHECK_EQUAL(downloadable_bytes_2, 0); + CHECK_EQUAL(downloadable_bytes_2, downloaded_bytes_2); CHECK_EQUAL(uploadable_bytes_2, uploaded_bytes_2); CHECK_NOT_EQUAL(uploaded_bytes_2, 0); @@ -5153,11 +5156,11 @@ TEST(Sync_MergeLargeBinaryReducedMemory) CHECK((cb.size() == binary_sizes[3] && cb[0] == 'd') || (cb.size() == binary_sizes[7] && cb[0] == 'h')); } - CHECK_EQUAL(downloadable_bytes_1, 0); + CHECK_EQUAL(downloadable_bytes_1, downloaded_bytes_1); CHECK_EQUAL(uploadable_bytes_1, uploaded_bytes_1); CHECK_NOT_EQUAL(uploaded_bytes_1, 0); - CHECK_EQUAL(downloadable_bytes_2, 0); + CHECK_EQUAL(downloadable_bytes_2, downloaded_bytes_2); CHECK_EQUAL(uploadable_bytes_2, uploaded_bytes_2); CHECK_NOT_EQUAL(uploaded_bytes_2, 0); @@ -5488,10 +5491,15 @@ TEST(Sync_UploadLogCompactionEnabled) fixture.bind_session(session_1, "/test"); session_1.wait_for_upload_complete_or_client_stopped(); - auto progress_handler = [&](uint_fast64_t, uint_fast64_t, uint_fast64_t uploaded_bytes, - uint_fast64_t uploadable_bytes, uint_fast64_t, uint_fast64_t) { + auto progress_handler = [&](uint_fast64_t downloaded_bytes, uint_fast64_t downloadable_bytes, + uint_fast64_t uploaded_bytes, uint_fast64_t uploadable_bytes, + uint_fast64_t progress_version, uint_fast64_t snapshot_version) { + CHECK_EQUAL(downloaded_bytes, downloadable_bytes); CHECK_EQUAL(0, uploaded_bytes); CHECK_EQUAL(0, uploadable_bytes); + static_cast(snapshot_version); + if (progress_version > 0) + CHECK_NOT_EQUAL(downloadable_bytes, 0); }; session_2.set_progress_handler(progress_handler); @@ -5542,10 +5550,15 @@ TEST(Sync_UploadLogCompactionDisabled) Session session_1 = fixture.make_bound_session(db_1, "/test"); session_1.wait_for_upload_complete_or_client_stopped(); - auto progress_handler = [&](std::uint_fast64_t, std::uint_fast64_t, std::uint_fast64_t uploaded_bytes, - std::uint_fast64_t uploadable_bytes, std::uint_fast64_t, std::uint_fast64_t) { + auto progress_handler = [&](std::uint_fast64_t downloaded_bytes, std::uint_fast64_t downloadable_bytes, + std::uint_fast64_t uploaded_bytes, std::uint_fast64_t uploadable_bytes, + std::uint_fast64_t progress_version, std::uint_fast64_t snapshot_version) { + CHECK_EQUAL(downloaded_bytes, downloadable_bytes); CHECK_EQUAL(0, uploaded_bytes); CHECK_EQUAL(0, uploadable_bytes); + static_cast(snapshot_version); + if (progress_version > 0) + CHECK_NOT_EQUAL(0, downloadable_bytes); }; Session session_2 = fixture.make_session(db_2);