Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update captured_transferrable when downloadable bytes value changes. #4989

Merged
merged 5 commits into from
Oct 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
* Calling `size()` on a Results newly constructed via `.as_results().distinct()` on a Collection would give the size of the Collection rather than the distinct count. ([Cocoa #7481](https://github.com/realm/realm-cocoa/issues/7481), since v11.0.0).
* Calling `clear()` on a Results newly constructed via `.as_results().distinct()` on a Collection would delete all objects in the Collection rather than just the distinct objects in the Results (since v11.0.0).
* Calling `clear()` on a Results constructed via `.as_results().distinct()` on a Collection after calling `get()` or `size()` would not re-evaluate the distinct until after the next mutation to the table occurred.
* Sync progress notifiers would not trigger when the downloadable bytes size would equal 0.

### Breaking changes
* `App::Config::transport_factory` was replaced with `App::Config::transport`. It should now be an instance of `GenericNetworkTransport` rather than a factory for making instances. This allows the SDK to control which thread constructs the transport layer. ([#4903](https://github.com/realm/realm-core/pull/4903))
Expand Down
16 changes: 14 additions & 2 deletions src/realm/object-store/sync/sync_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1125,11 +1125,23 @@ std::function<void()> SyncProgressNotifier::NotifierPackage::create_invocation(P
transferrable = is_download ? current_progress.downloadable : current_progress.uploadable;
}
else if (captured_transferrable) {
if (is_download && (current_progress.downloadable == 0)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this necessary?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without this we will never be able to pass 0 downloadable bytes to the SDK

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, but do we want to pass 0 to the SDK? We didn't used to. We just want this to expire when downloadable reaches zero.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes because that is the only thing we can do that does not involve removing captured_transferrable

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this was to just expire the SDK will not be notified of it.

captured_transferrable = 0;
}
transferrable = *captured_transferrable;
}
else {
if (is_download)
captured_transferrable = current_progress.downloadable;
if (is_download) {
// In protocol versions 25 and earlier, downloadable_bytes was the total
// size of the history. From protocol version 26, downloadable_bytes
// represent the non-downloaded bytes on the server. Since the user supplied
// progress handler interprets downloadable_bytes as the total size of
// downloadable bytes, this number must be calculated. We could change the
// meaning of downloadable_bytes for the progress handler, but that would be
// a breaking change. Note that protocol version 25 (and earlier) is no
// longer supported by clients.
captured_transferrable = current_progress.downloaded + current_progress.downloadable;
}
else {
// If the sync client has not yet processed all of the local
// transactions then the uploadable data is incorrect and we should
Expand Down
20 changes: 5 additions & 15 deletions src/realm/sync/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1226,27 +1226,17 @@ void SessionWrapper::report_progress()
ClientReplication::get_upload_download_bytes(m_db.get(), downloaded_bytes, downloadable_bytes, uploaded_bytes,
uploadable_bytes, snapshot_version);

// In protocol versions 25 and earlier, downloadable_bytes was the total
// size of the history. From protocol version 26, downloadable_bytes
// represent the non-downloaded bytes on the server. Since the user supplied
// progress handler interprets downloadable_bytes as the total size of
// downloadable bytes, this number must be calculated. We could change the
// meaning of downloadable_bytes for the progress handler, but that would be
// a breaking change. Note that protocol version 25 (and earlier) is no
// longer supported by clients.
std::uint_fast64_t total_bytes = downloaded_bytes + downloadable_bytes;

m_sess->logger.debug("Progress handler called, downloaded = %1, "
"downloadable(total) = %2, uploaded = %3, "
"uploadable = %4, reliable_download_progress = %5, "
"snapshot version = %6",
downloaded_bytes, total_bytes, uploaded_bytes, uploadable_bytes,
downloaded_bytes, downloadable_bytes, uploaded_bytes, uploadable_bytes,
m_reliable_download_progress, snapshot_version);

// FIXME: Why is this boolean status communicated to the application as
// a 64-bit integer? Also, the name `progress_version` is confusing.
std::uint_fast64_t progress_version = (m_reliable_download_progress ? 1 : 0);
m_progress_handler(downloaded_bytes, total_bytes, uploaded_bytes, uploadable_bytes, progress_version,
// Ignore progress messages from before we first receive a DOWNLOAD message.
// FIXME: This could be a bool instead of std::uint_fast64_t
std::uint_fast64_t download_version = (m_reliable_download_progress ? 1 : 0);
m_progress_handler(downloaded_bytes, downloadable_bytes, uploaded_bytes, uploadable_bytes, download_version,
snapshot_version);
}

Expand Down
2 changes: 1 addition & 1 deletion src/realm/sync/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ class Session {
using SyncTransactCallback = void(VersionID old_version, VersionID new_version);
using ProgressHandler = void(std::uint_fast64_t downloaded_bytes, std::uint_fast64_t downloadable_bytes,
std::uint_fast64_t uploaded_bytes, std::uint_fast64_t uploadable_bytes,
std::uint_fast64_t progress_version, std::uint_fast64_t snapshot_version);
std::uint_fast64_t download_version, std::uint_fast64_t snapshot_version);
using WaitOperCompletionHandler = std::function<void(std::error_code)>;
using SSLVerifyCallback = bool(const std::string& server_address, port_type server_port, const char* pem_data,
size_t pem_size, int preverify_ok, int depth);
Expand Down
50 changes: 22 additions & 28 deletions test/object-store/sync/session/progress_notifications.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ TEST_CASE("progress notification", "[sync]") {
// Prime the progress updater
current_transferred = 60;
current_transferrable = 501;
const uint64_t original_transferrable = current_transferrable;
uint64_t original_transferrable = current_transferrable;
progress.update(current_transferred, current_transferrable, 21, 26, 1, 1);

progress.register_callback(
Expand All @@ -346,6 +346,7 @@ TEST_CASE("progress notification", "[sync]") {

// Now manually call the notifier handler a few times.
callback_was_called = false;
original_transferrable = current_transferred + current_transferrable;
current_transferred = 66;
current_transferrable = 582;
progress.update(current_transferred, current_transferrable, 25, 26, 1, 1);
Expand Down Expand Up @@ -393,7 +394,7 @@ TEST_CASE("progress notification", "[sync]") {
// Next we get a DOWNLOAD message telling us there's more to download
progress.update(current_transferred, current_transferrable, 0, 0, 1, 1);
REQUIRE(callback_was_called);
REQUIRE(current_transferrable == transferrable);
REQUIRE(transferrable == (current_transferred + current_transferrable));
REQUIRE(current_transferred == transferred);

current_transferred = 200;
Expand All @@ -413,7 +414,7 @@ TEST_CASE("progress notification", "[sync]") {
NotifierType::download, false);

REQUIRE(callback_was_called);
REQUIRE(current_transferrable == transferrable);
REQUIRE(transferrable == (current_transferred + current_transferrable));
REQUIRE(current_transferred == transferred);
}

Expand Down Expand Up @@ -461,7 +462,7 @@ TEST_CASE("progress notification", "[sync]") {
uint64_t current_downloaded = 68;
uint64_t current_downloadable = 182;
const uint64_t original_uploadable = current_uploadable;
const uint64_t original_downloadable = current_downloadable;
uint64_t original_downloadable = current_downloadable;
progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, 1);

progress.register_callback(
Expand Down Expand Up @@ -489,6 +490,7 @@ TEST_CASE("progress notification", "[sync]") {
// Now manually call the notifier handler a few times.
callback_was_called = false;
callback_was_called_2 = false;
original_downloadable = current_downloaded + current_downloadable;
current_uploaded = 36;
current_uploadable = 310;
current_downloaded = 171;
Expand Down Expand Up @@ -522,12 +524,12 @@ TEST_CASE("progress notification", "[sync]") {
current_uploaded = 218;
current_uploadable = 310;
current_downloaded = 182;
current_downloadable = 196;
current_downloadable = 0;
progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, 1);
CHECK(!callback_was_called);
CHECK(callback_was_called_2);
CHECK(downloaded == current_downloaded);
CHECK(downloadable == original_downloadable);
CHECK(downloadable == 0);

// Fourth callback, last one for the download notifier
callback_was_called_2 = false;
Expand All @@ -544,9 +546,9 @@ TEST_CASE("progress notification", "[sync]") {
// Prime the progress updater
uint64_t current_uploaded = 16;
uint64_t current_uploadable = 201;
uint64_t current_downloaded = 68;
uint64_t current_downloadable = 182;
const uint64_t original_downloadable = current_downloadable;
uint64_t current_downloaded = 0;
uint64_t current_downloadable = 250;
uint64_t original_downloadable = current_downloadable;
progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, 1);

progress.register_callback(
Expand All @@ -559,11 +561,12 @@ TEST_CASE("progress notification", "[sync]") {
REQUIRE(callback_was_called);

// Now manually call the notifier handler a few times.
original_downloadable = current_downloaded + current_downloadable;
callback_was_called = false;
current_uploaded = 36;
current_uploadable = 310;
current_downloaded = 171;
current_downloadable = 185;
current_downloadable = 79;
progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, 1);
CHECK(callback_was_called);
CHECK(transferred == current_downloaded);
Expand All @@ -573,7 +576,7 @@ TEST_CASE("progress notification", "[sync]") {
bool callback_was_called_2 = false;
uint64_t downloaded = 0;
uint64_t downloadable = 0;
const uint64_t original_downloadable_2 = current_downloadable;
uint64_t original_downloadable_2 = current_downloadable;
progress.register_callback(
[&](auto xferred, auto xferable) {
downloaded = xferred;
Expand All @@ -585,41 +588,32 @@ TEST_CASE("progress notification", "[sync]") {
REQUIRE(callback_was_called_2);

// Second callback, last one for first notifier
original_downloadable_2 = current_downloaded + current_downloadable;
callback_was_called = false;
callback_was_called_2 = false;
current_uploaded = 36;
current_uploadable = 310;
current_downloaded = 182;
current_downloadable = 190;
current_downloaded = 250;
current_downloadable = 50;
progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, 1);
CHECK(callback_was_called);
CHECK(transferred == current_downloaded);
CHECK(transferrable == original_downloadable);
CHECK(transferrable == 250);
CHECK(callback_was_called_2);
CHECK(downloaded == current_downloaded);
CHECK(downloadable == original_downloadable_2);
CHECK(downloadable == 250);

// Third callback, last one for second notifier
callback_was_called = false;
callback_was_called_2 = false;
current_uploaded = 36;
current_uploadable = 310;
current_downloaded = 189;
current_downloadable = 250;
current_downloaded = 250;
current_downloadable = 0;
progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, 1);
CHECK(!callback_was_called);
CHECK(callback_was_called_2);
CHECK(downloaded == current_downloaded);
CHECK(downloadable == original_downloadable_2);

// Fourth callback
callback_was_called_2 = false;
current_uploaded = 36;
current_uploadable = 310;
current_downloaded = 201;
current_downloadable = 289;
progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, 1);
CHECK(!callback_was_called_2);
CHECK(downloaded == current_downloaded);
}
}
}
35 changes: 11 additions & 24 deletions test/test_sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4209,7 +4209,6 @@ TEST(Sync_UploadDownloadProgress_2)
CHECK_EQUAL(downloadable_bytes_1, 0);

CHECK_NOT_EQUAL(downloaded_bytes_2, 0);
CHECK_NOT_EQUAL(downloadable_bytes_2, 0);

CHECK_NOT_EQUAL(uploaded_bytes_1, 0);
CHECK_NOT_EQUAL(uploadable_bytes_1, 0);
Expand Down Expand Up @@ -4241,10 +4240,8 @@ TEST(Sync_UploadDownloadProgress_2)
session_2.wait_for_download_complete_or_client_stopped();

CHECK_NOT_EQUAL(downloaded_bytes_1, 0);
CHECK_NOT_EQUAL(downloadable_bytes_1, 0);

CHECK_NOT_EQUAL(downloaded_bytes_2, 0);
CHECK_NOT_EQUAL(downloadable_bytes_2, 0);

CHECK_NOT_EQUAL(uploaded_bytes_1, 0);
CHECK_NOT_EQUAL(uploadable_bytes_1, 0);
Expand All @@ -4270,12 +4267,12 @@ TEST(Sync_UploadDownloadProgress_2)
session_1.wait_for_download_complete_or_client_stopped();
session_2.wait_for_download_complete_or_client_stopped();

CHECK_EQUAL(downloaded_bytes_1, downloadable_bytes_1);
CHECK_EQUAL(downloadable_bytes_1, 0);

// uncertainty due to merge
CHECK_NOT_EQUAL(downloaded_bytes_1, 0);

CHECK_EQUAL(downloaded_bytes_2, downloadable_bytes_2);
CHECK_EQUAL(downloadable_bytes_2, 0);
CHECK_NOT_EQUAL(downloaded_bytes_2, 0);

CHECK_NOT_EQUAL(uploaded_bytes_1, 0);
Expand Down Expand Up @@ -4556,7 +4553,7 @@ TEST(Sync_UploadDownloadProgress_4)
else if (entry_2 == 2) {
CHECK_GREATER(progress_version, 0);
CHECK_NOT_EQUAL(downloaded_bytes, 0);
CHECK_NOT_EQUAL(downloadable_bytes, 0);
CHECK_EQUAL(downloadable_bytes, 0);
CHECK_EQUAL(snapshot_version, 4);
}

Expand Down Expand Up @@ -4997,11 +4994,11 @@ TEST_IF(Sync_MergeLargeBinary, !(REALM_ARCHITECTURE_X86_32))
CHECK((cb.size() == binary_sizes[3] && cb[0] == 'd') || (cb.size() == binary_sizes[7] && cb[0] == 'h'));
}

CHECK_EQUAL(downloadable_bytes_1, downloaded_bytes_1);
CHECK_EQUAL(downloadable_bytes_1, 0);
CHECK_EQUAL(uploadable_bytes_1, uploaded_bytes_1);
CHECK_NOT_EQUAL(uploaded_bytes_1, 0);

CHECK_EQUAL(downloadable_bytes_2, downloaded_bytes_2);
CHECK_EQUAL(downloadable_bytes_2, 0);
CHECK_EQUAL(uploadable_bytes_2, uploaded_bytes_2);
CHECK_NOT_EQUAL(uploaded_bytes_2, 0);

Expand Down Expand Up @@ -5151,11 +5148,11 @@ TEST(Sync_MergeLargeBinaryReducedMemory)
CHECK((cb.size() == binary_sizes[3] && cb[0] == 'd') || (cb.size() == binary_sizes[7] && cb[0] == 'h'));
}

CHECK_EQUAL(downloadable_bytes_1, downloaded_bytes_1);
CHECK_EQUAL(downloadable_bytes_1, 0);
CHECK_EQUAL(uploadable_bytes_1, uploaded_bytes_1);
CHECK_NOT_EQUAL(uploaded_bytes_1, 0);

CHECK_EQUAL(downloadable_bytes_2, downloaded_bytes_2);
CHECK_EQUAL(downloadable_bytes_2, 0);
CHECK_EQUAL(uploadable_bytes_2, uploaded_bytes_2);
CHECK_NOT_EQUAL(uploaded_bytes_2, 0);

Expand Down Expand Up @@ -5486,15 +5483,10 @@ TEST(Sync_UploadLogCompactionEnabled)
fixture.bind_session(session_1, "/test");
session_1.wait_for_upload_complete_or_client_stopped();

auto progress_handler = [&](uint_fast64_t downloaded_bytes, uint_fast64_t downloadable_bytes,
uint_fast64_t uploaded_bytes, uint_fast64_t uploadable_bytes,
uint_fast64_t progress_version, uint_fast64_t snapshot_version) {
CHECK_EQUAL(downloaded_bytes, downloadable_bytes);
auto progress_handler = [&](uint_fast64_t, uint_fast64_t, uint_fast64_t uploaded_bytes,
uint_fast64_t uploadable_bytes, uint_fast64_t, uint_fast64_t) {
CHECK_EQUAL(0, uploaded_bytes);
CHECK_EQUAL(0, uploadable_bytes);
static_cast<void>(snapshot_version);
if (progress_version > 0)
CHECK_NOT_EQUAL(downloadable_bytes, 0);
};

session_2.set_progress_handler(progress_handler);
Expand Down Expand Up @@ -5545,15 +5537,10 @@ TEST(Sync_UploadLogCompactionDisabled)
Session session_1 = fixture.make_bound_session(db_1, "/test");
session_1.wait_for_upload_complete_or_client_stopped();

auto progress_handler = [&](std::uint_fast64_t downloaded_bytes, std::uint_fast64_t downloadable_bytes,
std::uint_fast64_t uploaded_bytes, std::uint_fast64_t uploadable_bytes,
std::uint_fast64_t progress_version, std::uint_fast64_t snapshot_version) {
CHECK_EQUAL(downloaded_bytes, downloadable_bytes);
auto progress_handler = [&](std::uint_fast64_t, std::uint_fast64_t, std::uint_fast64_t uploaded_bytes,
std::uint_fast64_t uploadable_bytes, std::uint_fast64_t, std::uint_fast64_t) {
CHECK_EQUAL(0, uploaded_bytes);
CHECK_EQUAL(0, uploadable_bytes);
static_cast<void>(snapshot_version);
if (progress_version > 0)
CHECK_NOT_EQUAL(0, downloadable_bytes);
};

Session session_2 = fixture.make_session(db_2);
Expand Down