diff --git a/CHANGELOG.md b/CHANGELOG.md index c149c5db384..d588149b9f4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,7 +17,7 @@ ----------- ### Internals -* None. +* Clear out SubscriptionStore and cancel pending notifications upon rollback to PBS after client migration to FLX. ([#6389](https://github.com/realm/realm-core/issues/6389)) ---------------------------------------------- diff --git a/src/realm/object-store/sync/sync_session.cpp b/src/realm/object-store/sync/sync_session.cpp index 8b85bbb493b..9ca7880f740 100644 --- a/src/realm/object-store/sync/sync_session.cpp +++ b/src/realm/object-store/sync/sync_session.cpp @@ -288,7 +288,8 @@ SyncSession::handle_refresh(const std::shared_ptr& session, bool re auto session_user = session->user(); if (!session_user) { util::CheckedUniqueLock lock(session->m_state_mutex); - session->cancel_pending_waits(std::move(lock), error ? error->to_status() : Status::OK()); + auto refresh_error = error ? error->to_status() : Status::OK(); + session->cancel_pending_waits(std::move(lock), refresh_error); } else if (error) { if (error->code() == ErrorCodes::ClientAppDeallocated) { @@ -827,7 +828,8 @@ void SyncSession::handle_error(sync::SessionErrorInfo error) break; } case NextStateAfterError::error: { - cancel_pending_waits(std::move(lock), sync_error.to_status()); + auto error = sync_error.to_status(); + cancel_pending_waits(std::move(lock), error); break; } } @@ -842,11 +844,21 @@ void SyncSession::handle_error(sync::SessionErrorInfo error) } } -void SyncSession::cancel_pending_waits(util::CheckedUniqueLock lock, Status error) +void SyncSession::cancel_pending_waits(util::CheckedUniqueLock lock, Status error, + std::optional subs_notify_error) { CompletionCallbacks callbacks; std::swap(callbacks, m_completion_callbacks); - m_state_mutex.unlock(lock); + + // Inform any waiters on pending subscription states that they were cancelled + if (subs_notify_error && m_flx_subscription_store) { + auto subscription_store = m_flx_subscription_store; + m_state_mutex.unlock(lock); + subscription_store->notify_all_state_change_notifications(*subs_notify_error); + } + else { + m_state_mutex.unlock(lock); + } // Inform any queued-up completion handlers that they were cancelled. for (auto& [id, callback] : callbacks) @@ -1305,6 +1317,12 @@ std::shared_ptr SyncSession::get_flx_subscription_store return m_flx_subscription_store; } +std::shared_ptr SyncSession::get_subscription_store_base() +{ + util::CheckedLockGuard lock(m_state_mutex); + return m_subscription_store_base; +} + sync::SaltedFileIdent SyncSession::get_file_ident() const { auto repl = m_db->get_replication(); @@ -1371,9 +1389,10 @@ void SyncSession::update_subscription_store(bool flx_sync_requested) if (!flx_sync_requested) { if (m_flx_subscription_store) { // Empty the subscription store and cancel any pending subscription notification - // waiters - will be done in a separate PR - m_flx_subscription_store.reset(); + // waiters + auto subscription_store = std::move(m_flx_subscription_store); lock.unlock(); + subscription_store->terminate(); auto tr = m_db->start_write(); history.set_write_validator_factory(nullptr); tr->rollback(); @@ -1402,16 +1421,26 @@ void SyncSession::update_subscription_store(bool flx_sync_requested) void SyncSession::create_subscription_store() { REALM_ASSERT(!m_flx_subscription_store); - m_flx_subscription_store = sync::SubscriptionStore::create(m_db, [this](int64_t new_version) { - util::CheckedLockGuard lk(m_state_mutex); - if (m_state != State::Active && m_state != State::WaitingForAccessToken) { - return; - } - // There may be no session yet (i.e., waiting to refresh the access token). - if (m_session) { - m_session->on_new_flx_sync_subscription(new_version); - } - }); + + // Create the main subscription store instance when this is first called - this will + // remain valid afterwards for the life of the SyncSession, but m_flx_subscription_store + // will be reset when rolling back to PBS after a client FLX migration + if (!m_subscription_store_base) { + m_subscription_store_base = sync::SubscriptionStore::create(m_db, [this](int64_t new_version) { + util::CheckedLockGuard lk(m_state_mutex); + if (m_state != State::Active && m_state != State::WaitingForAccessToken) { + return; + } + // There may be no session yet (i.e., waiting to refresh the access token). + if (m_session) { + m_session->on_new_flx_sync_subscription(new_version); + } + }); + } + + // m_subscription_store_base is always around for the life of SyncSession, but the + // m_flx_subscription_store is set when using FLX. + m_flx_subscription_store = m_subscription_store_base; } void SyncSession::set_write_validator_factory(std::weak_ptr weak_sub_mgr) diff --git a/src/realm/object-store/sync/sync_session.hpp b/src/realm/object-store/sync/sync_session.hpp index e79a6e8d282..4504d87af2e 100644 --- a/src/realm/object-store/sync/sync_session.hpp +++ b/src/realm/object-store/sync/sync_session.hpp @@ -309,6 +309,11 @@ class SyncSession : public std::enable_shared_from_this { { return session.get_file_ident(); } + + static std::shared_ptr get_subscription_store_base(SyncSession& session) + { + return session.get_subscription_store_base(); + } }; private: @@ -375,7 +380,10 @@ class SyncSession : public std::enable_shared_from_this { void handle_error(sync::SessionErrorInfo) REQUIRES(!m_state_mutex, !m_config_mutex, !m_connection_state_mutex); void handle_bad_auth(const std::shared_ptr& user, Status error_code, std::string_view context_message) REQUIRES(!m_state_mutex, !m_config_mutex); - void cancel_pending_waits(util::CheckedUniqueLock, Status) RELEASE(m_state_mutex); + // If sub_notify_error is set (including Status::OK()), then the pending subscription waiters will + // also be called with the sub_notify_error status value. + void cancel_pending_waits(util::CheckedUniqueLock, Status, std::optional subs_notify_error = std::nullopt) + RELEASE(m_state_mutex); enum class ShouldBackup { yes, no }; void update_error_and_mark_file_for_deletion(SyncError&, ShouldBackup) REQUIRES(m_state_mutex, !m_config_mutex); void handle_progress_update(uint64_t, uint64_t, uint64_t, uint64_t, uint64_t, uint64_t); @@ -429,6 +437,9 @@ class SyncSession : public std::enable_shared_from_this { // Create active subscription set after PBS -> FLX migration to cover the data. void make_active_subscription_set() REQUIRES(!m_state_mutex); + // Return the subscription_store_base - to be used only for testing + std::shared_ptr get_subscription_store_base() REQUIRES(!m_state_mutex); + mutable util::CheckedMutex m_state_mutex; mutable util::CheckedMutex m_connection_state_mutex; @@ -443,6 +454,10 @@ class SyncSession : public std::enable_shared_from_this { mutable util::CheckedMutex m_config_mutex; RealmConfig m_config GUARDED_BY(m_config_mutex); const std::shared_ptr m_db; + // The subscription store base is lazily created when needed, but never destroyed + std::shared_ptr m_subscription_store_base GUARDED_BY(m_state_mutex); + // m_flx_subscription_store will either point to m_subscription_store_base if currently using FLX + // or set to nullptr if currently using PBS (mutable for client PBS->FLX migration) std::shared_ptr m_flx_subscription_store GUARDED_BY(m_state_mutex); std::optional m_active_subscriptions_after_migration GUARDED_BY(m_state_mutex); // Original sync config for reverting back to PBS if FLX migration is rolled back diff --git a/src/realm/sync/subscriptions.cpp b/src/realm/sync/subscriptions.cpp index ec25279518e..a412810dd73 100644 --- a/src/realm/sync/subscriptions.cpp +++ b/src/realm/sync/subscriptions.cpp @@ -674,10 +674,20 @@ SubscriptionStore::SubscriptionStore(DBRef db, util::UniqueFunctionget_table(m_sub_set_table); sub_sets->is_empty()) { + // Make sure the subscription set table is properly initialized + initialize_subscriptions_table(std::move(tr), false); +} + +void SubscriptionStore::initialize_subscriptions_table(TransactionRef&& tr, bool clear_table) +{ + if (auto sub_sets = tr->get_table(m_sub_set_table); clear_table || sub_sets->is_empty()) { tr->promote_to_write(); + // If erase_table is true, clear out the sub_sets table + if (clear_table) { + sub_sets->clear(); + } + // There should always be at least one subscription set so that the user can always wait + // for synchronizationon on the result of get_latest(). auto zero_sub = sub_sets->create_object_with_primary_key(Mixed{int64_t(0)}); zero_sub.set(m_sub_set_state, static_cast(SubscriptionSet::State::Pending)); zero_sub.set(m_sub_set_snapshot_version, tr->get_version()); @@ -798,6 +808,42 @@ std::vector SubscriptionStore::get_pending_subscriptions() cons return subscriptions_to_recover; } +void SubscriptionStore::notify_all_state_change_notifications(Status status) +{ + std::unique_lock lk(m_pending_notifications_mutex); + m_pending_notifications_cv.wait(lk, [&] { + return m_outstanding_requests == 0; + }); + + auto to_finish = std::move(m_pending_notifications); + lk.unlock(); + + // Just complete/cancel the pending notifications - this function does not alter the + // state of any pending subscriptions + for (auto& req : to_finish) { + req.promise.set_error(status); + } +} + +void SubscriptionStore::terminate() +{ + // Clear out and initialize the subscription store + initialize_subscriptions_table(m_db->start_read(), true); + + std::unique_lock lk(m_pending_notifications_mutex); + m_pending_notifications_cv.wait(lk, [&] { + return m_outstanding_requests == 0; + }); + auto to_finish = std::move(m_pending_notifications); + m_min_outstanding_version = 0; + + lk.unlock(); + + for (auto& req : to_finish) { + req.promise.emplace_value(SubscriptionSet::State::Superseded); + } +} + MutableSubscriptionSet SubscriptionStore::get_mutable_by_version(int64_t version_id) { auto tr = m_db->start_write(); diff --git a/src/realm/sync/subscriptions.hpp b/src/realm/sync/subscriptions.hpp index d94ed23c6ff..a2d4247056d 100644 --- a/src/realm/sync/subscriptions.hpp +++ b/src/realm/sync/subscriptions.hpp @@ -349,6 +349,16 @@ class SubscriptionStore : public std::enable_shared_from_this DB::version_type after_client_version) const; std::vector get_pending_subscriptions() const; + // Notify all subscription state change notification handlers on this subscription store with the + // provided Status - this does not change the state of any pending subscriptions. + // Does not necessarily need to be called from the event loop thread. + void notify_all_state_change_notifications(Status status); + + // Reset SubscriptionStore and erase all current subscriptions and supersede any pending + // subscriptions. Must be called from the event loop thread to prevent data race issues + // with the subscription store. + void terminate(); + private: using std::enable_shared_from_this::weak_from_this; DBRef m_db; @@ -375,6 +385,10 @@ class SubscriptionStore : public std::enable_shared_from_this SubscriptionSet get_by_version_impl(int64_t flx_version, util::Optional version) const; MutableSubscriptionSet make_mutable_copy(const SubscriptionSet& set) const; + // Ensure the subscriptions table is properly initialized + // If clear_table is true, the subscriptions table will be cleared before initialization + void initialize_subscriptions_table(TransactionRef&& tr, bool clear_table); + friend class MutableSubscriptionSet; friend class Subscription; friend class SubscriptionSet; diff --git a/test/object-store/sync/flx_migration.cpp b/test/object-store/sync/flx_migration.cpp index 4a3c3d9b81a..d4d6b5d21f6 100644 --- a/test/object-store/sync/flx_migration.cpp +++ b/test/object-store/sync/flx_migration.cpp @@ -19,20 +19,23 @@ using namespace realm; -static void trigger_server_migration(const AppSession& app_session, bool switch_to_flx, +enum MigrationMode { MigrateToFLX, RollbackToPBS }; + +static void trigger_server_migration(const AppSession& app_session, MigrationMode switch_mode, const std::shared_ptr& logger) { auto baas_sync_service = app_session.admin_api.get_sync_service(app_session.server_app_id); REQUIRE(app_session.admin_api.is_sync_enabled(app_session.server_app_id)); - app_session.admin_api.migrate_to_flx(app_session.server_app_id, baas_sync_service.id, switch_to_flx); + app_session.admin_api.migrate_to_flx(app_session.server_app_id, baas_sync_service.id, + switch_mode == MigrateToFLX); // While the server migration is in progress, the server cannot be used - wait until the migration // is complete. migrated with be populated with the 'isMigrated' value from the complete response AdminAPISession::MigrationStatus status; std::string last_status; - std::string op_stg = [switch_to_flx] { - if (switch_to_flx) + std::string op_stg = [switch_mode] { + if (switch_mode == MigrateToFLX) return "PBS->FLX Server migration"; else return "FLX->PBS Server rollback"; @@ -59,7 +62,7 @@ static void trigger_server_migration(const AppSession& app_session, bool switch_ if (logger) { logger->debug("%1 complete", op_stg); } - REQUIRE(switch_to_flx == status.isMigrated); + REQUIRE((switch_mode == MigrateToFLX) == status.isMigrated); } // Add a set of count number of Object objects to the realm @@ -146,7 +149,7 @@ TEST_CASE("Test server migration and rollback", "[flx][migration]") { } // Migrate to FLX - trigger_server_migration(session.app_session(), true, logger_ptr); + trigger_server_migration(session.app_session(), MigrateToFLX, logger_ptr); { SyncTestFile flx_config(session.app()->current_user(), server_app_config.schema, @@ -197,7 +200,7 @@ TEST_CASE("Test server migration and rollback", "[flx][migration]") { } // Roll back to PBS - trigger_server_migration(session.app_session(), false, logger_ptr); + trigger_server_migration(session.app_session(), RollbackToPBS, logger_ptr); // Try to connect as FLX { @@ -266,11 +269,11 @@ TEST_CASE("Test client migration and rollback", "[flx][migration]") { REQUIRE(!wait_for_download(*realm)); auto table = realm->read_group().get_table("class_Object"); - CHECK(table->size() == 5); + REQUIRE(table->size() == 5); } // Migrate to FLX - trigger_server_migration(session.app_session(), true, logger_ptr); + trigger_server_migration(session.app_session(), MigrateToFLX, logger_ptr); { auto realm = Realm::get_shared_realm(config); @@ -279,11 +282,11 @@ TEST_CASE("Test client migration and rollback", "[flx][migration]") { REQUIRE(!wait_for_download(*realm)); auto table = realm->read_group().get_table("class_Object"); - CHECK(table->size() == 5); + REQUIRE(table->size() == 5); } // Roll back to PBS - trigger_server_migration(session.app_session(), false, logger_ptr); + trigger_server_migration(session.app_session(), RollbackToPBS, logger_ptr); { auto realm = Realm::get_shared_realm(config); @@ -292,7 +295,7 @@ TEST_CASE("Test client migration and rollback", "[flx][migration]") { REQUIRE(!wait_for_download(*realm)); auto table = realm->read_group().get_table("class_Object"); - CHECK(table->size() == 5); + REQUIRE(table->size() == 5); } } @@ -325,7 +328,7 @@ TEST_CASE("Test client migration and rollback with recovery", "[flx][migration]" REQUIRE(!wait_for_download(*realm)); auto table = realm->read_group().get_table("class_Object"); - CHECK(table->size() == 5); + REQUIRE(table->size() == 5); // Close the sync session and make a change. This will be recovered by the migration. realm->sync_session()->force_close(); @@ -338,42 +341,91 @@ TEST_CASE("Test client migration and rollback with recovery", "[flx][migration]" } // Migrate to FLX - trigger_server_migration(session.app_session(), true, logger_ptr); + trigger_server_migration(session.app_session(), MigrateToFLX, logger_ptr); - { - auto realm = Realm::get_shared_realm(config); + // Keep this realm around for after the revert to PBS + auto outer_realm = Realm::get_shared_realm(config); - REQUIRE(!wait_for_upload(*realm)); - REQUIRE(!wait_for_download(*realm)); + REQUIRE(!wait_for_upload(*outer_realm)); + REQUIRE(!wait_for_download(*outer_realm)); - auto table = realm->read_group().get_table("class_Object"); - CHECK(table->size() == 6); - realm->begin_transaction(); - auto pending_object = realm->read_group().get_table("class_Object")->get_object_with_primary_key(obj_id); + { + auto sync_session = outer_realm->sync_session(); + REQUIRE(sync_session); + auto sub_store = sync_session->get_flx_subscription_store(); + REQUIRE(sub_store); + auto active_subs = sub_store->get_active(); + REQUIRE(active_subs.size() == 1); + REQUIRE(active_subs.find("flx_migrated_Object")); + + auto table = outer_realm->read_group().get_table("class_Object"); + REQUIRE(table->size() == 6); + + auto object_table = outer_realm->read_group().get_table("class_Object"); + auto pending_object = object_table->get_object_with_primary_key(obj_id); REQUIRE(pending_object.get("string_field") == "partition-set-during-sync-upload"); + + // Close the session and create a dummy subscription with a notification to verify it has been cancelled + outer_realm->sync_session()->pause(); } + util::Future new_subs_future = [&] { + auto sub_store = outer_realm->sync_session()->get_flx_subscription_store(); + auto mut_subs = sub_store->get_active().make_mutable_copy(); + + auto object_table = outer_realm->read_group().get_table("class_Object"); + auto string_col_key = object_table->get_column_key("string_field"); + mut_subs.insert_or_assign("dummy_subs", Query(object_table).equal(string_col_key, StringData{"some-value"})); + auto new_subs = mut_subs.commit(); + return new_subs.get_state_change_notification(sync::SubscriptionSet::State::Complete); + }(); + // Wait for the object to be written to Atlas/MongoDB before rollback, otherwise it may be lost reset_utils::wait_for_object_to_persist_to_atlas(session.app()->current_user(), session.app_session(), "Object", {{"_id", obj_id}}); // Roll back to PBS - trigger_server_migration(session.app_session(), false, logger_ptr); + trigger_server_migration(session.app_session(), RollbackToPBS, logger_ptr); - // Open up the realm without the sync client attached and make a change. This will be recovered by the rollback. + // Add a local object while the session is paused. This will be recovered when connecting after the rollback. { - DBOptions options; - options.encryption_key = test_util::crypt_key(); - auto realm = DB::create(sync::make_client_replication(), config.path, options); - - auto tr = realm->start_write(); - tr->get_table("class_Object") + outer_realm->begin_transaction(); + outer_realm->read_group() + .get_table("class_Object") ->create_object_with_primary_key(ObjectId::gen()) .set("string_field", "partition-set-by-pbs"); - tr->commit(); + outer_realm->commit_transaction(); } // Connect after rolling back to PBS + outer_realm->sync_session()->resume(); + REQUIRE(!wait_for_upload(*outer_realm)); + REQUIRE(!wait_for_download(*outer_realm)); + + { + auto table = outer_realm->read_group().get_table("class_Object"); + REQUIRE(table->size() == 7); + + // Verify the internal sync session subscription store has been cleared + auto sync_session = outer_realm->sync_session(); + REQUIRE(sync_session); + auto sub_store = SyncSession::OnlyForTesting::get_subscription_store_base(*sync_session); + REQUIRE(sub_store); + auto active_subs = sub_store->get_latest(); + REQUIRE(active_subs.size() == 0); + REQUIRE(active_subs.version() == 0); + + auto result = wait_for_future(std::move(new_subs_future)).get_no_throw(); + REALM_ASSERT(result.is_ok()); + REALM_ASSERT(result.get_value() == sync::SubscriptionSet::State::Superseded); + } + + outer_realm.reset(); + + // Migrate back to FLX + trigger_server_migration(session.app_session(), MigrateToFLX, logger_ptr); + + // Verify data has been sync'ed and there is only 1 subscription for the Object table { auto realm = Realm::get_shared_realm(config); @@ -381,7 +433,14 @@ TEST_CASE("Test client migration and rollback with recovery", "[flx][migration]" REQUIRE(!wait_for_download(*realm)); auto table = realm->read_group().get_table("class_Object"); - CHECK(table->size() == 7); + REQUIRE(table->size() == 7); + auto sync_session = realm->sync_session(); + REQUIRE(sync_session); + auto sub_store = sync_session->get_flx_subscription_store(); + REQUIRE(sub_store); + auto active_subs = sub_store->get_active(); + REQUIRE(active_subs.size() == 1); + REQUIRE(active_subs.find("flx_migrated_Object")); } } @@ -416,7 +475,7 @@ TEST_CASE("An interrupted migration can recover on the next session", "[flx][mig } // Migrate to FLX - trigger_server_migration(session.app_session(), true, logger_ptr); + trigger_server_migration(session.app_session(), MigrateToFLX, logger_ptr); // Session is interrupted before the migration is completed. { @@ -476,7 +535,7 @@ TEST_CASE("Update to native FLX after migration", "[flx][migration]") { } // Migrate to FLX - trigger_server_migration(session.app_session(), true, logger_ptr); + trigger_server_migration(session.app_session(), MigrateToFLX, logger_ptr); { auto realm = Realm::get_shared_realm(config); diff --git a/test/test_sync_subscriptions.cpp b/test/test_sync_subscriptions.cpp index 2bbbfc2bc0e..a3501541e47 100644 --- a/test/test_sync_subscriptions.cpp +++ b/test/test_sync_subscriptions.cpp @@ -533,6 +533,147 @@ TEST(Sync_SubscriptionStoreSubSetHasTable) CHECK(table_set.empty()); } +TEST(Sync_SubscriptionStoreNotifyAll) +{ + SHARED_GROUP_TEST_PATH(sub_store_path) + SubscriptionStoreFixture fixture(sub_store_path); + auto store = SubscriptionStore::create(fixture.db, [](int64_t) {}); + + const Status status_abort(ErrorCodes::OperationAborted, "operation aborted"); + + size_t hit_count = 0; + + auto state_handler = [this, &hit_count, &status_abort](StatusWith state) { + CHECK(!state.is_ok()); + CHECK_EQUAL(state, status_abort); + hit_count++; + }; + + auto read_tr = fixture.db->start_read(); + // We should have no subscriptions yet so this should return false. + auto table_set = store->get_tables_for_latest(*read_tr); + CHECK(table_set.empty()); + + Query query_a(read_tr->get_table(fixture.a_table_key)); + query_a.equal(fixture.foo_col, StringData("JBR")).greater_equal(fixture.bar_col, int64_t(1)); + Query query_b(read_tr->get_table(fixture.a_table_key)); + query_b.equal(fixture.foo_col, "Realm"); + + // Create multiple pending subscriptions and notify all of them + { + auto mut_sub_set1 = store->get_latest().make_mutable_copy(); + mut_sub_set1.insert_or_assign(query_a); + auto sub_set1 = mut_sub_set1.commit(); + + sub_set1.get_state_change_notification(SubscriptionSet::State::Complete) + .get_async([&state_handler](StatusWith state) { + state_handler(state); + }); + } + { + auto mut_sub_set2 = store->get_latest().make_mutable_copy(); + mut_sub_set2.insert_or_assign(query_b); + auto sub_set2 = mut_sub_set2.commit(); + + sub_set2.get_state_change_notification(SubscriptionSet::State::Complete) + .get_async([&state_handler](StatusWith state) { + state_handler(state); + }); + } + { + auto mut_sub_set3 = store->get_latest().make_mutable_copy(); + mut_sub_set3.insert_or_assign(query_a); + auto sub_set3 = mut_sub_set3.commit(); + + sub_set3.get_state_change_notification(SubscriptionSet::State::Complete) + .get_async([&state_handler](StatusWith state) { + state_handler(state); + }); + } + + auto pending_subs = store->get_pending_subscriptions(); + CHECK_EQUAL(pending_subs.size(), 3); + for (auto& sub : pending_subs) { + CHECK_EQUAL(sub.state(), SubscriptionSet::State::Pending); + } + + store->notify_all_state_change_notifications(status_abort); + CHECK_EQUAL(hit_count, 3); + + // Any pending subscriptions should still be in the pending state after notify() + pending_subs = store->get_pending_subscriptions(); + CHECK_EQUAL(pending_subs.size(), 3); + for (auto& sub : pending_subs) { + CHECK_EQUAL(sub.state(), SubscriptionSet::State::Pending); + } +} + +TEST(Sync_SubscriptionStoreTerminate) +{ + SHARED_GROUP_TEST_PATH(sub_store_path) + SubscriptionStoreFixture fixture(sub_store_path); + auto store = SubscriptionStore::create(fixture.db, [](int64_t) {}); + + size_t hit_count = 0; + + auto state_handler = [this, &hit_count](StatusWith state) { + CHECK(state.is_ok()); + CHECK_EQUAL(state, SubscriptionSet::State::Superseded); + hit_count++; + }; + + auto read_tr = fixture.db->start_read(); + // We should have no subscriptions yet so this should return false. + auto table_set = store->get_tables_for_latest(*read_tr); + CHECK(table_set.empty()); + + Query query_a(read_tr->get_table(fixture.a_table_key)); + query_a.equal(fixture.foo_col, StringData("JBR")).greater_equal(fixture.bar_col, int64_t(1)); + Query query_b(read_tr->get_table(fixture.a_table_key)); + query_b.equal(fixture.foo_col, "Realm"); + + // Create multiple pending subscriptions and "terminate" all of them + { + auto mut_sub_set1 = store->get_latest().make_mutable_copy(); + mut_sub_set1.insert_or_assign(query_a); + auto sub_set1 = mut_sub_set1.commit(); + + sub_set1.get_state_change_notification(SubscriptionSet::State::Complete) + .get_async([&state_handler](StatusWith state) { + state_handler(state); + }); + } + { + auto mut_sub_set2 = store->get_latest().make_mutable_copy(); + mut_sub_set2.insert_or_assign(query_b); + auto sub_set2 = mut_sub_set2.commit(); + + sub_set2.get_state_change_notification(SubscriptionSet::State::Complete) + .get_async([&state_handler](StatusWith state) { + state_handler(state); + }); + } + { + auto mut_sub_set3 = store->get_latest().make_mutable_copy(); + mut_sub_set3.insert_or_assign(query_a); + auto sub_set3 = mut_sub_set3.commit(); + + sub_set3.get_state_change_notification(SubscriptionSet::State::Complete) + .get_async([&state_handler](StatusWith state) { + state_handler(state); + }); + } + + CHECK_EQUAL(store->get_latest().version(), 3); + CHECK_EQUAL(store->get_pending_subscriptions().size(), 3); + + store->terminate(); // notifications are called on this thread + + CHECK_EQUAL(hit_count, 3); + CHECK_EQUAL(store->get_latest().version(), 0); + CHECK_EQUAL(store->get_pending_subscriptions().size(), 0); +} + // Copied from sync_metadata_schema.cpp constexpr static std::string_view c_flx_metadata_table("flx_metadata"); constexpr static std::string_view c_meta_schema_version_field("schema_version");