Skip to content

Commit

Permalink
Handle downloadable_bytes decreasing for non-streaming notifiers
Browse files Browse the repository at this point in the history
  • Loading branch information
tgoyne committed Oct 29, 2021
1 parent 36e1884 commit 0ff0059
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
### Fixed
* SyncManager had some inconsistent locking which could result in data races and/or deadlocks, mostly in ways that would never be hit outside of tests doing very strange things (since v10.0.0).
* Reduce the peak memory usage of changeset uploading by eliminating an extra copy of each changeset which was held in memory.
* Streaming download notifiers reported incorrect values for transferrable bytes (since 11.5.2).

### Breaking changes
* None.
Expand Down
34 changes: 15 additions & 19 deletions src/realm/object-store/sync/sync_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -977,28 +977,24 @@ void SyncProgressNotifier::set_local_version(uint64_t snapshot_version)
std::function<void()> SyncProgressNotifier::NotifierPackage::create_invocation(Progress const& current_progress,
bool& is_expired)
{
uint64_t transferrable;
if (is_streaming) {
transferrable = is_download ? current_progress.downloadable : current_progress.uploadable;
}
else if (captured_transferrable) {
transferrable = *captured_transferrable;
}
else {
if (is_download)
captured_transferrable = current_progress.downloadable;
else {
// If the sync client has not yet processed all of the local
// transactions then the uploadable data is incorrect and we should
// not invoke the callback
if (snapshot_version > current_progress.snapshot_version)
return [] {};
captured_transferrable = current_progress.uploadable;
}
uint64_t transferred = is_download ? current_progress.downloaded : current_progress.uploaded;
uint64_t transferrable = is_download ? current_progress.downloadable : current_progress.uploadable;
if (!is_streaming) {
// If the sync client has not yet processed all of the local
// transactions then the uploadable data is incorrect and we should
// not invoke the callback
if (!is_download && snapshot_version > current_progress.snapshot_version)
return [] {};

// The initial download size we get from the server is an estimate
// and it may decrease once compaction is performed, so we need to
// lower captured_transferrable when that happens. We never want to raise
// it due to new data being added, though.
if (!captured_transferrable || *captured_transferrable > transferrable)
captured_transferrable = transferrable;
transferrable = *captured_transferrable;
}

uint64_t transferred = is_download ? current_progress.downloaded : current_progress.uploaded;
// A notifier is expired if at least as many bytes have been transferred
// as were originally considered transferrable.
is_expired = !is_streaming && transferred >= transferrable;
Expand Down
54 changes: 54 additions & 0 deletions test/object-store/sync/session/progress_notifications.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -621,5 +621,59 @@ TEST_CASE("progress notification", "[sync]") {
progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, 1);
CHECK(!callback_was_called_2);
}

SECTION("download notifiers handle transferrable decreasing") {
// Prime the progress updater
current_transferred = 60;
current_transferrable = 501;
const uint64_t original_transferrable = current_transferrable;
progress.update(current_transferred, current_transferrable, 21, 26, 1, 1);

progress.register_callback(
[&](auto xferred, auto xferable) {
transferred = xferred;
transferrable = xferable;
callback_was_called = true;
},
NotifierType::download, false);
// Wait for the initial callback.
REQUIRE(callback_was_called);

// Download some data but also drop the total. transferrable should
// update because it decreased.
callback_was_called = false;
current_transferred = 160;
current_transferrable = 451;
progress.update(current_transferred, current_transferrable, 25, 26, 1, 1);
CHECK(callback_was_called);
CHECK(transferred == current_transferred);
CHECK(transferrable == current_transferrable);

// Increasing current_transferrable should not increase transferrable
const uint64_t previous_transferrable = current_transferrable;
callback_was_called = false;
current_transferrable = 1000;
progress.update(current_transferred, current_transferrable, 68, 191, 1, 1);
CHECK(callback_was_called);
CHECK(transferred == current_transferred);
CHECK(transferrable == previous_transferrable);

// Transferrable dropping to be equal to transferred should notify
// and then expire the notifier
callback_was_called = false;
current_transferred = 200;
current_transferrable = current_transferred;
progress.update(current_transferred, current_transferrable, 191, 192, 1, 1);
CHECK(callback_was_called);
CHECK(transferred == current_transferred);
CHECK(transferrable == current_transferred);

// The notifier should be unregistered at this point, and not fire.
callback_was_called = false;
current_transferred = original_transferrable + 250;
current_transferrable = 1228;
progress.update(current_transferred, current_transferrable, 199, 591, 1, 1);
CHECK(!callback_was_called);
}
}
}

0 comments on commit 0ff0059

Please sign in to comment.