Skip to content

Commit

Permalink
Fix race in SubscriptionSet::get_state_change_nofication() (#5146)
Browse files Browse the repository at this point in the history
If there have been writes to the database since SubscriptionSet was
created we need to refresh our state before maybe returning a ready
future. We also need to block process_notifications while determining
whether to return a ready future because otherwise the state may change
between deciding not to add a ready future and acquiring the lock to add
the pending future.
  • Loading branch information
jbreams authored Jan 8, 2022
1 parent 37f422a commit 952a194
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* SyncManager::path_for_realm now returns a default path when FLX sync is enabled ([#5088](https://github.com/realm/realm-core/pull/5088))
* Having links in a property of Mixed type would lead to ill-formed JSON output when serializing the database. ([#5125](https://github.com/realm/realm-core/issues/5125), since v11.0.0)
* FLX sync QUERY messages are now ordered with UPLOAD messages ([#5135](https://github.com/realm/realm-core/pull/5135))
* Fixed race condition when waiting for state change notifications on FLX subscription sets that may have caused a hang ([#5146](https://github.com/realm/realm-core/pull/5146))

### Breaking changes
* FLX SubscriptionSet type split into SubscriptionSet and MutableSubscriptionSet to add type safety ([#5092](https://github.com/realm/realm-core/pull/5092))
Expand Down
44 changes: 35 additions & 9 deletions src/realm/sync/subscriptions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -319,23 +319,46 @@ void SubscriptionSet::refresh()

util::Future<SubscriptionSet::State> SubscriptionSet::get_state_change_notification(State notify_when) const
{
std::unique_lock<std::mutex> lk(m_mgr->m_pending_notifications_mutex);
// If we've already been superceded by another version getting completed, then we should skip registering
// a notification because it may never fire.
if (m_mgr->m_min_outstanding_version > version()) {
return util::Future<State>::make_ready(State::Superceded);
}

// Begin by blocking process_notifications from starting to fill futures. No matter the outcome, we'll
// unblock process_notifications() at the end of this function via the guard we construct below.
m_mgr->m_outstanding_requests++;
auto guard = util::make_scope_exit([&]() noexcept {
if (!lk.owns_lock()) {
lk.lock();
}
--m_mgr->m_outstanding_requests;
m_mgr->m_pending_notifications_cv.notify_one();
});
lk.unlock();

State cur_state = state();
StringData err_str = error_str();

// If there have been writes to the database since this SubscriptionSet was created, we need to fetch
// the updated version from the DB to know the true current state and maybe return a ready future.
if (m_tr->get_version() < m_mgr->m_db->get_version_of_latest_snapshot()) {
auto refreshed_self = m_mgr->get_by_version(version());
cur_state = refreshed_self.state();
err_str = refreshed_self.error_str();
}
// If we've already reached the desired state, or if the subscription is in an error state,
// we can return a ready future immediately.
auto cur_state = state();
if (cur_state == State::Error) {
return util::Future<State>::make_ready(Status{ErrorCodes::RuntimeError, error_str()});
return util::Future<State>::make_ready(Status{ErrorCodes::RuntimeError, err_str});
}
else if (cur_state >= notify_when) {
return util::Future<State>::make_ready(cur_state);
}

std::lock_guard<std::mutex> lk(m_mgr->m_pending_notifications_mutex);

// If we've already been superceded by another version getting completed, then we should skip registering
// a notification because it may never fire.
if (m_mgr->m_min_outstanding_version > version()) {
return util::Future<State>::make_ready(State::Superceded);
}
// Otherwise put in a new request to be filled in by process_notifications().
lk.lock();

// Otherwise, make a promise/future pair and add it to the list of pending notifications.
auto [promise, future] = util::make_promise_future<State>();
Expand All @@ -350,6 +373,9 @@ void MutableSubscriptionSet::process_notifications()

std::list<SubscriptionStore::NotificationRequest> to_finish;
std::unique_lock<std::mutex> lk(m_mgr->m_pending_notifications_mutex);
m_mgr->m_pending_notifications_cv.wait(lk, [&] {
return m_mgr->m_outstanding_requests == 0;
});
for (auto it = m_mgr->m_pending_notifications.begin(); it != m_mgr->m_pending_notifications.end();) {
if ((it->version == my_version && (new_state == State::Error || new_state >= it->notify_when)) ||
(new_state == State::Complete && it->version < my_version)) {
Expand Down
2 changes: 2 additions & 0 deletions src/realm/sync/subscriptions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,8 @@ class SubscriptionStore {
std::unique_ptr<SubscriptionKeys> m_sub_keys;

mutable std::mutex m_pending_notifications_mutex;
mutable std::condition_variable m_pending_notifications_cv;
mutable int64_t m_outstanding_requests = 0;
mutable int64_t m_min_outstanding_version = 0;
mutable std::list<NotificationRequest> m_pending_notifications;
};
Expand Down
16 changes: 16 additions & 0 deletions test/test_sync_subscriptions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,22 @@ TEST(Sync_SubscriptionStoreNotifications)
// immediately.
CHECK_EQUAL(sub_set.get_state_change_notification(SubscriptionSet::State::Bootstrapping).get(),
SubscriptionSet::State::Complete);

// Check that if a subscription set gets updated to a new state and the SubscriptionSet returned by commit() is
// not explicitly refreshed (i.e. is reading from a snapshot from before the state change), that it can still
// return a ready future.
auto mut_set = store.get_latest().make_mutable_copy();
auto waitable_set = std::move(mut_set).commit();

{
mut_set = store.get_mutable_by_version(waitable_set.version());
mut_set.update_state(SubscriptionSet::State::Complete);
std::move(mut_set).commit();
}

auto fut = waitable_set.get_state_change_notification(SubscriptionSet::State::Complete);
CHECK(fut.is_ready());
CHECK_EQUAL(std::move(fut).get(), SubscriptionSet::State::Complete);
}

} // namespace realm::sync

0 comments on commit 952a194

Please sign in to comment.