Skip to content

Commit

Permalink
Merge pull request #5008 from realm/tg/streaming-notifier
Browse files Browse the repository at this point in the history
Fix streaming download notifiers
  • Loading branch information
tgoyne authored Nov 1, 2021
2 parents 7bc8c79 + 094db34 commit 74f26c3
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 68 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
### Fixed
* SyncManager had some inconsistent locking which could result in data races and/or deadlocks, mostly in ways that would never be hit outside of tests doing very strange things (since v10.0.0).
* Reduce the peak memory usage of changeset uploading by eliminating an extra copy of each changeset which was held in memory.
* Streaming download notifiers reported incorrect values for transferrable bytes (since 11.5.2).

### Breaking changes
* None.
Expand Down
46 changes: 15 additions & 31 deletions src/realm/object-store/sync/sync_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -977,40 +977,24 @@ void SyncProgressNotifier::set_local_version(uint64_t snapshot_version)
std::function<void()> SyncProgressNotifier::NotifierPackage::create_invocation(Progress const& current_progress,
bool& is_expired)
{
uint64_t transferrable;
if (is_streaming) {
transferrable = is_download ? current_progress.downloadable : current_progress.uploadable;
}
else if (captured_transferrable) {
if (is_download && (current_progress.downloadable == 0)) {
captured_transferrable = 0;
}
transferrable = *captured_transferrable;
}
else {
if (is_download) {
// In protocol versions 25 and earlier, downloadable_bytes was the total
// size of the history. From protocol version 26, downloadable_bytes
// represent the non-downloaded bytes on the server. Since the user supplied
// progress handler interprets downloadable_bytes as the total size of
// downloadable bytes, this number must be calculated. We could change the
// meaning of downloadable_bytes for the progress handler, but that would be
// a breaking change. Note that protocol version 25 (and earlier) is no
// longer supported by clients.
captured_transferrable = current_progress.downloaded + current_progress.downloadable;
}
else {
// If the sync client has not yet processed all of the local
// transactions then the uploadable data is incorrect and we should
// not invoke the callback
if (snapshot_version > current_progress.snapshot_version)
return [] {};
captured_transferrable = current_progress.uploadable;
}
uint64_t transferred = is_download ? current_progress.downloaded : current_progress.uploaded;
uint64_t transferrable = is_download ? current_progress.downloadable : current_progress.uploadable;
if (!is_streaming) {
// If the sync client has not yet processed all of the local
// transactions then the uploadable data is incorrect and we should
// not invoke the callback
if (!is_download && snapshot_version > current_progress.snapshot_version)
return [] {};

// The initial download size we get from the server is the uncompacted
// size, and so the download may complete before we actually receive
// that much data. When that happens, transferrable will drop and we
// need to use the new value instead of the captured one.
if (!captured_transferrable || *captured_transferrable > transferrable)
captured_transferrable = transferrable;
transferrable = *captured_transferrable;
}

uint64_t transferred = is_download ? current_progress.downloaded : current_progress.uploaded;
// A notifier is expired if at least as many bytes have been transferred
// as were originally considered transferrable.
is_expired = !is_streaming && transferred >= transferrable;
Expand Down
15 changes: 10 additions & 5 deletions src/realm/sync/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1226,17 +1226,22 @@ void SessionWrapper::report_progress()
ClientHistory::get_upload_download_bytes(m_db.get(), downloaded_bytes, downloadable_bytes, uploaded_bytes,
uploadable_bytes, snapshot_version);

// uploadable_bytes is uploaded + remaining to upload, while downloadable_bytes
// is only the remaining to download. This is confusing, so make them use
// the same units.
std::uint_fast64_t total_bytes = downloaded_bytes + downloadable_bytes;

m_sess->logger.debug("Progress handler called, downloaded = %1, "
"downloadable(total) = %2, uploaded = %3, "
"uploadable = %4, reliable_download_progress = %5, "
"snapshot version = %6",
downloaded_bytes, downloadable_bytes, uploaded_bytes, uploadable_bytes,
downloaded_bytes, total_bytes, uploaded_bytes, uploadable_bytes,
m_reliable_download_progress, snapshot_version);

// Ignore progress messages from before we first receive a DOWNLOAD message.
// FIXME: This could be a bool instead of std::uint_fast64_t
std::uint_fast64_t download_version = (m_reliable_download_progress ? 1 : 0);
m_progress_handler(downloaded_bytes, downloadable_bytes, uploaded_bytes, uploadable_bytes, download_version,
// FIXME: Why is this boolean status communicated to the application as
// a 64-bit integer? Also, the name `progress_version` is confusing.
std::uint_fast64_t progress_version = (m_reliable_download_progress ? 1 : 0);
m_progress_handler(downloaded_bytes, total_bytes, uploaded_bytes, uploadable_bytes, progress_version,
snapshot_version);
}

Expand Down
2 changes: 1 addition & 1 deletion src/realm/sync/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ class Session {
using SyncTransactCallback = void(VersionID old_version, VersionID new_version);
using ProgressHandler = void(std::uint_fast64_t downloaded_bytes, std::uint_fast64_t downloadable_bytes,
std::uint_fast64_t uploaded_bytes, std::uint_fast64_t uploadable_bytes,
std::uint_fast64_t download_version, std::uint_fast64_t snapshot_version);
std::uint_fast64_t progress_version, std::uint_fast64_t snapshot_version);
using WaitOperCompletionHandler = std::function<void(std::error_code)>;
using SSLVerifyCallback = bool(const std::string& server_address, port_type server_port, const char* pem_data,
size_t pem_size, int preverify_ok, int depth);
Expand Down
102 changes: 82 additions & 20 deletions test/object-store/sync/session/progress_notifications.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ TEST_CASE("progress notification", "[sync]") {
// Prime the progress updater
current_transferred = 60;
current_transferrable = 501;
uint64_t original_transferrable = current_transferrable;
const uint64_t original_transferrable = current_transferrable;
progress.update(current_transferred, current_transferrable, 21, 26, 1, 1);

progress.register_callback(
Expand All @@ -346,7 +346,6 @@ TEST_CASE("progress notification", "[sync]") {

// Now manually call the notifier handler a few times.
callback_was_called = false;
original_transferrable = current_transferred + current_transferrable;
current_transferred = 66;
current_transferrable = 582;
progress.update(current_transferred, current_transferrable, 25, 26, 1, 1);
Expand Down Expand Up @@ -394,7 +393,7 @@ TEST_CASE("progress notification", "[sync]") {
// Next we get a DOWNLOAD message telling us there's more to download
progress.update(current_transferred, current_transferrable, 0, 0, 1, 1);
REQUIRE(callback_was_called);
REQUIRE(transferrable == (current_transferred + current_transferrable));
REQUIRE(current_transferrable == transferrable);
REQUIRE(current_transferred == transferred);

current_transferred = 200;
Expand All @@ -414,7 +413,7 @@ TEST_CASE("progress notification", "[sync]") {
NotifierType::download, false);

REQUIRE(callback_was_called);
REQUIRE(transferrable == (current_transferred + current_transferrable));
REQUIRE(current_transferrable == transferrable);
REQUIRE(current_transferred == transferred);
}

Expand Down Expand Up @@ -462,7 +461,7 @@ TEST_CASE("progress notification", "[sync]") {
uint64_t current_downloaded = 68;
uint64_t current_downloadable = 182;
const uint64_t original_uploadable = current_uploadable;
uint64_t original_downloadable = current_downloadable;
const uint64_t original_downloadable = current_downloadable;
progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, 1);

progress.register_callback(
Expand Down Expand Up @@ -490,7 +489,6 @@ TEST_CASE("progress notification", "[sync]") {
// Now manually call the notifier handler a few times.
callback_was_called = false;
callback_was_called_2 = false;
original_downloadable = current_downloaded + current_downloadable;
current_uploaded = 36;
current_uploadable = 310;
current_downloaded = 171;
Expand Down Expand Up @@ -524,12 +522,12 @@ TEST_CASE("progress notification", "[sync]") {
current_uploaded = 218;
current_uploadable = 310;
current_downloaded = 182;
current_downloadable = 0;
current_downloadable = 196;
progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, 1);
CHECK(!callback_was_called);
CHECK(callback_was_called_2);
CHECK(downloaded == current_downloaded);
CHECK(downloadable == 0);
CHECK(downloadable == original_downloadable);

// Fourth callback, last one for the download notifier
callback_was_called_2 = false;
Expand All @@ -546,9 +544,9 @@ TEST_CASE("progress notification", "[sync]") {
// Prime the progress updater
uint64_t current_uploaded = 16;
uint64_t current_uploadable = 201;
uint64_t current_downloaded = 0;
uint64_t current_downloadable = 250;
uint64_t original_downloadable = current_downloadable;
uint64_t current_downloaded = 68;
uint64_t current_downloadable = 182;
const uint64_t original_downloadable = current_downloadable;
progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, 1);

progress.register_callback(
Expand All @@ -561,12 +559,11 @@ TEST_CASE("progress notification", "[sync]") {
REQUIRE(callback_was_called);

// Now manually call the notifier handler a few times.
original_downloadable = current_downloaded + current_downloadable;
callback_was_called = false;
current_uploaded = 36;
current_uploadable = 310;
current_downloaded = 171;
current_downloadable = 79;
current_downloadable = 185;
progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, 1);
CHECK(callback_was_called);
CHECK(transferred == current_downloaded);
Expand All @@ -576,6 +573,7 @@ TEST_CASE("progress notification", "[sync]") {
bool callback_was_called_2 = false;
uint64_t downloaded = 0;
uint64_t downloadable = 0;
const uint64_t original_downloadable_2 = current_downloadable;
progress.register_callback(
[&](auto xferred, auto xferable) {
downloaded = xferred;
Expand All @@ -591,27 +589,91 @@ TEST_CASE("progress notification", "[sync]") {
callback_was_called_2 = false;
current_uploaded = 36;
current_uploadable = 310;
current_downloaded = 250;
current_downloadable = 50;
current_downloaded = 182;
current_downloadable = 190;
progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, 1);
CHECK(callback_was_called);
CHECK(transferred == current_downloaded);
CHECK(transferrable == 250);
CHECK(transferrable == original_downloadable);
CHECK(callback_was_called_2);
CHECK(downloaded == current_downloaded);
CHECK(downloadable == 250);
CHECK(downloadable == original_downloadable_2);

// Third callback, last one for second notifier
callback_was_called = false;
callback_was_called_2 = false;
current_uploaded = 36;
current_uploadable = 310;
current_downloaded = 250;
current_downloadable = 0;
current_downloaded = 189;
current_downloadable = 250;
progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, 1);
CHECK(!callback_was_called);
CHECK(!callback_was_called_2);
CHECK(callback_was_called_2);
CHECK(downloaded == current_downloaded);
CHECK(downloadable == original_downloadable_2);

// Fourth callback
callback_was_called_2 = false;
current_uploaded = 36;
current_uploadable = 310;
current_downloaded = 201;
current_downloadable = 289;
progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, 1);
CHECK(!callback_was_called_2);
}

SECTION("download notifiers handle transferrable decreasing") {
// Prime the progress updater
current_transferred = 60;
current_transferrable = 501;
const uint64_t original_transferrable = current_transferrable;
progress.update(current_transferred, current_transferrable, 21, 26, 1, 1);

progress.register_callback(
[&](auto xferred, auto xferable) {
transferred = xferred;
transferrable = xferable;
callback_was_called = true;
},
NotifierType::download, false);
// Wait for the initial callback.
REQUIRE(callback_was_called);

// Download some data but also drop the total. transferrable should
// update because it decreased.
callback_was_called = false;
current_transferred = 160;
current_transferrable = 451;
progress.update(current_transferred, current_transferrable, 25, 26, 1, 1);
CHECK(callback_was_called);
CHECK(transferred == current_transferred);
CHECK(transferrable == current_transferrable);

// Increasing current_transferrable should not increase transferrable
const uint64_t previous_transferrable = current_transferrable;
callback_was_called = false;
current_transferrable = 1000;
progress.update(current_transferred, current_transferrable, 68, 191, 1, 1);
CHECK(callback_was_called);
CHECK(transferred == current_transferred);
CHECK(transferrable == previous_transferrable);

// Transferrable dropping to be equal to transferred should notify
// and then expire the notifier
callback_was_called = false;
current_transferred = 200;
current_transferrable = current_transferred;
progress.update(current_transferred, current_transferrable, 191, 192, 1, 1);
CHECK(callback_was_called);
CHECK(transferred == current_transferred);
CHECK(transferrable == current_transferred);

// The notifier should be unregistered at this point, and not fire.
callback_was_called = false;
current_transferred = original_transferrable + 250;
current_transferrable = 1228;
progress.update(current_transferred, current_transferrable, 199, 591, 1, 1);
CHECK(!callback_was_called);
}
}
}
Loading

0 comments on commit 74f26c3

Please sign in to comment.