Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RCORE-2099 Restore progress notifier behavior when sync session is already caught up #7681

Merged
merged 14 commits into from
May 20, 2024
94 changes: 51 additions & 43 deletions src/realm/object-store/sync/sync_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -821,10 +821,10 @@ void SyncSession::cancel_pending_waits(util::CheckedUniqueLock lock, Status erro

void SyncSession::handle_progress_update(uint64_t downloaded, uint64_t downloadable, uint64_t uploaded,
uint64_t uploadable, uint64_t snapshot_version, double download_estimate,
double upload_estimate)
double upload_estimate, int64_t query_version)
{
m_progress_notifier.update(downloaded, downloadable, uploaded, uploadable, snapshot_version, download_estimate,
upload_estimate);
upload_estimate, query_version);
}

static sync::Session::Config::ClientReset make_client_reset_config(const RealmConfig& base_config,
Expand Down Expand Up @@ -962,10 +962,10 @@ void SyncSession::create_sync_session()
m_session->set_progress_handler([weak_self](uint_fast64_t downloaded, uint_fast64_t downloadable,
uint_fast64_t uploaded, uint_fast64_t uploadable,
uint_fast64_t snapshot_version, double download_estimate,
double upload_estimate) {
double upload_estimate, int64_t query_version) {
if (auto self = weak_self.lock()) {
self->handle_progress_update(downloaded, downloadable, uploaded, uploadable, snapshot_version,
download_estimate, upload_estimate);
download_estimate, upload_estimate, query_version);
}
});

Expand Down Expand Up @@ -1267,7 +1267,12 @@ void SyncSession::wait_for_download_completion(util::UniqueFunction<void(Status)
uint64_t SyncSession::register_progress_notifier(std::function<ProgressNotifierCallback>&& notifier,
ProgressDirection direction, bool is_streaming)
{
return m_progress_notifier.register_callback(std::move(notifier), direction, is_streaming);
int64_t pending_query_version = 0;
assert_mutex_unlocked();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

register_progress_notifier() needs to be annotated as REQUIRES(!m_state_mutex) rather than this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

if (auto sub_store = get_flx_subscription_store()) {
pending_query_version = sub_store->get_version_info().latest;
}
return m_progress_notifier.register_callback(std::move(notifier), direction, is_streaming, pending_query_version);
}

void SyncSession::unregister_progress_notifier(uint64_t token)
Expand Down Expand Up @@ -1519,22 +1524,23 @@ void SyncSession::did_drop_external_reference()
}

uint64_t SyncProgressNotifier::register_callback(std::function<ProgressNotifierCallback> notifier,
NotifierType direction, bool is_streaming)
NotifierType direction, bool is_streaming,
int64_t pending_query_version)
{
util::UniqueFunction<void()> invocation;
uint64_t token_value = 0;
{
std::lock_guard<std::mutex> lock(m_mutex);
token_value = m_progress_notifier_token++;
NotifierPackage package{std::move(notifier), m_local_transaction_version, is_streaming,
direction == NotifierType::download};
direction == NotifierType::download, pending_query_version};
if (!m_current_progress) {
// Simply register the package, since we have no data yet.
m_packages.emplace(token_value, std::move(package));
return token_value;
}
bool skip_registration = false;
invocation = package.create_invocation(*m_current_progress, skip_registration, true);
invocation = package.create_invocation(*m_current_progress, skip_registration);
if (skip_registration) {
token_value = 0;
}
Expand All @@ -1553,13 +1559,14 @@ void SyncProgressNotifier::unregister_callback(uint64_t token)
}

void SyncProgressNotifier::update(uint64_t downloaded, uint64_t downloadable, uint64_t uploaded, uint64_t uploadable,
uint64_t snapshot_version, double download_estimate, double upload_estimate)
uint64_t snapshot_version, double download_estimate, double upload_estimate,
int64_t query_version)
{
std::vector<util::UniqueFunction<void()>> invocations;
{
std::lock_guard<std::mutex> lock(m_mutex);
m_current_progress = Progress{uploadable, downloadable, uploaded, downloaded,
upload_estimate, download_estimate, snapshot_version};
m_current_progress = Progress{uploadable, downloadable, uploaded, downloaded,
upload_estimate, download_estimate, snapshot_version, query_version};

for (auto it = m_packages.begin(); it != m_packages.end();) {
bool should_delete = false;
Expand All @@ -1579,49 +1586,50 @@ void SyncProgressNotifier::set_local_version(uint64_t snapshot_version)
}

util::UniqueFunction<void()>
SyncProgressNotifier::NotifierPackage::create_invocation(Progress const& current_progress, bool& is_expired,
bool initial_registration)
SyncProgressNotifier::NotifierPackage::create_invocation(Progress const& current_progress, bool& is_expired)
{
uint64_t transferred = is_download ? current_progress.downloaded : current_progress.uploaded;
uint64_t transfered = is_download ? current_progress.downloaded : current_progress.uploaded;
uint64_t transferable = is_download ? current_progress.downloadable : current_progress.uploadable;
double progress_estimate = is_download ? current_progress.download_estimate : current_progress.upload_estimate;

// If the sync client has not yet processed all of the local
// transactions then the uploadable data is incorrect and we should
// not invoke the callback
if (!is_download && snapshot_version > current_progress.snapshot_version)
return [] {};

// for download only invoke the callback on registration if is in active data transfer,
// otherwise delay notifying until an update with the new transfer signaled
if (is_download && !started_notifying && progress_estimate >= 1) {
if (initial_registration) {
initial_transferred = transferred;
return [] {};
}
else if (initial_transferred == transferred)
double estimate = is_download ? current_progress.download_estimate : current_progress.upload_estimate;

if (!is_streaming) {
// If the sync client has not yet processed all of the local
// transactions then the uploadable data is incorrect and we should
// not invoke the callback
if (!is_download && snapshot_version > current_progress.snapshot_version)
return [] {};
}

started_notifying = true;
// If this is a non-streaming download progress update and this notifier was
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this check prevents reporting any progress until the latest query at the time the user registered the notifier starts bootstrapping right? I guess this is nice because there is no 0->1->0->....->1 anymore, but it may take a long time until the user gets any progress.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, for the most common case this means we skip query version zero and you get progress notifications for version 1. But yeah, in the worst case if you had a whole bunch of outstanding subscription sets you could end up having to wait for them all to sync.

// created for a later query version (i.e. we're currently downloading
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

e.g., not i.e.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

// subscription set version zero, but subscription set version 1 existed
// when the notifier was registered), then we want to skip this callback.
if (is_download && current_progress.query_version < pending_query_version) {
danieltabacaru marked this conversation as resolved.
Show resolved Hide resolved
return [] {};
}

// only capture and adjust transferable bytes for upload non-streaming to provide
// the progress of upload for the callback registered right after the commit
if (!is_streaming && !is_download) {
// The initial download size we get from the server is the uncompacted
// size, and so the download may complete before we actually receive
// that much data. When that happens, transferrable will drop and we
Comment on lines +1609 to +1611
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the mention of compaction here is out of date. transferable can still sometimes drop with BaaS, but it no longer due to compaction.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, i dunno. it can also drop due to compaction with pbs. tbh, i don't know the full set of circumstances when this can happen, but i wanted to preserve the behavior.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't this actually also used for upload as a way to save the uploadable bytes at the time of registration (or when update is called first)? The change below also suggests this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well, yeah, but why we do that exactly is not clear anymore.

// need to use the new value instead of the captured one.
if (!captured_transferable || *captured_transferable > transferable)
captured_transferable = transferable;
transferable = *captured_transferable;
}

// A notifier is expired for upload if at least as many bytes have been transferred
// as were originally considered transferable based on local committed version
// on callback registration, or when simply 1.0 progress is reached for download
// since the amount of bytes is not precisely known until the end
if (!is_streaming)
is_expired = is_download ? progress_estimate >= 1 : transferred >= transferable;
// Since we can adjust the transferrable downwards the estimate for uploads
// won't be correct since the sync client's view of the estimate is based on
// the total number of uploadable bytes available rather than the number of
// bytes this NotifierPackage was waiting to upload.
if (!is_download) {
estimate = std::min(transfered / double(transferable), 1.0);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice catch! but you need to check transferable is greater than 0. Running for upload notifications, with no data transfer ongoing test and checking the estimate reveals it as NaN.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm. okay.

}
}

// A notifier is expired if at least as many bytes have been transferred
// as were originally considered transferrable.
is_expired =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand it correctly, this also fires when there is a fresh realm with no subscriptions and the schema is downloaded (as opposed to previous implementation) right? If that's the case it could be worth mentioning it in the description. Edit: the check at line 1605 actually prevents this.

!is_streaming && (transfered >= transferable && (!is_download || !pending_query_version || estimate >= 1.0));
return [=, notifier = notifier] {
notifier(transferred, transferable, progress_estimate);
notifier(transfered, transferable, estimate);
};
}

Expand Down
14 changes: 7 additions & 7 deletions src/realm/object-store/sync/sync_session.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,13 @@ class SyncProgressNotifier {
using ProgressNotifierCallback = void(uint64_t transferred_bytes, uint64_t transferrable_bytes,
double progress_estimate);

uint64_t register_callback(std::function<ProgressNotifierCallback>, NotifierType direction, bool is_streaming);
uint64_t register_callback(std::function<ProgressNotifierCallback>, NotifierType direction, bool is_streaming,
int64_t pending_query_version);
void unregister_callback(uint64_t);

void set_local_version(uint64_t);
void update(uint64_t downloaded, uint64_t downloadable, uint64_t uploaded, uint64_t uploadable,
uint64_t snapshot_version, double download_estimate = 1.0, double upload_estimate = 1.0);
uint64_t snapshot_version, double download_estimate, double upload_estimate, int64_t query_version);

private:
mutable std::mutex m_mutex;
Expand All @@ -73,6 +74,7 @@ class SyncProgressNotifier {
double upload_estimate;
double download_estimate;
uint64_t snapshot_version;
int64_t query_version;
};

// A PODS encapsulating some information for progress notifier callbacks a binding
Expand All @@ -82,11 +84,9 @@ class SyncProgressNotifier {
uint64_t snapshot_version;
bool is_streaming;
bool is_download;
bool started_notifying = false;
uint64_t initial_transferred = 0;
int64_t pending_query_version = 0;
std::optional<uint64_t> captured_transferable;
util::UniqueFunction<void()> create_invocation(const Progress&, bool& is_expired,
bool initial_registration = false);
util::UniqueFunction<void()> create_invocation(const Progress&, bool& is_expired);
};

// A counter used as a token to identify progress notifier callbacks registered on this session.
Expand Down Expand Up @@ -422,7 +422,7 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
void cancel_pending_waits(util::CheckedUniqueLock, Status) RELEASE(m_state_mutex);
enum class ShouldBackup { yes, no };
void update_error_and_mark_file_for_deletion(SyncError&, ShouldBackup) REQUIRES(m_state_mutex, !m_config_mutex);
void handle_progress_update(uint64_t, uint64_t, uint64_t, uint64_t, uint64_t, double, double);
void handle_progress_update(uint64_t, uint64_t, uint64_t, uint64_t, uint64_t, double, double, int64_t);
void handle_new_flx_sync_query(int64_t version);

void nonsync_transact_notify(VersionID::version_type) REQUIRES(!m_state_mutex);
Expand Down
13 changes: 7 additions & 6 deletions src/realm/sync/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -980,11 +980,11 @@ void SessionImpl::process_pending_flx_bootstrap()
std::chrono::duration_cast<std::chrono::milliseconds>(duration).count(),
pending_batch.remaining_changesets);
}
on_changesets_integrated(new_version.realm_version, progress, changesets_processed > 0);

REALM_ASSERT_3(query_version, !=, -1);
on_flx_sync_progress(query_version, DownloadBatchState::LastInBatch);

on_changesets_integrated(new_version.realm_version, progress, changesets_processed > 0);
auto action = call_debug_hook(SyncClientHookEvent::BootstrapProcessed, progress, query_version,
DownloadBatchState::LastInBatch, changesets_processed);
// NoAction/EarlyReturn are both valid no-op actions to take here.
Expand Down Expand Up @@ -1958,14 +1958,15 @@ void SessionWrapper::report_progress(bool is_download, bool only_if_new_uploadab
ss << std::fixed << std::setprecision(4) << d;
return ss.str();
};
m_sess->logger.debug("Progress handler called, downloaded = %1, downloadable = %2, estimate = %3, "
"uploaded = %4, uploadable = %5, estimate = %6, snapshot version = %7",
p.downloaded, p.downloadable, to_str(download_estimate), p.uploaded, p.uploadable,
to_str(upload_estimate), p.snapshot);
m_sess->logger.debug(
"Progress handler called, downloaded = %1, downloadable = %2, estimate = %3, "
"uploaded = %4, uploadable = %5, estimate = %6, snapshot version = %7, query_version = %8",
p.downloaded, p.downloadable, to_str(download_estimate), p.uploaded, p.uploadable,
to_str(upload_estimate), p.snapshot, m_flx_active_version);
}

m_progress_handler(p.downloaded, p.downloadable, p.uploaded, p.uploadable, p.snapshot, download_estimate,
upload_estimate);
upload_estimate, m_flx_last_seen_version);
}

util::Future<std::string> SessionWrapper::send_test_command(std::string body)
Expand Down
2 changes: 1 addition & 1 deletion src/realm/sync/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ class Session {
using ProgressHandler = void(std::uint_fast64_t downloaded_bytes, std::uint_fast64_t downloadable_bytes,
std::uint_fast64_t uploaded_bytes, std::uint_fast64_t uploadable_bytes,
std::uint_fast64_t snapshot_version, double download_estimate,
double upload_estimate);
double upload_estimate, int64_t query_version);
using WaitOperCompletionHandler = util::UniqueFunction<void(Status)>;
using SSLVerifyCallback = bool(const std::string& server_address, port_type server_port, const char* pem_data,
size_t pem_size, int preverify_ok, int depth);
Expand Down
Loading
Loading