Skip to content
This repository has been archived by the owner on Nov 8, 2021. It is now read-only.

Commit

Permalink
ROBJSTORE-98 Implement precise and unbatched notification of sync com…
Browse files Browse the repository at this point in the history
…pletion events (#1118)
  • Loading branch information
jbreams authored Oct 17, 2020
1 parent d0ac41b commit 3377523
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 76 deletions.
119 changes: 50 additions & 69 deletions src/sync/sync_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,6 @@ struct SyncSession::State {
// `inactive` state after the invocation of shutdown_and_wait().
virtual void shutdown_and_wait(std::unique_lock<std::mutex>&, SyncSession&) const { }

// Register a handler to wait for sync session uploads, downloads, or synchronization.
// PRECONDITION: the session state lock must be held at the time this method is called, until after it returns.
virtual void wait_for_completion(SyncSession&, _impl::SyncProgressNotifier::NotifierType) const { }

static const State& active;
static const State& dying;
static const State& inactive;
Expand All @@ -114,7 +110,7 @@ struct sync_session_states::Active : public SyncSession::State {
//msvc needs this to initialize the class correctly
constexpr Active() {};

void enter_state(std::unique_lock<std::mutex>&, SyncSession& session) const override
void enter_state(std::unique_lock<std::mutex>& lock, SyncSession& session) const override
{
// when entering from the Dying state the session will still be bound
if (!session.m_session) {
Expand All @@ -125,10 +121,12 @@ struct sync_session_states::Active : public SyncSession::State {
// Register all the pending wait-for-completion blocks. This can
// potentially add a redundant callback if we're coming from the Dying
// state, but that's okay (we won't call the user callbacks twice).
if (!session.m_upload_completion_callbacks.empty())
session.add_completion_callback(_impl::SyncProgressNotifier::NotifierType::upload);
if (!session.m_download_completion_callbacks.empty())
session.add_completion_callback(_impl::SyncProgressNotifier::NotifierType::download);
SyncSession::CompletionCallbacks callbacks_to_register;
std::swap(session.m_completion_callbacks, callbacks_to_register);

for (auto& [id, callback_tuple] : callbacks_to_register) {
session.add_completion_callback(lock, std::move(callback_tuple.second), callback_tuple.first);
}
}

void log_out(std::unique_lock<std::mutex>& lock, SyncSession& session) const override
Expand Down Expand Up @@ -164,12 +162,6 @@ struct sync_session_states::Active : public SyncSession::State {
session.advance_state(lock, inactive);
}

void wait_for_completion(SyncSession& session, _impl::SyncProgressNotifier::NotifierType direction) const override
{
REALM_ASSERT(session.m_session);
session.add_completion_callback(direction);
}

void handle_reconnect(std::unique_lock<std::mutex>&, SyncSession& session) const override
{
session.m_session->cancel_reconnect_delay();
Expand All @@ -189,7 +181,7 @@ struct sync_session_states::Dying : public SyncSession::State {
}

size_t current_death_count = ++session.m_death_count;
std::weak_ptr<SyncSession> weak_session = session.shared_from_this();
auto weak_session = session.weak_from_this();
session.m_session->async_wait_for_upload_completion([weak_session, current_death_count](std::error_code) {
if (auto session = weak_session.lock()) {
std::unique_lock<std::mutex> lock(session->m_state_mutex);
Expand Down Expand Up @@ -223,12 +215,6 @@ struct sync_session_states::Dying : public SyncSession::State {
{
session.advance_state(lock, inactive);
}

void wait_for_completion(SyncSession& session, _impl::SyncProgressNotifier::NotifierType direction) const override
{
REALM_ASSERT(session.m_session);
session.add_completion_callback(direction);
}
};

struct sync_session_states::Inactive : public SyncSession::State {
Expand All @@ -243,10 +229,8 @@ struct sync_session_states::Inactive : public SyncSession::State {
auto old_state = session.m_connection_state;
auto new_state = session.m_connection_state = SyncSession::ConnectionState::Disconnected;

auto download_waits = std::move(session.m_download_completion_callbacks);
auto upload_waits = std::move(session.m_upload_completion_callbacks);
session.m_download_completion_callbacks.clear();
session.m_upload_completion_callbacks.clear();
SyncSession::CompletionCallbacks waits;
std::swap(waits, session.m_completion_callbacks);

session.m_session = nullptr;
session.unregister(lock); // releases lock
Expand All @@ -257,10 +241,8 @@ struct sync_session_states::Inactive : public SyncSession::State {
}

// Inform any queued-up completion handlers that they were cancelled.
for (auto& callback : download_waits)
callback(make_error_code(util::error::operation_aborted));
for (auto& callback : upload_waits)
callback(make_error_code(util::error::operation_aborted));
for (auto& [id, callback] : waits)
callback.second(make_error_code(util::error::operation_aborted));
}

bool revive_if_needed(std::unique_lock<std::mutex>& lock, SyncSession& session) const override
Expand All @@ -275,7 +257,6 @@ struct sync_session_states::Inactive : public SyncSession::State {
}
};


const SyncSession::State& SyncSession::State::active = Active();
const SyncSession::State& SyncSession::State::dying = Dying();
const SyncSession::State& SyncSession::State::inactive = Inactive();
Expand Down Expand Up @@ -379,23 +360,20 @@ void SyncSession::handle_error(SyncError error)
// when the old session is torn down, which we don't want as this
// is supposed to be transparent to the user.
//
// To avoid this, we need to do two things: move the completion
// handlers aside temporarily so that moving to the inactive
// state doesn't clear them, and track which sync::Session each
// completion notification came from so that we can ignore
// notifications from the old session.
// To avoid this, we need to move the completion handlers aside temporarily so
// that moving to the inactive state doesn't clear them - they will be re-registered
// when the session becomes active again.
{
std::unique_lock<std::mutex> lock(m_state_mutex);
m_force_client_resync = true;

++m_client_resync_counter;
auto download_handlers = std::move(m_download_completion_callbacks);
auto upload_handlers = std::move(m_upload_completion_callbacks);

CompletionCallbacks callbacks;
std::swap(m_completion_callbacks, callbacks);
advance_state(lock, State::inactive);

m_download_completion_callbacks = std::move(download_handlers);
m_upload_completion_callbacks = std::move(upload_handlers);
// FIXME This should be done in a scope guard so that we always do this, even if advance_state
// throws.
std::swap(callbacks, m_completion_callbacks);
}
revive_if_needed();
return;
Expand Down Expand Up @@ -566,15 +544,14 @@ void SyncSession::handle_error(SyncError error)

void SyncSession::cancel_pending_waits(std::unique_lock<std::mutex>& lock, std::error_code error)
{
auto download = std::move(m_download_completion_callbacks);
auto upload = std::move(m_upload_completion_callbacks);

CompletionCallbacks callbacks;
std::swap(callbacks, m_completion_callbacks);
lock.unlock();

// Inform any queued-up completion handlers that they were cancelled.
for (auto& callback : download)
callback(error);
for (auto& callback : upload)
callback(error);
for (auto& [id, callback] : callbacks)
callback.second(error);
}

void SyncSession::handle_progress_update(uint64_t downloaded, uint64_t downloadable,
Expand Down Expand Up @@ -743,47 +720,51 @@ void SyncSession::unregister(std::unique_lock<std::mutex>& lock)
m_config.user->sync_manager()->unregister_session(m_realm_path);
}

void SyncSession::add_completion_callback(_impl::SyncProgressNotifier::NotifierType direction)
void SyncSession::add_completion_callback(const std::unique_lock<std::mutex>&,
std::function<void(std::error_code)> callback,
_impl::SyncProgressNotifier::NotifierType direction)
{
bool is_download = direction == _impl::SyncProgressNotifier::NotifierType::download;
bool is_download = (direction == _impl::SyncProgressNotifier::NotifierType::download);

m_completion_request_counter++;
m_completion_callbacks.emplace_hint(m_completion_callbacks.end(),
m_completion_request_counter,
std::make_pair(
direction,
std::move(callback)));
// If the state is inactive then just store the callback and return. The callback will get
// re-registered with the underlying session if/when the session ever becomes active again.
if (get_public_state() == PublicState::Inactive) {
return;
}

int resync_counter = m_client_resync_counter;
std::weak_ptr<SyncSession> weak_self = shared_from_this();
auto waiter = is_download ? &sync::Session::async_wait_for_download_completion
: &sync::Session::async_wait_for_upload_completion;
(m_session.get()->*waiter)([resync_counter, weak_self, is_download](std::error_code ec) {

(m_session.get()->*waiter)([weak_self = weak_from_this(),
id = m_completion_request_counter
] (std::error_code ec) {
auto self = weak_self.lock();
if (!self)
return;
std::unique_lock<std::mutex> lock(self->m_state_mutex);
if (resync_counter != self->m_client_resync_counter) {
// This callback was registered on a previous sync session and not
// the current one, so we want to simply discard completion
// notifications from it
return;
}
auto callbacks = std::move(is_download ? self->m_download_completion_callbacks
: self->m_upload_completion_callbacks);
auto callback_node = self->m_completion_callbacks.extract(id);
lock.unlock();
for (auto& callback : callbacks)
callback(ec);
if (callback_node)
callback_node.mapped().second(ec);
});
}

void SyncSession::wait_for_upload_completion(std::function<void(std::error_code)> callback)
{
std::unique_lock<std::mutex> lock(m_state_mutex);
if (m_upload_completion_callbacks.empty())
m_state->wait_for_completion(*this, _impl::SyncProgressNotifier::NotifierType::upload);
m_upload_completion_callbacks.push_back(std::move(callback));
add_completion_callback(lock, std::move(callback), _impl::SyncProgressNotifier::NotifierType::upload);
}

void SyncSession::wait_for_download_completion(std::function<void(std::error_code)> callback)
{
std::unique_lock<std::mutex> lock(m_state_mutex);
if (m_download_completion_callbacks.empty())
m_state->wait_for_completion(*this, _impl::SyncProgressNotifier::NotifierType::download);
m_download_completion_callbacks.push_back(std::move(callback));
add_completion_callback(lock, std::move(callback), _impl::SyncProgressNotifier::NotifierType::download);
}

uint64_t SyncSession::register_progress_notifier(std::function<SyncProgressNotifierCallback> notifier,
Expand Down
16 changes: 9 additions & 7 deletions src/sync/sync_session.hpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////
//
// Copyright 2016 Realm Inc.
//
Expand Down Expand Up @@ -27,6 +27,7 @@

#include <mutex>
#include <unordered_map>
#include <map>

namespace realm {

Expand Down Expand Up @@ -274,6 +275,8 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {

private:
using std::enable_shared_from_this<SyncSession>::shared_from_this;
using CompletionCallbacks = std::map<int64_t,
std::pair<_impl::SyncProgressNotifier::NotifierType, std::function<void(std::error_code)>>>;

struct State;
friend struct _impl::sync_session_states::Active;
Expand Down Expand Up @@ -336,7 +339,9 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
void unregister(std::unique_lock<std::mutex>& lock);
void did_drop_external_reference();

void add_completion_callback(_impl::SyncProgressNotifier::NotifierType direction);
void add_completion_callback(const std::unique_lock<std::mutex>&,
std::function<void(std::error_code)> callback,
_impl::SyncProgressNotifier::NotifierType direction);

std::function<SyncSessionTransactCallback> m_sync_transact_callback;

Expand All @@ -356,11 +361,8 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
std::string m_realm_path;
_impl::SyncClient& m_client;

std::vector<std::function<void(std::error_code)>> m_download_completion_callbacks;
std::vector<std::function<void(std::error_code)>> m_upload_completion_callbacks;
// How many times a client resync has occurred. Used to discard session
// completion notifications from before the most recent client resync.
int m_client_resync_counter = 0;
int64_t m_completion_request_counter = 0;
CompletionCallbacks m_completion_callbacks;

// The underlying `Session` object that is owned and managed by this `SyncSession`.
// The session is first created when the `SyncSession` is moved out of its initial `inactive` state.
Expand Down

0 comments on commit 3377523

Please sign in to comment.