Skip to content
This repository has been archived by the owner on Nov 8, 2021. It is now read-only.

ROBJSTORE-98 Implement precise and unbatched notification of sync completion events #1118

Merged
merged 2 commits into from
Oct 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 50 additions & 69 deletions src/sync/sync_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,6 @@ struct SyncSession::State {
// `inactive` state after the invocation of shutdown_and_wait().
virtual void shutdown_and_wait(std::unique_lock<std::mutex>&, SyncSession&) const { }

// Register a handler to wait for sync session uploads, downloads, or synchronization.
// PRECONDITION: the session state lock must be held at the time this method is called, until after it returns.
virtual void wait_for_completion(SyncSession&, _impl::SyncProgressNotifier::NotifierType) const { }

static const State& active;
static const State& dying;
static const State& inactive;
Expand All @@ -114,7 +110,7 @@ struct sync_session_states::Active : public SyncSession::State {
//msvc needs this to initialize the class correctly
constexpr Active() {};

void enter_state(std::unique_lock<std::mutex>&, SyncSession& session) const override
void enter_state(std::unique_lock<std::mutex>& lock, SyncSession& session) const override
{
// when entering from the Dying state the session will still be bound
if (!session.m_session) {
Expand All @@ -125,10 +121,12 @@ struct sync_session_states::Active : public SyncSession::State {
// Register all the pending wait-for-completion blocks. This can
// potentially add a redundant callback if we're coming from the Dying
// state, but that's okay (we won't call the user callbacks twice).
if (!session.m_upload_completion_callbacks.empty())
session.add_completion_callback(_impl::SyncProgressNotifier::NotifierType::upload);
if (!session.m_download_completion_callbacks.empty())
session.add_completion_callback(_impl::SyncProgressNotifier::NotifierType::download);
SyncSession::CompletionCallbacks callbacks_to_register;
std::swap(session.m_completion_callbacks, callbacks_to_register);

for (auto& [id, callback_tuple] : callbacks_to_register) {
session.add_completion_callback(lock, std::move(callback_tuple.second), callback_tuple.first);
}
}

void log_out(std::unique_lock<std::mutex>& lock, SyncSession& session) const override
Expand Down Expand Up @@ -164,12 +162,6 @@ struct sync_session_states::Active : public SyncSession::State {
session.advance_state(lock, inactive);
}

void wait_for_completion(SyncSession& session, _impl::SyncProgressNotifier::NotifierType direction) const override
{
REALM_ASSERT(session.m_session);
session.add_completion_callback(direction);
}

void handle_reconnect(std::unique_lock<std::mutex>&, SyncSession& session) const override
{
session.m_session->cancel_reconnect_delay();
Expand All @@ -189,7 +181,7 @@ struct sync_session_states::Dying : public SyncSession::State {
}

size_t current_death_count = ++session.m_death_count;
std::weak_ptr<SyncSession> weak_session = session.shared_from_this();
auto weak_session = session.weak_from_this();
session.m_session->async_wait_for_upload_completion([weak_session, current_death_count](std::error_code) {
if (auto session = weak_session.lock()) {
std::unique_lock<std::mutex> lock(session->m_state_mutex);
Expand Down Expand Up @@ -223,12 +215,6 @@ struct sync_session_states::Dying : public SyncSession::State {
{
session.advance_state(lock, inactive);
}

void wait_for_completion(SyncSession& session, _impl::SyncProgressNotifier::NotifierType direction) const override
{
REALM_ASSERT(session.m_session);
session.add_completion_callback(direction);
}
};

struct sync_session_states::Inactive : public SyncSession::State {
Expand All @@ -243,10 +229,8 @@ struct sync_session_states::Inactive : public SyncSession::State {
auto old_state = session.m_connection_state;
auto new_state = session.m_connection_state = SyncSession::ConnectionState::Disconnected;

auto download_waits = std::move(session.m_download_completion_callbacks);
auto upload_waits = std::move(session.m_upload_completion_callbacks);
session.m_download_completion_callbacks.clear();
session.m_upload_completion_callbacks.clear();
SyncSession::CompletionCallbacks waits;
std::swap(waits, session.m_completion_callbacks);

session.m_session = nullptr;
session.unregister(lock); // releases lock
Expand All @@ -257,10 +241,8 @@ struct sync_session_states::Inactive : public SyncSession::State {
}

// Inform any queued-up completion handlers that they were cancelled.
for (auto& callback : download_waits)
callback(make_error_code(util::error::operation_aborted));
for (auto& callback : upload_waits)
callback(make_error_code(util::error::operation_aborted));
for (auto& [id, callback] : waits)
callback.second(make_error_code(util::error::operation_aborted));
}

bool revive_if_needed(std::unique_lock<std::mutex>& lock, SyncSession& session) const override
Expand All @@ -275,7 +257,6 @@ struct sync_session_states::Inactive : public SyncSession::State {
}
};


const SyncSession::State& SyncSession::State::active = Active();
const SyncSession::State& SyncSession::State::dying = Dying();
const SyncSession::State& SyncSession::State::inactive = Inactive();
Expand Down Expand Up @@ -379,23 +360,20 @@ void SyncSession::handle_error(SyncError error)
// when the old session is torn down, which we don't want as this
// is supposed to be transparent to the user.
//
// To avoid this, we need to do two things: move the completion
// handlers aside temporarily so that moving to the inactive
// state doesn't clear them, and track which sync::Session each
// completion notification came from so that we can ignore
// notifications from the old session.
// To avoid this, we need to move the completion handlers aside temporarily so
// that moving to the inactive state doesn't clear them - they will be re-registered
// when the session becomes active again.
{
std::unique_lock<std::mutex> lock(m_state_mutex);
m_force_client_resync = true;

++m_client_resync_counter;
auto download_handlers = std::move(m_download_completion_callbacks);
auto upload_handlers = std::move(m_upload_completion_callbacks);

CompletionCallbacks callbacks;
std::swap(m_completion_callbacks, callbacks);
advance_state(lock, State::inactive);

m_download_completion_callbacks = std::move(download_handlers);
m_upload_completion_callbacks = std::move(upload_handlers);
// FIXME This should be done in a scope guard so that we always do this, even if advance_state
// throws.
std::swap(callbacks, m_completion_callbacks);
}
revive_if_needed();
return;
Expand Down Expand Up @@ -566,15 +544,14 @@ void SyncSession::handle_error(SyncError error)

void SyncSession::cancel_pending_waits(std::unique_lock<std::mutex>& lock, std::error_code error)
{
auto download = std::move(m_download_completion_callbacks);
auto upload = std::move(m_upload_completion_callbacks);

CompletionCallbacks callbacks;
std::swap(callbacks, m_completion_callbacks);
lock.unlock();

// Inform any queued-up completion handlers that they were cancelled.
for (auto& callback : download)
callback(error);
for (auto& callback : upload)
callback(error);
for (auto& [id, callback] : callbacks)
callback.second(error);
}

void SyncSession::handle_progress_update(uint64_t downloaded, uint64_t downloadable,
Expand Down Expand Up @@ -743,47 +720,51 @@ void SyncSession::unregister(std::unique_lock<std::mutex>& lock)
m_config.user->sync_manager()->unregister_session(m_realm_path);
}

void SyncSession::add_completion_callback(_impl::SyncProgressNotifier::NotifierType direction)
void SyncSession::add_completion_callback(const std::unique_lock<std::mutex>&,
std::function<void(std::error_code)> callback,
_impl::SyncProgressNotifier::NotifierType direction)
{
bool is_download = direction == _impl::SyncProgressNotifier::NotifierType::download;
bool is_download = (direction == _impl::SyncProgressNotifier::NotifierType::download);

m_completion_request_counter++;
m_completion_callbacks.emplace_hint(m_completion_callbacks.end(),
m_completion_request_counter,
std::make_pair(
direction,
std::move(callback)));
// If the state is inactive then just store the callback and return. The callback will get
// re-registered with the underlying session if/when the session ever becomes active again.
if (get_public_state() == PublicState::Inactive) {
return;
}

int resync_counter = m_client_resync_counter;
std::weak_ptr<SyncSession> weak_self = shared_from_this();
auto waiter = is_download ? &sync::Session::async_wait_for_download_completion
: &sync::Session::async_wait_for_upload_completion;
(m_session.get()->*waiter)([resync_counter, weak_self, is_download](std::error_code ec) {

(m_session.get()->*waiter)([weak_self = weak_from_this(),
id = m_completion_request_counter
] (std::error_code ec) {
auto self = weak_self.lock();
if (!self)
return;
std::unique_lock<std::mutex> lock(self->m_state_mutex);
if (resync_counter != self->m_client_resync_counter) {
// This callback was registered on a previous sync session and not
// the current one, so we want to simply discard completion
// notifications from it
return;
}
auto callbacks = std::move(is_download ? self->m_download_completion_callbacks
: self->m_upload_completion_callbacks);
auto callback_node = self->m_completion_callbacks.extract(id);
lock.unlock();
for (auto& callback : callbacks)
callback(ec);
if (callback_node)
callback_node.mapped().second(ec);
});
}

void SyncSession::wait_for_upload_completion(std::function<void(std::error_code)> callback)
{
std::unique_lock<std::mutex> lock(m_state_mutex);
if (m_upload_completion_callbacks.empty())
m_state->wait_for_completion(*this, _impl::SyncProgressNotifier::NotifierType::upload);
m_upload_completion_callbacks.push_back(std::move(callback));
add_completion_callback(lock, std::move(callback), _impl::SyncProgressNotifier::NotifierType::upload);
}

void SyncSession::wait_for_download_completion(std::function<void(std::error_code)> callback)
{
std::unique_lock<std::mutex> lock(m_state_mutex);
if (m_download_completion_callbacks.empty())
m_state->wait_for_completion(*this, _impl::SyncProgressNotifier::NotifierType::download);
m_download_completion_callbacks.push_back(std::move(callback));
add_completion_callback(lock, std::move(callback), _impl::SyncProgressNotifier::NotifierType::download);
}

uint64_t SyncSession::register_progress_notifier(std::function<SyncProgressNotifierCallback> notifier,
Expand Down
16 changes: 9 additions & 7 deletions src/sync/sync_session.hpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////
//
// Copyright 2016 Realm Inc.
//
Expand Down Expand Up @@ -27,6 +27,7 @@

#include <mutex>
#include <unordered_map>
#include <map>

namespace realm {

Expand Down Expand Up @@ -274,6 +275,8 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {

private:
using std::enable_shared_from_this<SyncSession>::shared_from_this;
using CompletionCallbacks = std::map<int64_t,
std::pair<_impl::SyncProgressNotifier::NotifierType, std::function<void(std::error_code)>>>;

struct State;
friend struct _impl::sync_session_states::Active;
Expand Down Expand Up @@ -336,7 +339,9 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
void unregister(std::unique_lock<std::mutex>& lock);
void did_drop_external_reference();

void add_completion_callback(_impl::SyncProgressNotifier::NotifierType direction);
void add_completion_callback(const std::unique_lock<std::mutex>&,
std::function<void(std::error_code)> callback,
_impl::SyncProgressNotifier::NotifierType direction);

std::function<SyncSessionTransactCallback> m_sync_transact_callback;

Expand All @@ -356,11 +361,8 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
std::string m_realm_path;
_impl::SyncClient& m_client;

std::vector<std::function<void(std::error_code)>> m_download_completion_callbacks;
std::vector<std::function<void(std::error_code)>> m_upload_completion_callbacks;
// How many times a client resync has occurred. Used to discard session
// completion notifications from before the most recent client resync.
int m_client_resync_counter = 0;
int64_t m_completion_request_counter = 0;
CompletionCallbacks m_completion_callbacks;

// The underlying `Session` object that is owned and managed by this `SyncSession`.
// The session is first created when the `SyncSession` is moved out of its initial `inactive` state.
Expand Down