Skip to content

Commit

Permalink
Clear out SubscriptionStore and cancel pending notifications upon rol…
Browse files Browse the repository at this point in the history
…lback to PBS (#6489)

* Added terminate() and notify_all_pending() to Subscription Store
* Rework of mutable subscription store and notify all
* Removed notifying the subscription waiters on error
* Lazily create subscription store when needed
* Added extra migrate to FLX step to verify sub store clearing
* Added subscription store unit tests
* Updated notify test to verify subs state before/after notify
* Updates from review, added extra flx migration test steps
  • Loading branch information
Michael Wilkerson-Barker authored Apr 26, 2023
1 parent 7582518 commit 1edcd92
Show file tree
Hide file tree
Showing 7 changed files with 359 additions and 55 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
-----------

### Internals
* None.
* Clear out SubscriptionStore and cancel pending notifications upon rollback to PBS after client migration to FLX. ([#6389](https://github.com/realm/realm-core/issues/6389))

----------------------------------------------

Expand Down
61 changes: 45 additions & 16 deletions src/realm/object-store/sync/sync_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,8 @@ SyncSession::handle_refresh(const std::shared_ptr<SyncSession>& session, bool re
auto session_user = session->user();
if (!session_user) {
util::CheckedUniqueLock lock(session->m_state_mutex);
session->cancel_pending_waits(std::move(lock), error ? error->to_status() : Status::OK());
auto refresh_error = error ? error->to_status() : Status::OK();
session->cancel_pending_waits(std::move(lock), refresh_error);
}
else if (error) {
if (error->code() == ErrorCodes::ClientAppDeallocated) {
Expand Down Expand Up @@ -827,7 +828,8 @@ void SyncSession::handle_error(sync::SessionErrorInfo error)
break;
}
case NextStateAfterError::error: {
cancel_pending_waits(std::move(lock), sync_error.to_status());
auto error = sync_error.to_status();
cancel_pending_waits(std::move(lock), error);
break;
}
}
Expand All @@ -842,11 +844,21 @@ void SyncSession::handle_error(sync::SessionErrorInfo error)
}
}

void SyncSession::cancel_pending_waits(util::CheckedUniqueLock lock, Status error)
void SyncSession::cancel_pending_waits(util::CheckedUniqueLock lock, Status error,
std::optional<Status> subs_notify_error)
{
CompletionCallbacks callbacks;
std::swap(callbacks, m_completion_callbacks);
m_state_mutex.unlock(lock);

// Inform any waiters on pending subscription states that they were cancelled
if (subs_notify_error && m_flx_subscription_store) {
auto subscription_store = m_flx_subscription_store;
m_state_mutex.unlock(lock);
subscription_store->notify_all_state_change_notifications(*subs_notify_error);
}
else {
m_state_mutex.unlock(lock);
}

// Inform any queued-up completion handlers that they were cancelled.
for (auto& [id, callback] : callbacks)
Expand Down Expand Up @@ -1305,6 +1317,12 @@ std::shared_ptr<sync::SubscriptionStore> SyncSession::get_flx_subscription_store
return m_flx_subscription_store;
}

std::shared_ptr<sync::SubscriptionStore> SyncSession::get_subscription_store_base()
{
util::CheckedLockGuard lock(m_state_mutex);
return m_subscription_store_base;
}

sync::SaltedFileIdent SyncSession::get_file_ident() const
{
auto repl = m_db->get_replication();
Expand Down Expand Up @@ -1371,9 +1389,10 @@ void SyncSession::update_subscription_store(bool flx_sync_requested)
if (!flx_sync_requested) {
if (m_flx_subscription_store) {
// Empty the subscription store and cancel any pending subscription notification
// waiters - will be done in a separate PR
m_flx_subscription_store.reset();
// waiters
auto subscription_store = std::move(m_flx_subscription_store);
lock.unlock();
subscription_store->terminate();
auto tr = m_db->start_write();
history.set_write_validator_factory(nullptr);
tr->rollback();
Expand Down Expand Up @@ -1402,16 +1421,26 @@ void SyncSession::update_subscription_store(bool flx_sync_requested)
void SyncSession::create_subscription_store()
{
REALM_ASSERT(!m_flx_subscription_store);
m_flx_subscription_store = sync::SubscriptionStore::create(m_db, [this](int64_t new_version) {
util::CheckedLockGuard lk(m_state_mutex);
if (m_state != State::Active && m_state != State::WaitingForAccessToken) {
return;
}
// There may be no session yet (i.e., waiting to refresh the access token).
if (m_session) {
m_session->on_new_flx_sync_subscription(new_version);
}
});

// Create the main subscription store instance when this is first called - this will
// remain valid afterwards for the life of the SyncSession, but m_flx_subscription_store
// will be reset when rolling back to PBS after a client FLX migration
if (!m_subscription_store_base) {
m_subscription_store_base = sync::SubscriptionStore::create(m_db, [this](int64_t new_version) {
util::CheckedLockGuard lk(m_state_mutex);
if (m_state != State::Active && m_state != State::WaitingForAccessToken) {
return;
}
// There may be no session yet (i.e., waiting to refresh the access token).
if (m_session) {
m_session->on_new_flx_sync_subscription(new_version);
}
});
}

// m_subscription_store_base is always around for the life of SyncSession, but the
// m_flx_subscription_store is set when using FLX.
m_flx_subscription_store = m_subscription_store_base;
}

void SyncSession::set_write_validator_factory(std::weak_ptr<sync::SubscriptionStore> weak_sub_mgr)
Expand Down
17 changes: 16 additions & 1 deletion src/realm/object-store/sync/sync_session.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,11 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
{
return session.get_file_ident();
}

static std::shared_ptr<sync::SubscriptionStore> get_subscription_store_base(SyncSession& session)
{
return session.get_subscription_store_base();
}
};

private:
Expand Down Expand Up @@ -375,7 +380,10 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
void handle_error(sync::SessionErrorInfo) REQUIRES(!m_state_mutex, !m_config_mutex, !m_connection_state_mutex);
void handle_bad_auth(const std::shared_ptr<SyncUser>& user, Status error_code, std::string_view context_message)
REQUIRES(!m_state_mutex, !m_config_mutex);
void cancel_pending_waits(util::CheckedUniqueLock, Status) RELEASE(m_state_mutex);
// If sub_notify_error is set (including Status::OK()), then the pending subscription waiters will
// also be called with the sub_notify_error status value.
void cancel_pending_waits(util::CheckedUniqueLock, Status, std::optional<Status> subs_notify_error = std::nullopt)
RELEASE(m_state_mutex);
enum class ShouldBackup { yes, no };
void update_error_and_mark_file_for_deletion(SyncError&, ShouldBackup) REQUIRES(m_state_mutex, !m_config_mutex);
void handle_progress_update(uint64_t, uint64_t, uint64_t, uint64_t, uint64_t, uint64_t);
Expand Down Expand Up @@ -429,6 +437,9 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
// Create active subscription set after PBS -> FLX migration to cover the data.
void make_active_subscription_set() REQUIRES(!m_state_mutex);

// Return the subscription_store_base - to be used only for testing
std::shared_ptr<sync::SubscriptionStore> get_subscription_store_base() REQUIRES(!m_state_mutex);

mutable util::CheckedMutex m_state_mutex;
mutable util::CheckedMutex m_connection_state_mutex;

Expand All @@ -443,6 +454,10 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
mutable util::CheckedMutex m_config_mutex;
RealmConfig m_config GUARDED_BY(m_config_mutex);
const std::shared_ptr<DB> m_db;
// The subscription store base is lazily created when needed, but never destroyed
std::shared_ptr<sync::SubscriptionStore> m_subscription_store_base GUARDED_BY(m_state_mutex);
// m_flx_subscription_store will either point to m_subscription_store_base if currently using FLX
// or set to nullptr if currently using PBS (mutable for client PBS->FLX migration)
std::shared_ptr<sync::SubscriptionStore> m_flx_subscription_store GUARDED_BY(m_state_mutex);
std::optional<sync::SubscriptionSet> m_active_subscriptions_after_migration GUARDED_BY(m_state_mutex);
// Original sync config for reverting back to PBS if FLX migration is rolled back
Expand Down
52 changes: 49 additions & 3 deletions src/realm/sync/subscriptions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -674,10 +674,20 @@ SubscriptionStore::SubscriptionStore(DBRef db, util::UniqueFunction<void(int64_t
load_sync_metadata_schema(tr, &internal_tables);
}

// There should always be at least one subscription set so that the user can always wait for synchronizationon
// on the result of get_latest().
if (auto sub_sets = tr->get_table(m_sub_set_table); sub_sets->is_empty()) {
// Make sure the subscription set table is properly initialized
initialize_subscriptions_table(std::move(tr), false);
}

void SubscriptionStore::initialize_subscriptions_table(TransactionRef&& tr, bool clear_table)
{
if (auto sub_sets = tr->get_table(m_sub_set_table); clear_table || sub_sets->is_empty()) {
tr->promote_to_write();
// If erase_table is true, clear out the sub_sets table
if (clear_table) {
sub_sets->clear();
}
// There should always be at least one subscription set so that the user can always wait
// for synchronizationon on the result of get_latest().
auto zero_sub = sub_sets->create_object_with_primary_key(Mixed{int64_t(0)});
zero_sub.set(m_sub_set_state, static_cast<int64_t>(SubscriptionSet::State::Pending));
zero_sub.set(m_sub_set_snapshot_version, tr->get_version());
Expand Down Expand Up @@ -798,6 +808,42 @@ std::vector<SubscriptionSet> SubscriptionStore::get_pending_subscriptions() cons
return subscriptions_to_recover;
}

void SubscriptionStore::notify_all_state_change_notifications(Status status)
{
std::unique_lock<std::mutex> lk(m_pending_notifications_mutex);
m_pending_notifications_cv.wait(lk, [&] {
return m_outstanding_requests == 0;
});

auto to_finish = std::move(m_pending_notifications);
lk.unlock();

// Just complete/cancel the pending notifications - this function does not alter the
// state of any pending subscriptions
for (auto& req : to_finish) {
req.promise.set_error(status);
}
}

void SubscriptionStore::terminate()
{
// Clear out and initialize the subscription store
initialize_subscriptions_table(m_db->start_read(), true);

std::unique_lock<std::mutex> lk(m_pending_notifications_mutex);
m_pending_notifications_cv.wait(lk, [&] {
return m_outstanding_requests == 0;
});
auto to_finish = std::move(m_pending_notifications);
m_min_outstanding_version = 0;

lk.unlock();

for (auto& req : to_finish) {
req.promise.emplace_value(SubscriptionSet::State::Superseded);
}
}

MutableSubscriptionSet SubscriptionStore::get_mutable_by_version(int64_t version_id)
{
auto tr = m_db->start_write();
Expand Down
14 changes: 14 additions & 0 deletions src/realm/sync/subscriptions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,16 @@ class SubscriptionStore : public std::enable_shared_from_this<SubscriptionStore>
DB::version_type after_client_version) const;
std::vector<SubscriptionSet> get_pending_subscriptions() const;

// Notify all subscription state change notification handlers on this subscription store with the
// provided Status - this does not change the state of any pending subscriptions.
// Does not necessarily need to be called from the event loop thread.
void notify_all_state_change_notifications(Status status);

// Reset SubscriptionStore and erase all current subscriptions and supersede any pending
// subscriptions. Must be called from the event loop thread to prevent data race issues
// with the subscription store.
void terminate();

private:
using std::enable_shared_from_this<SubscriptionStore>::weak_from_this;
DBRef m_db;
Expand All @@ -375,6 +385,10 @@ class SubscriptionStore : public std::enable_shared_from_this<SubscriptionStore>
SubscriptionSet get_by_version_impl(int64_t flx_version, util::Optional<DB::VersionID> version) const;
MutableSubscriptionSet make_mutable_copy(const SubscriptionSet& set) const;

// Ensure the subscriptions table is properly initialized
// If clear_table is true, the subscriptions table will be cleared before initialization
void initialize_subscriptions_table(TransactionRef&& tr, bool clear_table);

friend class MutableSubscriptionSet;
friend class Subscription;
friend class SubscriptionSet;
Expand Down
Loading

0 comments on commit 1edcd92

Please sign in to comment.