Skip to content

Commit

Permalink
Update captured_transferrable when downloadable bytes value changes. (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
leemaguire authored Oct 27, 2021
1 parent 0a0d02a commit e5a115f
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 70 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
* Calling `size()` on a Results newly constructed via `.as_results().distinct()` on a Collection would give the size of the Collection rather than the distinct count. ([Cocoa #7481](https://github.com/realm/realm-cocoa/issues/7481), since v11.0.0).
* Calling `clear()` on a Results newly constructed via `.as_results().distinct()` on a Collection would delete all objects in the Collection rather than just the distinct objects in the Results (since v11.0.0).
* Calling `clear()` on a Results constructed via `.as_results().distinct()` on a Collection after calling `get()` or `size()` would not re-evaluate the distinct until after the next mutation to the table occurred.
* Sync progress notifiers would not trigger when the downloadable bytes size would equal 0.

### Breaking changes
* `App::Config::transport_factory` was replaced with `App::Config::transport`. It should now be an instance of `GenericNetworkTransport` rather than a factory for making instances. This allows the SDK to control which thread constructs the transport layer. ([#4903](https://github.com/realm/realm-core/pull/4903))
Expand Down
16 changes: 14 additions & 2 deletions src/realm/object-store/sync/sync_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -982,11 +982,23 @@ std::function<void()> SyncProgressNotifier::NotifierPackage::create_invocation(P
transferrable = is_download ? current_progress.downloadable : current_progress.uploadable;
}
else if (captured_transferrable) {
if (is_download && (current_progress.downloadable == 0)) {
captured_transferrable = 0;
}
transferrable = *captured_transferrable;
}
else {
if (is_download)
captured_transferrable = current_progress.downloadable;
if (is_download) {
// In protocol versions 25 and earlier, downloadable_bytes was the total
// size of the history. From protocol version 26, downloadable_bytes
// represent the non-downloaded bytes on the server. Since the user supplied
// progress handler interprets downloadable_bytes as the total size of
// downloadable bytes, this number must be calculated. We could change the
// meaning of downloadable_bytes for the progress handler, but that would be
// a breaking change. Note that protocol version 25 (and earlier) is no
// longer supported by clients.
captured_transferrable = current_progress.downloaded + current_progress.downloadable;
}
else {
// If the sync client has not yet processed all of the local
// transactions then the uploadable data is incorrect and we should
Expand Down
20 changes: 5 additions & 15 deletions src/realm/sync/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1226,27 +1226,17 @@ void SessionWrapper::report_progress()
ClientHistory::get_upload_download_bytes(m_db.get(), downloaded_bytes, downloadable_bytes, uploaded_bytes,
uploadable_bytes, snapshot_version);

// In protocol versions 25 and earlier, downloadable_bytes was the total
// size of the history. From protocol version 26, downloadable_bytes
// represent the non-downloaded bytes on the server. Since the user supplied
// progress handler interprets downloadable_bytes as the total size of
// downloadable bytes, this number must be calculated. We could change the
// meaning of downloadable_bytes for the progress handler, but that would be
// a breaking change. Note that protocol version 25 (and earlier) is no
// longer supported by clients.
std::uint_fast64_t total_bytes = downloaded_bytes + downloadable_bytes;

m_sess->logger.debug("Progress handler called, downloaded = %1, "
"downloadable(total) = %2, uploaded = %3, "
"uploadable = %4, reliable_download_progress = %5, "
"snapshot version = %6",
downloaded_bytes, total_bytes, uploaded_bytes, uploadable_bytes,
downloaded_bytes, downloadable_bytes, uploaded_bytes, uploadable_bytes,
m_reliable_download_progress, snapshot_version);

// FIXME: Why is this boolean status communicated to the application as
// a 64-bit integer? Also, the name `progress_version` is confusing.
std::uint_fast64_t progress_version = (m_reliable_download_progress ? 1 : 0);
m_progress_handler(downloaded_bytes, total_bytes, uploaded_bytes, uploadable_bytes, progress_version,
// Ignore progress messages from before we first receive a DOWNLOAD message.
// FIXME: This could be a bool instead of std::uint_fast64_t
std::uint_fast64_t download_version = (m_reliable_download_progress ? 1 : 0);
m_progress_handler(downloaded_bytes, downloadable_bytes, uploaded_bytes, uploadable_bytes, download_version,
snapshot_version);
}

Expand Down
2 changes: 1 addition & 1 deletion src/realm/sync/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ class Session {
using SyncTransactCallback = void(VersionID old_version, VersionID new_version);
using ProgressHandler = void(std::uint_fast64_t downloaded_bytes, std::uint_fast64_t downloadable_bytes,
std::uint_fast64_t uploaded_bytes, std::uint_fast64_t uploadable_bytes,
std::uint_fast64_t progress_version, std::uint_fast64_t snapshot_version);
std::uint_fast64_t download_version, std::uint_fast64_t snapshot_version);
using WaitOperCompletionHandler = std::function<void(std::error_code)>;
using SSLVerifyCallback = bool(const std::string& server_address, port_type server_port, const char* pem_data,
size_t pem_size, int preverify_ok, int depth);
Expand Down
50 changes: 22 additions & 28 deletions test/object-store/sync/session/progress_notifications.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ TEST_CASE("progress notification", "[sync]") {
// Prime the progress updater
current_transferred = 60;
current_transferrable = 501;
const uint64_t original_transferrable = current_transferrable;
uint64_t original_transferrable = current_transferrable;
progress.update(current_transferred, current_transferrable, 21, 26, 1, 1);

progress.register_callback(
Expand All @@ -346,6 +346,7 @@ TEST_CASE("progress notification", "[sync]") {

// Now manually call the notifier handler a few times.
callback_was_called = false;
original_transferrable = current_transferred + current_transferrable;
current_transferred = 66;
current_transferrable = 582;
progress.update(current_transferred, current_transferrable, 25, 26, 1, 1);
Expand Down Expand Up @@ -393,7 +394,7 @@ TEST_CASE("progress notification", "[sync]") {
// Next we get a DOWNLOAD message telling us there's more to download
progress.update(current_transferred, current_transferrable, 0, 0, 1, 1);
REQUIRE(callback_was_called);
REQUIRE(current_transferrable == transferrable);
REQUIRE(transferrable == (current_transferred + current_transferrable));
REQUIRE(current_transferred == transferred);

current_transferred = 200;
Expand All @@ -413,7 +414,7 @@ TEST_CASE("progress notification", "[sync]") {
NotifierType::download, false);

REQUIRE(callback_was_called);
REQUIRE(current_transferrable == transferrable);
REQUIRE(transferrable == (current_transferred + current_transferrable));
REQUIRE(current_transferred == transferred);
}

Expand Down Expand Up @@ -461,7 +462,7 @@ TEST_CASE("progress notification", "[sync]") {
uint64_t current_downloaded = 68;
uint64_t current_downloadable = 182;
const uint64_t original_uploadable = current_uploadable;
const uint64_t original_downloadable = current_downloadable;
uint64_t original_downloadable = current_downloadable;
progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, 1);

progress.register_callback(
Expand Down Expand Up @@ -489,6 +490,7 @@ TEST_CASE("progress notification", "[sync]") {
// Now manually call the notifier handler a few times.
callback_was_called = false;
callback_was_called_2 = false;
original_downloadable = current_downloaded + current_downloadable;
current_uploaded = 36;
current_uploadable = 310;
current_downloaded = 171;
Expand Down Expand Up @@ -522,12 +524,12 @@ TEST_CASE("progress notification", "[sync]") {
current_uploaded = 218;
current_uploadable = 310;
current_downloaded = 182;
current_downloadable = 196;
current_downloadable = 0;
progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, 1);
CHECK(!callback_was_called);
CHECK(callback_was_called_2);
CHECK(downloaded == current_downloaded);
CHECK(downloadable == original_downloadable);
CHECK(downloadable == 0);

// Fourth callback, last one for the download notifier
callback_was_called_2 = false;
Expand All @@ -544,9 +546,9 @@ TEST_CASE("progress notification", "[sync]") {
// Prime the progress updater
uint64_t current_uploaded = 16;
uint64_t current_uploadable = 201;
uint64_t current_downloaded = 68;
uint64_t current_downloadable = 182;
const uint64_t original_downloadable = current_downloadable;
uint64_t current_downloaded = 0;
uint64_t current_downloadable = 250;
uint64_t original_downloadable = current_downloadable;
progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, 1);

progress.register_callback(
Expand All @@ -559,11 +561,12 @@ TEST_CASE("progress notification", "[sync]") {
REQUIRE(callback_was_called);

// Now manually call the notifier handler a few times.
original_downloadable = current_downloaded + current_downloadable;
callback_was_called = false;
current_uploaded = 36;
current_uploadable = 310;
current_downloaded = 171;
current_downloadable = 185;
current_downloadable = 79;
progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, 1);
CHECK(callback_was_called);
CHECK(transferred == current_downloaded);
Expand All @@ -573,7 +576,7 @@ TEST_CASE("progress notification", "[sync]") {
bool callback_was_called_2 = false;
uint64_t downloaded = 0;
uint64_t downloadable = 0;
const uint64_t original_downloadable_2 = current_downloadable;
uint64_t original_downloadable_2 = current_downloadable;
progress.register_callback(
[&](auto xferred, auto xferable) {
downloaded = xferred;
Expand All @@ -585,41 +588,32 @@ TEST_CASE("progress notification", "[sync]") {
REQUIRE(callback_was_called_2);

// Second callback, last one for first notifier
original_downloadable_2 = current_downloaded + current_downloadable;
callback_was_called = false;
callback_was_called_2 = false;
current_uploaded = 36;
current_uploadable = 310;
current_downloaded = 182;
current_downloadable = 190;
current_downloaded = 250;
current_downloadable = 50;
progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, 1);
CHECK(callback_was_called);
CHECK(transferred == current_downloaded);
CHECK(transferrable == original_downloadable);
CHECK(transferrable == 250);
CHECK(callback_was_called_2);
CHECK(downloaded == current_downloaded);
CHECK(downloadable == original_downloadable_2);
CHECK(downloadable == 250);

// Third callback, last one for second notifier
callback_was_called = false;
callback_was_called_2 = false;
current_uploaded = 36;
current_uploadable = 310;
current_downloaded = 189;
current_downloadable = 250;
current_downloaded = 250;
current_downloadable = 0;
progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, 1);
CHECK(!callback_was_called);
CHECK(callback_was_called_2);
CHECK(downloaded == current_downloaded);
CHECK(downloadable == original_downloadable_2);

// Fourth callback
callback_was_called_2 = false;
current_uploaded = 36;
current_uploadable = 310;
current_downloaded = 201;
current_downloadable = 289;
progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, 1);
CHECK(!callback_was_called_2);
CHECK(downloaded == current_downloaded);
}
}
}
35 changes: 11 additions & 24 deletions test/test_sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4214,7 +4214,6 @@ TEST(Sync_UploadDownloadProgress_2)
CHECK_EQUAL(downloadable_bytes_1, 0);

CHECK_NOT_EQUAL(downloaded_bytes_2, 0);
CHECK_NOT_EQUAL(downloadable_bytes_2, 0);

CHECK_NOT_EQUAL(uploaded_bytes_1, 0);
CHECK_NOT_EQUAL(uploadable_bytes_1, 0);
Expand Down Expand Up @@ -4246,10 +4245,8 @@ TEST(Sync_UploadDownloadProgress_2)
session_2.wait_for_download_complete_or_client_stopped();

CHECK_NOT_EQUAL(downloaded_bytes_1, 0);
CHECK_NOT_EQUAL(downloadable_bytes_1, 0);

CHECK_NOT_EQUAL(downloaded_bytes_2, 0);
CHECK_NOT_EQUAL(downloadable_bytes_2, 0);

CHECK_NOT_EQUAL(uploaded_bytes_1, 0);
CHECK_NOT_EQUAL(uploadable_bytes_1, 0);
Expand All @@ -4275,12 +4272,12 @@ TEST(Sync_UploadDownloadProgress_2)
session_1.wait_for_download_complete_or_client_stopped();
session_2.wait_for_download_complete_or_client_stopped();

CHECK_EQUAL(downloaded_bytes_1, downloadable_bytes_1);
CHECK_EQUAL(downloadable_bytes_1, 0);

// uncertainty due to merge
CHECK_NOT_EQUAL(downloaded_bytes_1, 0);

CHECK_EQUAL(downloaded_bytes_2, downloadable_bytes_2);
CHECK_EQUAL(downloadable_bytes_2, 0);
CHECK_NOT_EQUAL(downloaded_bytes_2, 0);

CHECK_NOT_EQUAL(uploaded_bytes_1, 0);
Expand Down Expand Up @@ -4561,7 +4558,7 @@ TEST(Sync_UploadDownloadProgress_4)
else if (entry_2 == 2) {
CHECK_GREATER(progress_version, 0);
CHECK_NOT_EQUAL(downloaded_bytes, 0);
CHECK_NOT_EQUAL(downloadable_bytes, 0);
CHECK_EQUAL(downloadable_bytes, 0);
CHECK_EQUAL(snapshot_version, 4);
}

Expand Down Expand Up @@ -5002,11 +4999,11 @@ TEST_IF(Sync_MergeLargeBinary, !(REALM_ARCHITECTURE_X86_32))
CHECK((cb.size() == binary_sizes[3] && cb[0] == 'd') || (cb.size() == binary_sizes[7] && cb[0] == 'h'));
}

CHECK_EQUAL(downloadable_bytes_1, downloaded_bytes_1);
CHECK_EQUAL(downloadable_bytes_1, 0);
CHECK_EQUAL(uploadable_bytes_1, uploaded_bytes_1);
CHECK_NOT_EQUAL(uploaded_bytes_1, 0);

CHECK_EQUAL(downloadable_bytes_2, downloaded_bytes_2);
CHECK_EQUAL(downloadable_bytes_2, 0);
CHECK_EQUAL(uploadable_bytes_2, uploaded_bytes_2);
CHECK_NOT_EQUAL(uploaded_bytes_2, 0);

Expand Down Expand Up @@ -5156,11 +5153,11 @@ TEST(Sync_MergeLargeBinaryReducedMemory)
CHECK((cb.size() == binary_sizes[3] && cb[0] == 'd') || (cb.size() == binary_sizes[7] && cb[0] == 'h'));
}

CHECK_EQUAL(downloadable_bytes_1, downloaded_bytes_1);
CHECK_EQUAL(downloadable_bytes_1, 0);
CHECK_EQUAL(uploadable_bytes_1, uploaded_bytes_1);
CHECK_NOT_EQUAL(uploaded_bytes_1, 0);

CHECK_EQUAL(downloadable_bytes_2, downloaded_bytes_2);
CHECK_EQUAL(downloadable_bytes_2, 0);
CHECK_EQUAL(uploadable_bytes_2, uploaded_bytes_2);
CHECK_NOT_EQUAL(uploaded_bytes_2, 0);

Expand Down Expand Up @@ -5491,15 +5488,10 @@ TEST(Sync_UploadLogCompactionEnabled)
fixture.bind_session(session_1, "/test");
session_1.wait_for_upload_complete_or_client_stopped();

auto progress_handler = [&](uint_fast64_t downloaded_bytes, uint_fast64_t downloadable_bytes,
uint_fast64_t uploaded_bytes, uint_fast64_t uploadable_bytes,
uint_fast64_t progress_version, uint_fast64_t snapshot_version) {
CHECK_EQUAL(downloaded_bytes, downloadable_bytes);
auto progress_handler = [&](uint_fast64_t, uint_fast64_t, uint_fast64_t uploaded_bytes,
uint_fast64_t uploadable_bytes, uint_fast64_t, uint_fast64_t) {
CHECK_EQUAL(0, uploaded_bytes);
CHECK_EQUAL(0, uploadable_bytes);
static_cast<void>(snapshot_version);
if (progress_version > 0)
CHECK_NOT_EQUAL(downloadable_bytes, 0);
};

session_2.set_progress_handler(progress_handler);
Expand Down Expand Up @@ -5550,15 +5542,10 @@ TEST(Sync_UploadLogCompactionDisabled)
Session session_1 = fixture.make_bound_session(db_1, "/test");
session_1.wait_for_upload_complete_or_client_stopped();

auto progress_handler = [&](std::uint_fast64_t downloaded_bytes, std::uint_fast64_t downloadable_bytes,
std::uint_fast64_t uploaded_bytes, std::uint_fast64_t uploadable_bytes,
std::uint_fast64_t progress_version, std::uint_fast64_t snapshot_version) {
CHECK_EQUAL(downloaded_bytes, downloadable_bytes);
auto progress_handler = [&](std::uint_fast64_t, std::uint_fast64_t, std::uint_fast64_t uploaded_bytes,
std::uint_fast64_t uploadable_bytes, std::uint_fast64_t, std::uint_fast64_t) {
CHECK_EQUAL(0, uploaded_bytes);
CHECK_EQUAL(0, uploadable_bytes);
static_cast<void>(snapshot_version);
if (progress_version > 0)
CHECK_NOT_EQUAL(0, downloadable_bytes);
};

Session session_2 = fixture.make_session(db_2);
Expand Down

0 comments on commit e5a115f

Please sign in to comment.