Skip to content
This repository has been archived by the owner on Nov 8, 2021. It is now read-only.

Implement precise, unbatched notification of sync completion events #1054

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 24 additions & 17 deletions src/sync/sync_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,9 +239,9 @@ struct sync_session_states::Inactive : public SyncSession::State {
}

// Inform any queued-up completion handlers that they were cancelled.
for (auto& callback : download_waits)
for (auto& [id, callback] : download_waits)
callback(make_error_code(util::error::operation_aborted));
for (auto& callback : upload_waits)
for (auto& [id, callback] : upload_waits)
callback(make_error_code(util::error::operation_aborted));
}

Expand Down Expand Up @@ -553,9 +553,9 @@ void SyncSession::cancel_pending_waits(std::unique_lock<std::mutex>& lock, std::
lock.unlock();

// Inform any queued-up completion handlers that they were cancelled.
for (auto& callback : download)
for (auto& [id, callback] : download)
callback(error);
for (auto& callback : upload)
for (auto& [id, callback] : upload)
callback(error);
}

Expand Down Expand Up @@ -722,11 +722,13 @@ void SyncSession::add_completion_callback(_impl::SyncProgressNotifier::NotifierT
{
bool is_download = direction == _impl::SyncProgressNotifier::NotifierType::download;

int resync_counter = m_client_resync_counter;
std::weak_ptr<SyncSession> weak_self = shared_from_this();
auto waiter = is_download ? &sync::Session::async_wait_for_download_completion
: &sync::Session::async_wait_for_upload_completion;
(m_session.get()->*waiter)([resync_counter, weak_self, is_download](std::error_code ec) {
(m_session.get()->*waiter)([is_download,
resync_counter = m_client_resync_counter,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the resync_counter logic is now redundant since if there was a resync, it won't find the entry for this id in the map. If you agree, I will remove that logic.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this looks like a more general solution to the problem the resync counter was addressing.

weak_self = std::weak_ptr(shared_from_this()),
id = m_completion_request_counter
] (std::error_code ec) {
auto self = weak_self.lock();
if (!self)
return;
Expand All @@ -737,28 +739,33 @@ void SyncSession::add_completion_callback(_impl::SyncProgressNotifier::NotifierT
// notifications from it
return;
}
auto callbacks = std::move(is_download ? self->m_download_completion_callbacks
: self->m_upload_completion_callbacks);
auto& callbacks = is_download ? self->m_download_completion_callbacks
: self->m_upload_completion_callbacks;
auto callback_node = callbacks.extract(id);
lock.unlock();
for (auto& callback : callbacks)
callback(ec);
if (callback_node)
callback_node.mapped()(ec);
});
}

void SyncSession::wait_for_upload_completion(std::function<void(std::error_code)> callback)
{
std::unique_lock<std::mutex> lock(m_state_mutex);
if (m_upload_completion_callbacks.empty())
m_state->wait_for_completion(*this, _impl::SyncProgressNotifier::NotifierType::upload);
m_upload_completion_callbacks.push_back(std::move(callback));
m_completion_request_counter++; // read in wait_for_completion()
m_state->wait_for_completion(*this, _impl::SyncProgressNotifier::NotifierType::upload);
m_upload_completion_callbacks.emplace_hint(m_upload_completion_callbacks.end(),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think it is worth moving this into add_completion_callback? I can't really see any reason for the current distribution of work between the functions, but I didn't want to make too much unnecessary change in a bugfix commit.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need separate upload/download completion callback maps with this change? If not then merging them and pushing this into add_completion_callback() makes sense, but without merging them it'd be another awkward conditional there?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I don't think so. Merging them cleans up the code nicely.

m_completion_request_counter,
std::move(callback));
}

void SyncSession::wait_for_download_completion(std::function<void(std::error_code)> callback)
{
std::unique_lock<std::mutex> lock(m_state_mutex);
if (m_download_completion_callbacks.empty())
m_state->wait_for_completion(*this, _impl::SyncProgressNotifier::NotifierType::download);
m_download_completion_callbacks.push_back(std::move(callback));
m_completion_request_counter++; // read in wait_for_completion()
m_state->wait_for_completion(*this, _impl::SyncProgressNotifier::NotifierType::download);
m_download_completion_callbacks.emplace_hint(m_download_completion_callbacks.end(),
m_completion_request_counter,
std::move(callback));
}

uint64_t SyncSession::register_progress_notifier(std::function<SyncProgressNotifierCallback> notifier,
Expand Down
6 changes: 4 additions & 2 deletions src/sync/sync_session.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

#include <mutex>
#include <unordered_map>
#include <map>

namespace realm {

Expand Down Expand Up @@ -352,8 +353,9 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
std::string m_realm_path;
_impl::SyncClient& m_client;

std::vector<std::function<void(std::error_code)>> m_download_completion_callbacks;
std::vector<std::function<void(std::error_code)>> m_upload_completion_callbacks;
int64_t m_completion_request_counter = 0;
std::map<int64_t, std::function<void(std::error_code)>> m_download_completion_callbacks;
std::map<int64_t, std::function<void(std::error_code)>> m_upload_completion_callbacks;
// How many times a client resync has occurred. Used to discard session
// completion notifications from before the most recent client resync.
int m_client_resync_counter = 0;
Expand Down