Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix streaming download notifiers #5008

Merged
merged 3 commits into from
Nov 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
### Fixed
* SyncManager had some inconsistent locking which could result in data races and/or deadlocks, mostly in ways that would never be hit outside of tests doing very strange things (since v10.0.0).
* Reduce the peak memory usage of changeset uploading by eliminating an extra copy of each changeset which was held in memory.
* Streaming download notifiers reported incorrect values for transferrable bytes (since 11.5.2).

### Breaking changes
* None.
Expand Down
46 changes: 15 additions & 31 deletions src/realm/object-store/sync/sync_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -977,40 +977,24 @@ void SyncProgressNotifier::set_local_version(uint64_t snapshot_version)
std::function<void()> SyncProgressNotifier::NotifierPackage::create_invocation(Progress const& current_progress,
bool& is_expired)
{
uint64_t transferrable;
if (is_streaming) {
transferrable = is_download ? current_progress.downloadable : current_progress.uploadable;
}
else if (captured_transferrable) {
if (is_download && (current_progress.downloadable == 0)) {
captured_transferrable = 0;
}
transferrable = *captured_transferrable;
}
else {
if (is_download) {
// In protocol versions 25 and earlier, downloadable_bytes was the total
// size of the history. From protocol version 26, downloadable_bytes
// represent the non-downloaded bytes on the server. Since the user supplied
// progress handler interprets downloadable_bytes as the total size of
// downloadable bytes, this number must be calculated. We could change the
// meaning of downloadable_bytes for the progress handler, but that would be
// a breaking change. Note that protocol version 25 (and earlier) is no
// longer supported by clients.
captured_transferrable = current_progress.downloaded + current_progress.downloadable;
}
else {
// If the sync client has not yet processed all of the local
// transactions then the uploadable data is incorrect and we should
// not invoke the callback
if (snapshot_version > current_progress.snapshot_version)
return [] {};
captured_transferrable = current_progress.uploadable;
}
uint64_t transferred = is_download ? current_progress.downloaded : current_progress.uploaded;
uint64_t transferrable = is_download ? current_progress.downloadable : current_progress.uploadable;
if (!is_streaming) {
// If the sync client has not yet processed all of the local
// transactions then the uploadable data is incorrect and we should
// not invoke the callback
if (!is_download && snapshot_version > current_progress.snapshot_version)
return [] {};

// The initial download size we get from the server is the uncompacted
// size, and so the download may complete before we actually receive
// that much data. When that happens, transferrable will drop and we
// need to use the new value instead of the captured one.
if (!captured_transferrable || *captured_transferrable > transferrable)
captured_transferrable = transferrable;
transferrable = *captured_transferrable;
}

uint64_t transferred = is_download ? current_progress.downloaded : current_progress.uploaded;
// A notifier is expired if at least as many bytes have been transferred
// as were originally considered transferrable.
is_expired = !is_streaming && transferred >= transferrable;
Expand Down
15 changes: 10 additions & 5 deletions src/realm/sync/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1226,17 +1226,22 @@ void SessionWrapper::report_progress()
ClientHistory::get_upload_download_bytes(m_db.get(), downloaded_bytes, downloadable_bytes, uploaded_bytes,
uploadable_bytes, snapshot_version);

// uploadable_bytes is uploaded + remaining to upload, while downloadable_bytes
// is only the remaining to download. This is confusing, so make them use
// the same units.
std::uint_fast64_t total_bytes = downloaded_bytes + downloadable_bytes;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dislike that we're layering this information so that something that's higher in the call stack of the same statically linked library is unaware of the meaning of the information being passed to it. I think this adds complexity for no reason and makes this confusing and error prone.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't particularly think that this is the right place to do this, but only because I think it should be done even earlier. The exact details of how progress is represented in the protocol are not relevant to anything which isn't interacting directly with the protocol, which includes most of the sync client.


m_sess->logger.debug("Progress handler called, downloaded = %1, "
"downloadable(total) = %2, uploaded = %3, "
"uploadable = %4, reliable_download_progress = %5, "
"snapshot version = %6",
downloaded_bytes, downloadable_bytes, uploaded_bytes, uploadable_bytes,
downloaded_bytes, total_bytes, uploaded_bytes, uploadable_bytes,
m_reliable_download_progress, snapshot_version);

// Ignore progress messages from before we first receive a DOWNLOAD message.
// FIXME: This could be a bool instead of std::uint_fast64_t
std::uint_fast64_t download_version = (m_reliable_download_progress ? 1 : 0);
m_progress_handler(downloaded_bytes, downloadable_bytes, uploaded_bytes, uploadable_bytes, download_version,
// FIXME: Why is this boolean status communicated to the application as
// a 64-bit integer? Also, the name `progress_version` is confusing.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to change everything in this PR, but we def don't need to make existing passive aggressive comments into outwardly aggressive comments. Either fix this or just remove the comment.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is pre-existing code which I didn't touch.

std::uint_fast64_t progress_version = (m_reliable_download_progress ? 1 : 0);
m_progress_handler(downloaded_bytes, total_bytes, uploaded_bytes, uploadable_bytes, progress_version,
snapshot_version);
}

Expand Down
2 changes: 1 addition & 1 deletion src/realm/sync/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ class Session {
using SyncTransactCallback = void(VersionID old_version, VersionID new_version);
using ProgressHandler = void(std::uint_fast64_t downloaded_bytes, std::uint_fast64_t downloadable_bytes,
std::uint_fast64_t uploaded_bytes, std::uint_fast64_t uploadable_bytes,
std::uint_fast64_t download_version, std::uint_fast64_t snapshot_version);
std::uint_fast64_t progress_version, std::uint_fast64_t snapshot_version);
using WaitOperCompletionHandler = std::function<void(std::error_code)>;
using SSLVerifyCallback = bool(const std::string& server_address, port_type server_port, const char* pem_data,
size_t pem_size, int preverify_ok, int depth);
Expand Down
102 changes: 82 additions & 20 deletions test/object-store/sync/session/progress_notifications.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ TEST_CASE("progress notification", "[sync]") {
// Prime the progress updater
current_transferred = 60;
current_transferrable = 501;
uint64_t original_transferrable = current_transferrable;
const uint64_t original_transferrable = current_transferrable;
progress.update(current_transferred, current_transferrable, 21, 26, 1, 1);

progress.register_callback(
Expand All @@ -346,7 +346,6 @@ TEST_CASE("progress notification", "[sync]") {

// Now manually call the notifier handler a few times.
callback_was_called = false;
original_transferrable = current_transferred + current_transferrable;
current_transferred = 66;
current_transferrable = 582;
progress.update(current_transferred, current_transferrable, 25, 26, 1, 1);
Expand Down Expand Up @@ -394,7 +393,7 @@ TEST_CASE("progress notification", "[sync]") {
// Next we get a DOWNLOAD message telling us there's more to download
progress.update(current_transferred, current_transferrable, 0, 0, 1, 1);
REQUIRE(callback_was_called);
REQUIRE(transferrable == (current_transferred + current_transferrable));
REQUIRE(current_transferrable == transferrable);
REQUIRE(current_transferred == transferred);

current_transferred = 200;
Expand All @@ -414,7 +413,7 @@ TEST_CASE("progress notification", "[sync]") {
NotifierType::download, false);

REQUIRE(callback_was_called);
REQUIRE(transferrable == (current_transferred + current_transferrable));
REQUIRE(current_transferrable == transferrable);
REQUIRE(current_transferred == transferred);
}

Expand Down Expand Up @@ -462,7 +461,7 @@ TEST_CASE("progress notification", "[sync]") {
uint64_t current_downloaded = 68;
uint64_t current_downloadable = 182;
const uint64_t original_uploadable = current_uploadable;
uint64_t original_downloadable = current_downloadable;
const uint64_t original_downloadable = current_downloadable;
progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, 1);

progress.register_callback(
Expand Down Expand Up @@ -490,7 +489,6 @@ TEST_CASE("progress notification", "[sync]") {
// Now manually call the notifier handler a few times.
callback_was_called = false;
callback_was_called_2 = false;
original_downloadable = current_downloaded + current_downloadable;
current_uploaded = 36;
current_uploadable = 310;
current_downloaded = 171;
Expand Down Expand Up @@ -524,12 +522,12 @@ TEST_CASE("progress notification", "[sync]") {
current_uploaded = 218;
current_uploadable = 310;
current_downloaded = 182;
current_downloadable = 0;
current_downloadable = 196;
progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, 1);
CHECK(!callback_was_called);
CHECK(callback_was_called_2);
CHECK(downloaded == current_downloaded);
CHECK(downloadable == 0);
CHECK(downloadable == original_downloadable);

// Fourth callback, last one for the download notifier
callback_was_called_2 = false;
Expand All @@ -546,9 +544,9 @@ TEST_CASE("progress notification", "[sync]") {
// Prime the progress updater
uint64_t current_uploaded = 16;
uint64_t current_uploadable = 201;
uint64_t current_downloaded = 0;
uint64_t current_downloadable = 250;
uint64_t original_downloadable = current_downloadable;
uint64_t current_downloaded = 68;
uint64_t current_downloadable = 182;
const uint64_t original_downloadable = current_downloadable;
progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, 1);

progress.register_callback(
Expand All @@ -561,12 +559,11 @@ TEST_CASE("progress notification", "[sync]") {
REQUIRE(callback_was_called);

// Now manually call the notifier handler a few times.
original_downloadable = current_downloaded + current_downloadable;
callback_was_called = false;
current_uploaded = 36;
current_uploadable = 310;
current_downloaded = 171;
current_downloadable = 79;
current_downloadable = 185;
progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, 1);
CHECK(callback_was_called);
CHECK(transferred == current_downloaded);
Expand All @@ -576,6 +573,7 @@ TEST_CASE("progress notification", "[sync]") {
bool callback_was_called_2 = false;
uint64_t downloaded = 0;
uint64_t downloadable = 0;
const uint64_t original_downloadable_2 = current_downloadable;
progress.register_callback(
[&](auto xferred, auto xferable) {
downloaded = xferred;
Expand All @@ -591,27 +589,91 @@ TEST_CASE("progress notification", "[sync]") {
callback_was_called_2 = false;
current_uploaded = 36;
current_uploadable = 310;
current_downloaded = 250;
current_downloadable = 50;
current_downloaded = 182;
current_downloadable = 190;
progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, 1);
CHECK(callback_was_called);
CHECK(transferred == current_downloaded);
CHECK(transferrable == 250);
CHECK(transferrable == original_downloadable);
CHECK(callback_was_called_2);
CHECK(downloaded == current_downloaded);
CHECK(downloadable == 250);
CHECK(downloadable == original_downloadable_2);

// Third callback, last one for second notifier
callback_was_called = false;
callback_was_called_2 = false;
current_uploaded = 36;
current_uploadable = 310;
current_downloaded = 250;
current_downloadable = 0;
current_downloaded = 189;
current_downloadable = 250;
progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, 1);
CHECK(!callback_was_called);
CHECK(!callback_was_called_2);
CHECK(callback_was_called_2);
CHECK(downloaded == current_downloaded);
CHECK(downloadable == original_downloadable_2);

// Fourth callback
callback_was_called_2 = false;
current_uploaded = 36;
current_uploadable = 310;
current_downloaded = 201;
current_downloadable = 289;
progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, 1);
CHECK(!callback_was_called_2);
}

SECTION("download notifiers handle transferrable decreasing") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm sure that there's a test case that would have caught this issue and that it somehow slipped through CR, but could you point out to me where it is in this file?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The tests did catch that there was a functional change, and then the tests were modified to pass.

// Prime the progress updater
current_transferred = 60;
current_transferrable = 501;
const uint64_t original_transferrable = current_transferrable;
progress.update(current_transferred, current_transferrable, 21, 26, 1, 1);

progress.register_callback(
[&](auto xferred, auto xferable) {
transferred = xferred;
transferrable = xferable;
callback_was_called = true;
},
NotifierType::download, false);
// Wait for the initial callback.
REQUIRE(callback_was_called);

// Download some data but also drop the total. transferrable should
// update because it decreased.
callback_was_called = false;
current_transferred = 160;
current_transferrable = 451;
progress.update(current_transferred, current_transferrable, 25, 26, 1, 1);
CHECK(callback_was_called);
CHECK(transferred == current_transferred);
CHECK(transferrable == current_transferrable);

// Increasing current_transferrable should not increase transferrable
const uint64_t previous_transferrable = current_transferrable;
callback_was_called = false;
current_transferrable = 1000;
progress.update(current_transferred, current_transferrable, 68, 191, 1, 1);
CHECK(callback_was_called);
CHECK(transferred == current_transferred);
CHECK(transferrable == previous_transferrable);

// Transferrable dropping to be equal to transferred should notify
// and then expire the notifier
callback_was_called = false;
current_transferred = 200;
current_transferrable = current_transferred;
progress.update(current_transferred, current_transferrable, 191, 192, 1, 1);
CHECK(callback_was_called);
CHECK(transferred == current_transferred);
CHECK(transferrable == current_transferred);

// The notifier should be unregistered at this point, and not fire.
callback_was_called = false;
current_transferred = original_transferrable + 250;
current_transferrable = 1228;
progress.update(current_transferred, current_transferrable, 199, 591, 1, 1);
CHECK(!callback_was_called);
}
}
}
Loading