From f18c87a92280df6cd582a71a97bf6462926973d1 Mon Sep 17 00:00:00 2001
From: Jonathan Reams <jbreams@mongodb.com>
Date: Tue, 30 Aug 2022 17:27:26 -0400
Subject: [PATCH 01/21] Add AwaitingMark state for SubscriptionSets

---
 src/realm/sync/client.cpp                  | 30 +++++++++-
 src/realm/sync/noinst/client_impl_base.cpp |  2 +-
 src/realm/sync/subscriptions.cpp           | 70 +++++++++++-----------
 src/realm/sync/subscriptions.hpp           | 29 ++++++---
 test/object-store/sync/flx_sync.cpp        | 10 +++-
 test/test_sync_subscriptions.cpp           |  2 +-
 6 files changed, 93 insertions(+), 50 deletions(-)

diff --git a/src/realm/sync/client.cpp b/src/realm/sync/client.cpp
index 2e369738bf7..20bdc02af41 100644
--- a/src/realm/sync/client.cpp
+++ b/src/realm/sync/client.cpp
@@ -254,6 +254,7 @@ class SessionWrapper final : public util::AtomicRefCountBase, public SyncTransac
     int64_t m_flx_active_version = 0;
     int64_t m_flx_last_seen_version = 0;
     int64_t m_flx_latest_version = 0;
+    int64_t m_flx_pending_mark_version = 0;
     std::unique_ptr<PendingBootstrapStore> m_flx_pending_bootstrap_store;
 
     bool m_initiated = false;
@@ -955,8 +956,10 @@ SessionWrapper::SessionWrapper(ClientImpl& client, DBRef db, std::shared_ptr<Sub
     REALM_ASSERT(dynamic_cast<ClientReplication*>(m_db->get_replication()));
 
     if (m_flx_subscription_store) {
-        std::tie(m_flx_active_version, m_flx_latest_version) =
-            m_flx_subscription_store->get_active_and_latest_versions();
+        auto versions_info = m_flx_subscription_store->get_active_and_latest_versions();
+        m_flx_active_version = versions_info.active;
+        m_flx_latest_version = versions_info.latest;
+        m_flx_pending_mark_version = versions_info.pending_mark;
     }
 }
 
@@ -1037,7 +1040,13 @@ void SessionWrapper::on_flx_sync_progress(int64_t new_version, DownloadBatchStat
                 return;
             }
             on_flx_sync_version_complete(new_version);
-            new_state = SubscriptionSet::State::Complete;
+            if (new_version == 0) {
+                new_state = SubscriptionSet::State::Complete;
+            }
+            else {
+                new_state = SubscriptionSet::State::AwaitingMark;
+                m_flx_pending_mark_version = new_version;
+            }
             break;
         case DownloadBatchState::MoreToCome:
             if (m_flx_last_seen_version == new_version) {
@@ -1443,6 +1452,16 @@ void SessionWrapper::on_download_completion()
         m_upload_completion_handlers.push_back(std::move(handler)); // Throws
         m_sync_completion_handlers.pop_back();
     }
+
+    if (m_flx_subscription_store && m_flx_pending_mark_version != SubscriptionSet::EmptyVersion) {
+        m_sess->logger.debug("Marking query version %1 as complete after receiving MARK message",
+                             m_flx_pending_mark_version);
+        auto mutable_subs = m_flx_subscription_store->get_mutable_by_version(m_flx_pending_mark_version);
+        mutable_subs.update_state(SubscriptionSet::State::Complete);
+        std::move(mutable_subs).commit();
+        m_flx_pending_mark_version = SubscriptionSet::EmptyVersion;
+    }
+
     util::LockGuard lock{m_client.m_mutex};
     if (m_staged_download_mark > m_reached_download_mark) {
         m_reached_download_mark = m_staged_download_mark;
@@ -1463,6 +1482,11 @@ void SessionWrapper::on_suspended(const SessionErrorInfo& error_info)
 void SessionWrapper::on_resumed()
 {
     m_suspended = false;
+    if (m_flx_subscription_store && m_flx_pending_mark_version != SubscriptionSet::EmptyVersion) {
+        m_sess->logger.debug("Requesting download notification for query version %1 after resume",
+                             m_flx_pending_mark_version);
+        m_sess->request_download_completion_notification();
+    }
     if (m_connection_state_change_listener) {
         ClientImpl::Connection& conn = m_sess->get_connection();
         if (conn.get_state() != ConnectionState::disconnected) {
diff --git a/src/realm/sync/noinst/client_impl_base.cpp b/src/realm/sync/noinst/client_impl_base.cpp
index f3c5e5a70fc..3abce012a15 100644
--- a/src/realm/sync/noinst/client_impl_base.cpp
+++ b/src/realm/sync/noinst/client_impl_base.cpp
@@ -1778,7 +1778,7 @@ void Session::send_query_change_message()
 
     m_last_sent_flx_query_version = latest_sub_set.version();
 
-    enlist_to_send(); // throws
+    request_download_completion_notification();
 }
 
 void Session::send_upload_message()
diff --git a/src/realm/sync/subscriptions.cpp b/src/realm/sync/subscriptions.cpp
index 6e6bcbf67e2..3e3db02f1ef 100644
--- a/src/realm/sync/subscriptions.cpp
+++ b/src/realm/sync/subscriptions.cpp
@@ -313,6 +313,9 @@ void MutableSubscriptionSet::update_state(State new_state, util::Optional<std::s
 {
     check_is_mutable();
     auto old_state = state();
+    if (error_str && new_state != State::Error) {
+        throw std::logic_error("Cannot supply an error message for a subscription set when state is not Error");
+    }
     switch (new_state) {
         case State::Uncommitted:
             throw std::logic_error("cannot set subscription set state to uncommitted");
@@ -330,17 +333,12 @@ void MutableSubscriptionSet::update_state(State new_state, util::Optional<std::s
             m_error_str = std::string{*error_str};
             break;
         case State::Bootstrapping:
-            if (error_str) {
-                throw std::logic_error(
-                    "Cannot supply an error message for a subscription set when state is not Error");
-            }
+            m_state = new_state;
+            break;
+        case State::AwaitingMark:
             m_state = new_state;
             break;
         case State::Complete: {
-            if (error_str) {
-                throw std::logic_error(
-                    "Cannot supply an error message for a subscription set when state is not Error");
-            }
             auto mgr = get_flx_subscription_store(); // Throws
             m_state = new_state;
             mgr->supercede_prior_to(m_tr, version());
@@ -624,11 +622,11 @@ SubscriptionSet SubscriptionStore::get_latest() const
 {
     auto tr = m_db->start_frozen();
     auto sub_sets = tr->get_table(m_sub_set_table);
-    if (sub_sets->is_empty()) {
-        return SubscriptionSet(weak_from_this(), *tr, Obj{});
-    }
-    auto latest_id = *sub_sets->max(sub_sets->get_primary_key_column());
-    auto latest_obj = sub_sets->get_object_with_primary_key(latest_id);
+    // There should always be at least one SubscriptionSet - the zero'th subscription set for schema instructions.
+    REALM_ASSERT(!sub_sets->is_empty());
+
+    auto latest_id = sub_sets->max(sub_sets->get_primary_key_column())->get_int();
+    auto latest_obj = sub_sets->get_object_with_primary_key(Mixed{latest_id});
 
     return SubscriptionSet(weak_from_this(), *tr, latest_obj);
 }
@@ -637,15 +635,16 @@ SubscriptionSet SubscriptionStore::get_active() const
 {
     auto tr = m_db->start_frozen();
     auto sub_sets = tr->get_table(m_sub_set_table);
-    if (sub_sets->is_empty()) {
-        return SubscriptionSet(weak_from_this(), *tr, Obj{});
-    }
+    // There should always be at least one SubscriptionSet - the zero'th subscription set for schema instructions.
+    REALM_ASSERT(!sub_sets->is_empty());
 
     DescriptorOrdering descriptor_ordering;
     descriptor_ordering.append_sort(SortDescriptor{{{sub_sets->get_primary_key_column()}}, {false}});
     descriptor_ordering.append_limit(LimitDescriptor{1});
     auto res = sub_sets->where()
                    .equal(m_sub_set_state, static_cast<int64_t>(SubscriptionSet::State::Complete))
+                   .Or()
+                   .equal(m_sub_set_state, static_cast<int64_t>(SubscriptionSet::State::AwaitingMark))
                    .find_all(descriptor_ordering);
 
     if (res.is_empty()) {
@@ -654,28 +653,32 @@ SubscriptionSet SubscriptionStore::get_active() const
     return SubscriptionSet(weak_from_this(), *tr, res.get_object(0));
 }
 
-std::pair<int64_t, int64_t> SubscriptionStore::get_active_and_latest_versions() const
+SubscriptionStore::VersionInfo SubscriptionStore::get_active_and_latest_versions() const
 {
     auto tr = m_db->start_read();
     auto sub_sets = tr->get_table(m_sub_set_table);
-    if (sub_sets->is_empty()) {
-        return {0, 0};
-    }
+    // There should always be at least one SubscriptionSet - the zero'th subscription set for schema instructions.
+    REALM_ASSERT(!sub_sets->is_empty());
 
-    auto latest_id = *sub_sets->max(sub_sets->get_primary_key_column());
+    VersionInfo ret;
+    ret.latest = sub_sets->max(sub_sets->get_primary_key_column())->get_int();
     DescriptorOrdering descriptor_ordering;
     descriptor_ordering.append_sort(SortDescriptor{{{sub_sets->get_primary_key_column()}}, {false}});
     descriptor_ordering.append_limit(LimitDescriptor{1});
+
     auto res = sub_sets->where()
                    .equal(m_sub_set_state, static_cast<int64_t>(SubscriptionSet::State::Complete))
+                   .Or()
+                   .equal(m_sub_set_state, static_cast<int64_t>(SubscriptionSet::State::AwaitingMark))
                    .find_all(descriptor_ordering);
+    ret.active = res.is_empty() ? SubscriptionSet::EmptyVersion : res.get_object(0).get_primary_key().get_int();
 
-    if (res.is_empty()) {
-        return {-1, latest_id.get_int()};
-    }
+    res = sub_sets->where()
+              .equal(m_sub_set_state, static_cast<int64_t>(SubscriptionSet::State::AwaitingMark))
+              .find_all(descriptor_ordering);
+    ret.pending_mark = res.is_empty() ? SubscriptionSet::EmptyVersion : res.get_object(0).get_primary_key().get_int();
 
-    auto active_id = res.get_object(0).get_primary_key();
-    return {active_id.get_int(), latest_id.get_int()};
+    return ret;
 }
 
 util::Optional<SubscriptionStore::PendingSubscription>
@@ -683,9 +686,8 @@ SubscriptionStore::get_next_pending_version(int64_t last_query_version, DB::vers
 {
     auto tr = m_db->start_read();
     auto sub_sets = tr->get_table(m_sub_set_table);
-    if (sub_sets->is_empty()) {
-        return util::none;
-    }
+    // There should always be at least one SubscriptionSet - the zero'th subscription set for schema instructions.
+    REALM_ASSERT(!sub_sets->is_empty());
 
     DescriptorOrdering descriptor_ordering;
     descriptor_ordering.append_sort(SortDescriptor{{{sub_sets->get_primary_key_column()}}, {true}});
@@ -761,11 +763,11 @@ SubscriptionSet SubscriptionStore::get_by_version_impl(int64_t version_id,
 SubscriptionStore::TableSet SubscriptionStore::get_tables_for_latest(const Transaction& tr) const
 {
     auto sub_sets = tr.get_table(m_sub_set_table);
-    if (sub_sets->is_empty()) {
-        return {};
-    }
-    auto latest_id = *sub_sets->max(sub_sets->get_primary_key_column());
-    auto latest_obj = sub_sets->get_object_with_primary_key(latest_id);
+    // There should always be at least one SubscriptionSet - the zero'th subscription set for schema instructions.
+    REALM_ASSERT(!sub_sets->is_empty());
+
+    auto latest_id = sub_sets->max(sub_sets->get_primary_key_column())->get_int();
+    auto latest_obj = sub_sets->get_object_with_primary_key(Mixed{latest_id});
 
     TableSet ret;
     auto subs = latest_obj.get_linklist(m_sub_set_subscriptions);
diff --git a/src/realm/sync/subscriptions.hpp b/src/realm/sync/subscriptions.hpp
index 1515bf93346..39502257918 100644
--- a/src/realm/sync/subscriptions.hpp
+++ b/src/realm/sync/subscriptions.hpp
@@ -90,13 +90,13 @@ class SubscriptionSet {
     /*
      * State diagram:
      *
-     *                    ┌───────────┬─────────►Error─────────┐
-     *                    │           │                        │
-     *                    │           │                        ▼
-     *   Uncommitted──►Pending──►Bootstrapping──►Complete───►Superseded
-     *                    │                                    ▲
-     *                    │                                    │
-     *                    └────────────────────────────────────┘
+     *                    ┌───────────┬─────────►Error─────────────┐
+     *                    │           │                            │
+     *                    │           │                            ▼
+     *   Uncommitted──►Pending──►Bootstrapping──►AwaitingMark──►Complete───►Superseded
+     *                    │                                        ▲
+     *                    │                                        │
+     *                    └────────────────────────────────────────┘
      *
      */
     enum class State : int64_t {
@@ -107,6 +107,9 @@ class SubscriptionSet {
         Pending,
         // The server is currently sending the initial state that represents this subscription set to the client.
         Bootstrapping,
+        // The last bootstrap message containing the initial state for this subscription set has been received. The
+        // client is awaiting a mark message to mark this subscription as fully caught up to history.
+        AwaitingMark,
         // This subscription set is the active subscription set that is currently being synchronized with the server.
         Complete,
         // An error occurred while processing this subscription set on the server. Check error_str() for details.
@@ -116,6 +119,8 @@ class SubscriptionSet {
         Superseded,
     };
 
+    static constexpr int64_t EmptyVersion = int64_t(-1);
+
     // Used in tests.
     inline friend std::ostream& operator<<(std::ostream& o, State state)
     {
@@ -129,6 +134,9 @@ class SubscriptionSet {
             case State::Bootstrapping:
                 o << "Bootstrapping";
                 break;
+            case State::AwaitingMark:
+                o << "AwaitingMark";
+                break;
             case State::Complete:
                 o << "Complete";
                 break;
@@ -312,9 +320,14 @@ class SubscriptionStore : public std::enable_shared_from_this<SubscriptionStore>
     // zero.
     SubscriptionSet get_active() const;
 
+    struct VersionInfo {
+        int64_t latest;
+        int64_t active;
+        int64_t pending_mark;
+    };
     // Returns the version number of the current active and latest subscription sets. This function guarantees
     // that the versions will be read from the same underlying transaction and will thus be consistent.
-    std::pair<int64_t, int64_t> get_active_and_latest_versions() const;
+    VersionInfo get_active_and_latest_versions() const;
 
     // To be used internally by the sync client. This returns a mutable view of a subscription set by its
     // version ID. If there is no SubscriptionSet with that version ID, this throws KeyNotFound.
diff --git a/test/object-store/sync/flx_sync.cpp b/test/object-store/sync/flx_sync.cpp
index f39b6109197..223b19b99d0 100644
--- a/test/object-store/sync/flx_sync.cpp
+++ b/test/object-store/sync/flx_sync.cpp
@@ -1099,7 +1099,9 @@ TEST_CASE("flx: interrupted bootstrap restarts/recovers on reconnect", "[sync][f
     {
         auto realm = DB::create(sync::make_client_replication(), interrupted_realm_config.path);
         auto sub_store = sync::SubscriptionStore::create(realm, [](int64_t) {});
-        REQUIRE(sub_store->get_active_and_latest_versions() == std::pair<int64_t, int64_t>{0, 1});
+        auto version_info = sub_store->get_active_and_latest_versions();
+        REQUIRE(version_info.active == 0);
+        REQUIRE(version_info.latest == 1);
         auto latest_subs = sub_store->get_latest();
         REQUIRE(latest_subs.state() == sync::SubscriptionSet::State::Bootstrapping);
         REQUIRE(latest_subs.size() == 1);
@@ -1500,7 +1502,9 @@ TEST_CASE("flx: bootstrap batching prevents orphan documents", "[sync][flx][app]
         REQUIRE(top_level->is_empty());
 
         auto sub_store = sync::SubscriptionStore::create(realm, [](int64_t) {});
-        REQUIRE(sub_store->get_active_and_latest_versions() == std::pair<int64_t, int64_t>{0, 1});
+        auto version_info = sub_store->get_active_and_latest_versions();
+        REQUIRE(version_info.latest == 1);
+        REQUIRE(version_info.active == 0);
         auto latest_subs = sub_store->get_latest();
         REQUIRE(latest_subs.state() == sync::SubscriptionSet::State::Bootstrapping);
         REQUIRE(latest_subs.size() == 1);
@@ -1735,7 +1739,7 @@ TEST_CASE("flx: bootstrap batching prevents orphan documents", "[sync][flx][app]
                 auto latest_sub_set = session->get_flx_subscription_store()->get_latest();
                 auto active_sub_set = session->get_flx_subscription_store()->get_active();
                 REQUIRE(latest_sub_set.version() == active_sub_set.version());
-                REQUIRE(active_sub_set.state() == sync::SubscriptionSet::State::Complete);
+                REQUIRE(active_sub_set.state() == sync::SubscriptionSet::State::AwaitingMark);
 
                 auto db = SyncSession::OnlyForTesting::get_db(*session);
                 auto tr = db->start_read();
diff --git a/test/test_sync_subscriptions.cpp b/test/test_sync_subscriptions.cpp
index 13c016f3fe1..0297d9ebd8f 100644
--- a/test/test_sync_subscriptions.cpp
+++ b/test/test_sync_subscriptions.cpp
@@ -432,7 +432,7 @@ TEST(Sync_SubscriptionStoreInternalSchemaMigration)
     util::File::copy(path.string(), sub_store_path);
     SubscriptionStoreFixture fixture(sub_store_path);
     auto store = SubscriptionStore::create(fixture.db, [](int64_t) {});
-    auto [active_version, latest_version] = store->get_active_and_latest_versions();
+    auto [active_version, latest_version, pending_mark_version] = store->get_active_and_latest_versions();
     CHECK_EQUAL(active_version, latest_version);
     auto active = store->get_active();
     CHECK_EQUAL(active.version(), 1);

From 9b8768d1e87fe64f21c61a5ce2a4bf4fc6b9422b Mon Sep 17 00:00:00 2001
From: Jonathan Reams <jbreams@mongodb.com>
Date: Tue, 30 Aug 2022 17:33:40 -0400
Subject: [PATCH 02/21] undo config override

---
 evergreen/config_overrides.json | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/evergreen/config_overrides.json b/evergreen/config_overrides.json
index 40d92ff2948..5246f455ebf 100644
--- a/evergreen/config_overrides.json
+++ b/evergreen/config_overrides.json
@@ -11,7 +11,7 @@
         }
     },
     "sync": {
-        "brokerRegistryCacheTTL": "0s",
         "allowSyncSessionTestCommands": true
     }
+  }
 }

From 7a537ebb1cc51cdac3dcbb62b96b21f7e6e23f22 Mon Sep 17 00:00:00 2001
From: Jonathan Reams <jbreams@mongodb.com>
Date: Tue, 30 Aug 2022 18:16:15 -0400
Subject: [PATCH 03/21] do not break SubscriptionSet schema

---
 src/realm/sync/subscriptions.hpp | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/realm/sync/subscriptions.hpp b/src/realm/sync/subscriptions.hpp
index 39502257918..8dec51f4f2e 100644
--- a/src/realm/sync/subscriptions.hpp
+++ b/src/realm/sync/subscriptions.hpp
@@ -107,9 +107,6 @@ class SubscriptionSet {
         Pending,
         // The server is currently sending the initial state that represents this subscription set to the client.
         Bootstrapping,
-        // The last bootstrap message containing the initial state for this subscription set has been received. The
-        // client is awaiting a mark message to mark this subscription as fully caught up to history.
-        AwaitingMark,
         // This subscription set is the active subscription set that is currently being synchronized with the server.
         Complete,
         // An error occurred while processing this subscription set on the server. Check error_str() for details.
@@ -117,6 +114,9 @@ class SubscriptionSet {
         // The server responded to a later subscription set to this one and this one has been trimmed from the
         // local storage of subscription sets.
         Superseded,
+        // The last bootstrap message containing the initial state for this subscription set has been received. The
+        // client is awaiting a mark message to mark this subscription as fully caught up to history.
+        AwaitingMark,
     };
 
     static constexpr int64_t EmptyVersion = int64_t(-1);

From 39a518193dc324213c83fa31b1ead98a19424cac Mon Sep 17 00:00:00 2001
From: Jonathan Reams <jbreams@mongodb.com>
Date: Thu, 29 Sep 2022 17:42:26 -0400
Subject: [PATCH 04/21] add test and fix up enum translation

---
 src/realm.h                           |  3 +-
 src/realm/object-store/c_api/sync.cpp | 50 +++++++++++++++--
 src/realm/sync/subscriptions.cpp      | 78 +++++++++++++++++++++++++--
 src/realm/sync/subscriptions.hpp      |  8 +--
 test/object-store/sync/flx_sync.cpp   | 60 +++++++++++++++++++++
 5 files changed, 186 insertions(+), 13 deletions(-)

diff --git a/src/realm.h b/src/realm.h
index 222a6416782..90665f5944c 100644
--- a/src/realm.h
+++ b/src/realm.h
@@ -3542,10 +3542,11 @@ typedef struct realm_flx_sync_subscription_desc realm_flx_sync_subscription_desc
 typedef enum realm_flx_sync_subscription_set_state {
     RLM_SYNC_SUBSCRIPTION_UNCOMMITTED = 0,
     RLM_SYNC_SUBSCRIPTION_PENDING,
-    RLM_SYNC_BOOTSTRAPPING,
+    RLM_SYNC_SUBSCRIPTION_BOOTSTRAPPING,
     RLM_SYNC_SUBSCRIPTION_COMPLETE,
     RLM_SYNC_SUBSCRIPTION_ERROR,
     RLM_SYNC_SUBSCRIPTION_SUPERSEDED,
+    RLM_SYNC_SUBSCRIPTION_AWAITING_MARK,
 } realm_flx_sync_subscription_set_state_e;
 typedef void (*realm_sync_on_subscription_state_changed_t)(realm_userdata_t userdata,
                                                            realm_flx_sync_subscription_set_state_e state);
diff --git a/src/realm/object-store/c_api/sync.cpp b/src/realm/object-store/c_api/sync.cpp
index 1a4e096aa97..f1790b7697d 100644
--- a/src/realm/object-store/c_api/sync.cpp
+++ b/src/realm/object-store/c_api/sync.cpp
@@ -212,6 +212,47 @@ static_assert(realm_sync_error_action_e(ProtocolErrorInfo::Action::ClientReset)
               RLM_SYNC_ERROR_ACTION_CLIENT_RESET);
 static_assert(realm_sync_error_action_e(ProtocolErrorInfo::Action::ClientResetNoRecovery) ==
               RLM_SYNC_ERROR_ACTION_CLIENT_RESET_NO_RECOVERY);
+
+SubscriptionSet::State sub_state_from_c_enum(realm_flx_sync_subscription_set_state_e value)
+{
+    switch (static_cast<realm_flx_sync_subscription_set_state>(value)) {
+        case RLM_SYNC_SUBSCRIPTION_PENDING:
+            return SubscriptionSet::State::Pending;
+        case RLM_SYNC_SUBSCRIPTION_BOOTSTRAPPING:
+            return SubscriptionSet::State::Bootstrapping;
+        case RLM_SYNC_SUBSCRIPTION_AWAITING_MARK:
+            return SubscriptionSet::State::AwaitingMark;
+        case RLM_SYNC_SUBSCRIPTION_COMPLETE:
+            return SubscriptionSet::State::Complete;
+        case RLM_SYNC_SUBSCRIPTION_ERROR:
+            return SubscriptionSet::State::Error;
+        case RLM_SYNC_SUBSCRIPTION_SUPERSEDED:
+            return SubscriptionSet::State::Superseded;
+        case RLM_SYNC_SUBSCRIPTION_UNCOMMITTED:
+            return SubscriptionSet::State::Uncommitted;
+    }
+}
+
+realm_flx_sync_subscription_set_state_e sub_state_to_c_enum(SubscriptionSet::State state)
+{
+    switch (state) {
+        case SubscriptionSet::State::Pending:
+            return RLM_SYNC_SUBSCRIPTION_PENDING;
+        case SubscriptionSet::State::Bootstrapping:
+            return RLM_SYNC_SUBSCRIPTION_BOOTSTRAPPING;
+        case SubscriptionSet::State::AwaitingMark:
+            return RLM_SYNC_SUBSCRIPTION_AWAITING_MARK;
+        case SubscriptionSet::State::Complete:
+            return RLM_SYNC_SUBSCRIPTION_COMPLETE;
+        case SubscriptionSet::State::Error:
+            return RLM_SYNC_SUBSCRIPTION_ERROR;
+        case SubscriptionSet::State::Uncommitted:
+            return RLM_SYNC_SUBSCRIPTION_UNCOMMITTED;
+        case SubscriptionSet::State::Superseded:
+            return RLM_SYNC_SUBSCRIPTION_SUPERSEDED;
+    }
+}
+
 } // namespace
 
 static realm_sync_error_code_t to_capi(const std::error_code& error_code, std::string& message)
@@ -569,8 +610,8 @@ realm_sync_on_subscription_set_state_change_wait(const realm_flx_sync_subscripti
 {
     REALM_ASSERT(subscription_set != nullptr);
     SubscriptionSet::State state =
-        subscription_set->get_state_change_notification(SubscriptionSet::State{notify_when}).get();
-    return realm_flx_sync_subscription_set_state_e(static_cast<int>(state));
+        subscription_set->get_state_change_notification(sub_state_from_c_enum(notify_when)).get();
+    return sub_state_to_c_enum(state);
 }
 
 RLM_API bool
@@ -581,13 +622,12 @@ realm_sync_on_subscription_set_state_change_async(const realm_flx_sync_subscript
 {
     REALM_ASSERT(subscription_set != nullptr && callback != nullptr);
     return wrap_err([&]() {
-        auto future_state = subscription_set->get_state_change_notification(SubscriptionSet::State{notify_when});
+        auto future_state = subscription_set->get_state_change_notification(sub_state_from_c_enum(notify_when));
         std::move(future_state)
             .get_async([callback, userdata = SharedUserdata(userdata, FreeUserdata(userdata_free))](
                            const StatusWith<SubscriptionSet::State>& state) -> void {
                 if (state.is_ok())
-                    callback(userdata.get(),
-                             realm_flx_sync_subscription_set_state_e(static_cast<int>(state.get_value())));
+                    callback(userdata.get(), sub_state_to_c_enum(state.get_value()));
                 else
                     callback(userdata.get(), realm_flx_sync_subscription_set_state_e::RLM_SYNC_SUBSCRIPTION_ERROR);
             });
diff --git a/src/realm/sync/subscriptions.cpp b/src/realm/sync/subscriptions.cpp
index 3e3db02f1ef..b9098dd307c 100644
--- a/src/realm/sync/subscriptions.cpp
+++ b/src/realm/sync/subscriptions.cpp
@@ -57,6 +57,63 @@ constexpr static std::string_view c_flx_sub_query_str_field("query");
 
 using OptionalString = util::Optional<std::string>;
 
+enum class SubscriptionStateForStorage : int64_t {
+    // This subscription set has not been persisted and has not been sent to the server. This state is only valid
+    // for MutableSubscriptionSets
+    Uncommitted = 0,
+    // The subscription set has been persisted locally but has not been acknowledged by the server yet.
+    Pending = 1,
+    // The server is currently sending the initial state that represents this subscription set to the client.
+    Bootstrapping = 2,
+    // This subscription set is the active subscription set that is currently being synchronized with the server.
+    Complete = 3,
+    // An error occurred while processing this subscription set on the server. Check error_str() for details.
+    Error = 4,
+    // The last bootstrap message containing the initial state for this subscription set has been received. The
+    // client is awaiting a mark message to mark this subscription as fully caught up to history.
+    AwaitingMark = 6,
+};
+
+SubscriptionSet::State state_from_storage(int64_t value)
+{
+    switch (static_cast<SubscriptionStateForStorage>(value)) {
+        case SubscriptionStateForStorage::Uncommitted:
+            return SubscriptionSet::State::Uncommitted;
+        case SubscriptionStateForStorage::Pending:
+            return SubscriptionSet::State::Pending;
+        case SubscriptionStateForStorage::Bootstrapping:
+            return SubscriptionSet::State::Bootstrapping;
+        case SubscriptionStateForStorage::AwaitingMark:
+            return SubscriptionSet::State::AwaitingMark;
+        case SubscriptionStateForStorage::Complete:
+            return SubscriptionSet::State::Complete;
+        case SubscriptionStateForStorage::Error:
+            return SubscriptionSet::State::Error;
+        default:
+            REALM_UNREACHABLE();
+    }
+}
+
+int64_t state_to_storage(SubscriptionSet::State state)
+{
+    switch (state) {
+        case SubscriptionSet::State::Uncommitted:
+            return static_cast<int64_t>(SubscriptionStateForStorage::Uncommitted);
+        case SubscriptionSet::State::Pending:
+            return static_cast<int64_t>(SubscriptionStateForStorage::Pending);
+        case SubscriptionSet::State::Bootstrapping:
+            return static_cast<int64_t>(SubscriptionStateForStorage::Bootstrapping);
+        case SubscriptionSet::State::AwaitingMark:
+            return static_cast<int64_t>(SubscriptionStateForStorage::AwaitingMark);
+        case SubscriptionSet::State::Complete:
+            return static_cast<int64_t>(SubscriptionStateForStorage::Complete);
+        case SubscriptionSet::State::Error:
+            return static_cast<int64_t>(SubscriptionStateForStorage::Error);
+        default:
+            REALM_UNREACHABLE();
+    }
+}
+
 } // namespace
 
 Subscription::Subscription(const SubscriptionStore* parent, Obj obj)
@@ -139,7 +196,7 @@ void SubscriptionSet::load_from_database(const Transaction& tr, Obj obj)
 
     m_cur_version = tr.get_version();
     m_version = obj.get_primary_key().get_int();
-    m_state = static_cast<State>(obj.get<int64_t>(mgr->m_sub_set_state));
+    m_state = state_from_storage(obj.get<int64_t>(mgr->m_sub_set_state));
     m_error_str = obj.get<String>(mgr->m_sub_set_error_str);
     m_snapshot_version = static_cast<DB::version_type>(obj.get<int64_t>(mgr->m_sub_set_snapshot_version));
     auto sub_list = obj.get_linklist(mgr->m_sub_set_subscriptions);
@@ -429,8 +486,23 @@ void MutableSubscriptionSet::process_notifications()
     mgr->m_pending_notifications_cv.wait(lk, [&] {
         return mgr->m_outstanding_requests == 0;
     });
+
+    auto cmp_states_gte = [](SubscriptionSet::State lhs, SubscriptionSet::State rhs) {
+        static std::array<int64_t, 7> orders = {
+            0, // SubscriptionSet::State::Uncommitted
+            1, // SubscriptionSet::State::Pending
+            2, // SubscriptionSet::State::Bootstrapping
+            4, // SubscriptionSet::State::Complete
+            5, // SubscriptionSet::State::Error
+            6, // SubscriptionSet::State::Superseded
+            3, // SubscriptionSet::State::AwaitingMark
+        };
+        return orders.at(static_cast<size_t>(lhs)) >= orders.at(static_cast<size_t>(rhs));
+    };
+
     for (auto it = mgr->m_pending_notifications.begin(); it != mgr->m_pending_notifications.end();) {
-        if ((it->version == my_version && (new_state == State::Error || new_state >= it->notify_when)) ||
+        if ((it->version == my_version &&
+             (new_state == State::Error || cmp_states_gte(new_state, it->notify_when))) ||
             (new_state == State::Complete && it->version < my_version)) {
             to_finish.splice(to_finish.end(), mgr->m_pending_notifications, it++);
         }
@@ -486,7 +558,7 @@ SubscriptionSet MutableSubscriptionSet::commit() &&
             new_sub.set(mgr->m_sub_query_str, StringData(sub.query_string()));
         }
     }
-    m_obj.set(mgr->m_sub_set_state, static_cast<int64_t>(m_state));
+    m_obj.set(mgr->m_sub_set_state, state_to_storage(m_state));
     if (!m_error_str.empty()) {
         m_obj.set(mgr->m_sub_set_error_str, StringData(m_error_str));
     }
diff --git a/src/realm/sync/subscriptions.hpp b/src/realm/sync/subscriptions.hpp
index 8dec51f4f2e..7d7dac3fcac 100644
--- a/src/realm/sync/subscriptions.hpp
+++ b/src/realm/sync/subscriptions.hpp
@@ -99,7 +99,7 @@ class SubscriptionSet {
      *                    └────────────────────────────────────────┘
      *
      */
-    enum class State : int64_t {
+    enum class State {
         // This subscription set has not been persisted and has not been sent to the server. This state is only valid
         // for MutableSubscriptionSets
         Uncommitted = 0,
@@ -107,6 +107,9 @@ class SubscriptionSet {
         Pending,
         // The server is currently sending the initial state that represents this subscription set to the client.
         Bootstrapping,
+        // The last bootstrap message containing the initial state for this subscription set has been received. The
+        // client is awaiting a mark message to mark this subscription as fully caught up to history.
+        AwaitingMark,
         // This subscription set is the active subscription set that is currently being synchronized with the server.
         Complete,
         // An error occurred while processing this subscription set on the server. Check error_str() for details.
@@ -114,9 +117,6 @@ class SubscriptionSet {
         // The server responded to a later subscription set to this one and this one has been trimmed from the
         // local storage of subscription sets.
         Superseded,
-        // The last bootstrap message containing the initial state for this subscription set has been received. The
-        // client is awaiting a mark message to mark this subscription as fully caught up to history.
-        AwaitingMark,
     };
 
     static constexpr int64_t EmptyVersion = int64_t(-1);
diff --git a/test/object-store/sync/flx_sync.cpp b/test/object-store/sync/flx_sync.cpp
index 223b19b99d0..32df619f132 100644
--- a/test/object-store/sync/flx_sync.cpp
+++ b/test/object-store/sync/flx_sync.cpp
@@ -2071,6 +2071,66 @@ TEST_CASE("flx: send client error", "[sync][flx][app]") {
         wait_for_error_to_persist(harness.session().app_session(), "simulated failure (ProtocolErrorCode=112)"));
 }
 
+TEST_CASE("flx: bootstraps contain all changes", "[sync][flx][app]") {
+    FLXSyncTestHarness harness("bootstrap_full_sync");
+
+    SyncTestFile triggered_config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});
+    auto triggered_realm = Realm::get_shared_realm(triggered_config);
+
+    wait_for_upload(*triggered_realm);
+    wait_for_download(*triggered_realm);
+
+    nlohmann::json command_request = {
+        {"command", "PAUSE_ROUTER_SESSION"},
+    };
+    auto resp_body =
+        SyncSession::OnlyForTesting::send_test_command(*triggered_realm->sync_session(), command_request.dump())
+            .get();
+    REQUIRE(resp_body == "{}");
+
+    auto bar_obj_id = ObjectId::gen();
+    harness.load_initial_data([&](SharedRealm realm) {
+        CppContext c(realm);
+        Object::create(c, realm, "TopLevel",
+                       std::any(AnyDict{{"_id", bar_obj_id},
+                                        {"queryable_str_field", std::string{"bar"}},
+                                        {"queryable_int_field", static_cast<int64_t>(10)},
+                                        {"non_queryable_field", std::string{"non queryable 2"}}}));
+    });
+
+    auto setup_subs = [](SharedRealm& realm) {
+        auto table = realm->read_group().get_table("class_TopLevel");
+        auto new_query = realm->get_latest_subscription_set().make_mutable_copy();
+        new_query.clear();
+        auto col = table->get_column_key("queryable_str_field");
+        new_query.insert_or_assign(Query(table).equal(col, StringData("bar")).Or().equal(col, StringData("bizz")));
+        std::move(new_query).commit().get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
+        wait_for_advance(*realm);
+    };
+
+    auto bizz_obj_id = ObjectId::gen();
+    harness.do_with_new_realm([&](SharedRealm realm) {
+        setup_subs(realm);
+        auto table = realm->read_group().get_table("class_TopLevel");
+        REQUIRE(table->find_primary_key(bar_obj_id));
+        CppContext c(realm);
+        realm->begin_transaction();
+        Object::create(c, realm, "TopLevel",
+                       std::any(AnyDict{{"_id", bizz_obj_id},
+                                        {"queryable_str_field", std::string{"bizz"}},
+                                        {"queryable_int_field", static_cast<int64_t>(15)},
+                                        {"non_queryable_field", std::string{"non queryable 3"}}}));
+        realm->commit_transaction();
+        wait_for_upload(*realm);
+    });
+
+    setup_subs(triggered_realm);
+
+    auto table = triggered_realm->read_group().get_table("class_TopLevel");
+    REQUIRE(table->find_primary_key(bar_obj_id));
+    REQUIRE(table->find_primary_key(bizz_obj_id));
+}
+
 } // namespace realm::app
 
 #endif // REALM_ENABLE_AUTH_TESTS

From 66f0e3ecab30a87efe806e011188bb6c5fa35453 Mon Sep 17 00:00:00 2001
From: Jonathan Reams <jbreams@mongodb.com>
Date: Thu, 29 Sep 2022 17:44:24 -0400
Subject: [PATCH 05/21] fix override file

---
 evergreen/config_overrides.json | 1 -
 1 file changed, 1 deletion(-)

diff --git a/evergreen/config_overrides.json b/evergreen/config_overrides.json
index 5246f455ebf..9ed747508e3 100644
--- a/evergreen/config_overrides.json
+++ b/evergreen/config_overrides.json
@@ -13,5 +13,4 @@
     "sync": {
         "allowSyncSessionTestCommands": true
     }
-  }
 }

From 244ec8b5258440ea6eada45b129d5b62f1c18f84 Mon Sep 17 00:00:00 2001
From: Jonathan Reams <jbreams@mongodb.com>
Date: Thu, 29 Sep 2022 17:49:08 -0400
Subject: [PATCH 06/21] use naive comparisons again

---
 src/realm/sync/subscriptions.cpp | 16 +---------------
 1 file changed, 1 insertion(+), 15 deletions(-)

diff --git a/src/realm/sync/subscriptions.cpp b/src/realm/sync/subscriptions.cpp
index b9098dd307c..fb3fa8429bb 100644
--- a/src/realm/sync/subscriptions.cpp
+++ b/src/realm/sync/subscriptions.cpp
@@ -487,22 +487,8 @@ void MutableSubscriptionSet::process_notifications()
         return mgr->m_outstanding_requests == 0;
     });
 
-    auto cmp_states_gte = [](SubscriptionSet::State lhs, SubscriptionSet::State rhs) {
-        static std::array<int64_t, 7> orders = {
-            0, // SubscriptionSet::State::Uncommitted
-            1, // SubscriptionSet::State::Pending
-            2, // SubscriptionSet::State::Bootstrapping
-            4, // SubscriptionSet::State::Complete
-            5, // SubscriptionSet::State::Error
-            6, // SubscriptionSet::State::Superseded
-            3, // SubscriptionSet::State::AwaitingMark
-        };
-        return orders.at(static_cast<size_t>(lhs)) >= orders.at(static_cast<size_t>(rhs));
-    };
-
     for (auto it = mgr->m_pending_notifications.begin(); it != mgr->m_pending_notifications.end();) {
-        if ((it->version == my_version &&
-             (new_state == State::Error || cmp_states_gte(new_state, it->notify_when))) ||
+        if ((it->version == my_version && (new_state == State::Error || new_state >= it->notify_when)) ||
             (new_state == State::Complete && it->version < my_version)) {
             to_finish.splice(to_finish.end(), mgr->m_pending_notifications, it++);
         }

From 9ce02a6b36f219faf1d509bbdbd3e7de7c6c6c73 Mon Sep 17 00:00:00 2001
From: Jonathan Reams <jbreams@mongodb.com>
Date: Thu, 29 Sep 2022 18:45:10 -0400
Subject: [PATCH 07/21] use proper enum values in more places

---
 src/realm/sync/subscriptions.cpp | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/src/realm/sync/subscriptions.cpp b/src/realm/sync/subscriptions.cpp
index fb3fa8429bb..1558bcf7fc5 100644
--- a/src/realm/sync/subscriptions.cpp
+++ b/src/realm/sync/subscriptions.cpp
@@ -700,9 +700,9 @@ SubscriptionSet SubscriptionStore::get_active() const
     descriptor_ordering.append_sort(SortDescriptor{{{sub_sets->get_primary_key_column()}}, {false}});
     descriptor_ordering.append_limit(LimitDescriptor{1});
     auto res = sub_sets->where()
-                   .equal(m_sub_set_state, static_cast<int64_t>(SubscriptionSet::State::Complete))
+                   .equal(m_sub_set_state, state_to_storage(SubscriptionSet::State::Complete))
                    .Or()
-                   .equal(m_sub_set_state, static_cast<int64_t>(SubscriptionSet::State::AwaitingMark))
+                   .equal(m_sub_set_state, state_to_storage(SubscriptionSet::State::AwaitingMark))
                    .find_all(descriptor_ordering);
 
     if (res.is_empty()) {
@@ -725,14 +725,14 @@ SubscriptionStore::VersionInfo SubscriptionStore::get_active_and_latest_versions
     descriptor_ordering.append_limit(LimitDescriptor{1});
 
     auto res = sub_sets->where()
-                   .equal(m_sub_set_state, static_cast<int64_t>(SubscriptionSet::State::Complete))
+                   .equal(m_sub_set_state, state_to_storage(SubscriptionSet::State::Complete))
                    .Or()
-                   .equal(m_sub_set_state, static_cast<int64_t>(SubscriptionSet::State::AwaitingMark))
+                   .equal(m_sub_set_state, state_to_storage(SubscriptionSet::State::AwaitingMark))
                    .find_all(descriptor_ordering);
     ret.active = res.is_empty() ? SubscriptionSet::EmptyVersion : res.get_object(0).get_primary_key().get_int();
 
     res = sub_sets->where()
-              .equal(m_sub_set_state, static_cast<int64_t>(SubscriptionSet::State::AwaitingMark))
+              .equal(m_sub_set_state, state_to_storage(SubscriptionSet::State::AwaitingMark))
               .find_all(descriptor_ordering);
     ret.pending_mark = res.is_empty() ? SubscriptionSet::EmptyVersion : res.get_object(0).get_primary_key().get_int();
 
@@ -752,9 +752,9 @@ SubscriptionStore::get_next_pending_version(int64_t last_query_version, DB::vers
     auto res = sub_sets->where()
                    .greater(sub_sets->get_primary_key_column(), last_query_version)
                    .group()
-                   .equal(m_sub_set_state, static_cast<int64_t>(SubscriptionSet::State::Pending))
+                   .equal(m_sub_set_state, state_to_storage(SubscriptionSet::State::Pending))
                    .Or()
-                   .equal(m_sub_set_state, static_cast<int64_t>(SubscriptionSet::State::Bootstrapping))
+                   .equal(m_sub_set_state, state_to_storage(SubscriptionSet::State::Bootstrapping))
                    .end_group()
                    .greater_equal(m_sub_set_snapshot_version, static_cast<int64_t>(after_client_version))
                    .find_all(descriptor_ordering);

From 1f765a81fc7cf1dc0b506d6fbce181106f7c31d4 Mon Sep 17 00:00:00 2001
From: Jonathan Reams <jbreams@mongodb.com>
Date: Fri, 30 Sep 2022 14:47:27 -0400
Subject: [PATCH 08/21] remove workaround for client reset

---
 src/realm/object-store/sync/sync_session.cpp | 26 +++-----------------
 test/object-store/sync/flx_sync.cpp          | 24 ++++++++++++------
 2 files changed, 20 insertions(+), 30 deletions(-)

diff --git a/src/realm/object-store/sync/sync_session.cpp b/src/realm/object-store/sync/sync_session.cpp
index 112ed00023c..205fe45545e 100644
--- a/src/realm/object-store/sync/sync_session.cpp
+++ b/src/realm/object-store/sync/sync_session.cpp
@@ -392,28 +392,7 @@ void SyncSession::download_fresh_realm(sync::ProtocolErrorInfo::Action server_re
         std::move(fresh_mut_sub)
             .commit()
             .get_state_change_notification(sync::SubscriptionSet::State::Complete)
-            .then([=, weak_self = weak_from_this()](sync::SubscriptionSet::State) {
-                auto pf = util::make_promise_future<void>();
-                sync_session->wait_for_download_completion([=, promise = std::move(pf.promise)](
-                                                               std::error_code ec) mutable {
-                    auto strong_self = weak_self.lock();
-                    if (!strong_self) {
-                        return promise.set_error({ErrorCodes::RuntimeError,
-                                                  "SyncSession was destroyed before download could be completed"});
-                    }
-
-                    if (ec) {
-                        return promise.set_error(
-                            {ErrorCodes::RuntimeError,
-                             util::format("Error waiting for download completion for fresh realm (code: %1): %2",
-                                          ec.value(), ec.message())});
-                    }
-
-                    promise.emplace_value();
-                });
-                return std::move(pf.future);
-            })
-            .get_async([=, weak_self = weak_from_this()](Status s) {
+            .get_async([=, weak_self = weak_from_this()](StatusWith<sync::SubscriptionSet::State> s) {
                 // Keep the sync session alive while it's downloading, but then close
                 // it immediately
                 sync_session->close();
@@ -422,7 +401,8 @@ void SyncSession::download_fresh_realm(sync::ProtocolErrorInfo::Action server_re
                         strong_self->handle_fresh_realm_downloaded(db, none, server_requests_action);
                     }
                     else {
-                        strong_self->handle_fresh_realm_downloaded(nullptr, s.reason(), server_requests_action);
+                        strong_self->handle_fresh_realm_downloaded(nullptr, s.get_status().reason(),
+                                                                   server_requests_action);
                     }
                 }
             });
diff --git a/test/object-store/sync/flx_sync.cpp b/test/object-store/sync/flx_sync.cpp
index 32df619f132..8e1e71a7be9 100644
--- a/test/object-store/sync/flx_sync.cpp
+++ b/test/object-store/sync/flx_sync.cpp
@@ -2075,19 +2075,22 @@ TEST_CASE("flx: bootstraps contain all changes", "[sync][flx][app]") {
     FLXSyncTestHarness harness("bootstrap_full_sync");
 
     SyncTestFile triggered_config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});
-    auto triggered_realm = Realm::get_shared_realm(triggered_config);
+    auto problem_realm = Realm::get_shared_realm(triggered_config);
 
-    wait_for_upload(*triggered_realm);
-    wait_for_download(*triggered_realm);
+    // Setup the problem realm by waiting for it to be fully synchronized with an empty query, so the router
+    // on the server should have no new history entries, and then pause the router so it doesn't get any of
+    // the changes we're about to create.
+    wait_for_upload(*problem_realm);
+    wait_for_download(*problem_realm);
 
     nlohmann::json command_request = {
         {"command", "PAUSE_ROUTER_SESSION"},
     };
     auto resp_body =
-        SyncSession::OnlyForTesting::send_test_command(*triggered_realm->sync_session(), command_request.dump())
-            .get();
+        SyncSession::OnlyForTesting::send_test_command(*problem_realm->sync_session(), command_request.dump()).get();
     REQUIRE(resp_body == "{}");
 
+    // Put some data into the server, this will be the data that will be in the broker cache.
     auto bar_obj_id = ObjectId::gen();
     harness.load_initial_data([&](SharedRealm realm) {
         CppContext c(realm);
@@ -2110,9 +2113,13 @@ TEST_CASE("flx: bootstraps contain all changes", "[sync][flx][app]") {
 
     auto bizz_obj_id = ObjectId::gen();
     harness.do_with_new_realm([&](SharedRealm realm) {
+        // first set a subscription to force the creation/cacheing of a broker snapshot on the server.
         setup_subs(realm);
         auto table = realm->read_group().get_table("class_TopLevel");
         REQUIRE(table->find_primary_key(bar_obj_id));
+
+        // Then create an object that won't be in the cached snapshot - this is the object that if we didn't
+        // wait for a MARK message to come back, we'd miss it in our results.
         CppContext c(realm);
         realm->begin_transaction();
         Object::create(c, realm, "TopLevel",
@@ -2124,9 +2131,12 @@ TEST_CASE("flx: bootstraps contain all changes", "[sync][flx][app]") {
         wait_for_upload(*realm);
     });
 
-    setup_subs(triggered_realm);
+    // Setup queries on the problem realm to bootstrap from the cached object. Bootstrapping will also resume
+    // the router, so all we need to do is wait for the subscription set to be complete and notifications to be
+    // processed.
+    setup_subs(problem_realm);
 
-    auto table = triggered_realm->read_group().get_table("class_TopLevel");
+    auto table = problem_realm->read_group().get_table("class_TopLevel");
     REQUIRE(table->find_primary_key(bar_obj_id));
     REQUIRE(table->find_primary_key(bizz_obj_id));
 }

From eea8257d16949c997177c56f10ea78542977459f Mon Sep 17 00:00:00 2001
From: Jonathan Reams <jbreams@mongodb.com>
Date: Fri, 7 Oct 2022 16:58:50 -0400
Subject: [PATCH 09/21] more tests

---
 src/realm/sync/client.cpp           |  10 +-
 src/realm/sync/config.hpp           |   1 +
 test/object-store/sync/flx_sync.cpp | 200 ++++++++++++++++++++--------
 3 files changed, 155 insertions(+), 56 deletions(-)

diff --git a/src/realm/sync/client.cpp b/src/realm/sync/client.cpp
index 10d926cb9b3..316dd0f56b8 100644
--- a/src/realm/sync/client.cpp
+++ b/src/realm/sync/client.cpp
@@ -786,6 +786,7 @@ void SessionImpl::process_pending_flx_bootstrap()
     VersionInfo new_version;
     SyncProgress progress;
     int64_t query_version = -1;
+    size_t changesets_processed = 0;
     while (bootstrap_store->has_pending()) {
         auto pending_batch = bootstrap_store->peek_pending(batch_size_in_bytes);
         if (!pending_batch.progress) {
@@ -804,7 +805,6 @@ void SessionImpl::process_pending_flx_bootstrap()
             throw IntegrationException(ClientError::bad_changeset, "simulated failure");
         }
 
-
         history.integrate_server_changesets(
             *pending_batch.progress, &downloadable_bytes, pending_batch.changesets, new_version, batch_state, logger,
             [&](const TransactionRef& tr, size_t count) {
@@ -813,9 +813,7 @@ void SessionImpl::process_pending_flx_bootstrap()
             },
             get_transact_reporter());
         progress = *pending_batch.progress;
-
-        REALM_ASSERT(call_debug_hook(SyncClientHookEvent::DownloadMessageIntegrated, progress, query_version,
-                                     batch_state, pending_batch.changesets.size()) == SyncClientHookAction::NoAction);
+        changesets_processed += pending_batch.changesets.size();
 
         logger.info("Integrated %1 changesets from pending bootstrap for query version %2, producing client version "
                     "%3. %4 changesets remaining in bootstrap",
@@ -827,6 +825,10 @@ void SessionImpl::process_pending_flx_bootstrap()
     REALM_ASSERT_3(query_version, !=, -1);
     m_wrapper.on_sync_progress();
     on_flx_sync_progress(query_version, DownloadBatchState::LastInBatch);
+
+    auto action = call_debug_hook(SyncClientHookEvent::BootstrapProcessed, progress, query_version,
+                                  DownloadBatchState::LastInBatch, changesets_processed);
+    REALM_ASSERT(action == SyncClientHookAction::NoAction);
 }
 
 void SessionImpl::on_new_flx_subscription_set(int64_t new_version)
diff --git a/src/realm/sync/config.hpp b/src/realm/sync/config.hpp
index e7b32a69197..ee9b404c891 100644
--- a/src/realm/sync/config.hpp
+++ b/src/realm/sync/config.hpp
@@ -134,6 +134,7 @@ enum class SyncClientHookEvent {
     DownloadMessageReceived,
     DownloadMessageIntegrated,
     BootstrapMessageProcessed,
+    BootstrapProcessed,
 };
 
 enum class SyncClientHookAction {
diff --git a/test/object-store/sync/flx_sync.cpp b/test/object-store/sync/flx_sync.cpp
index 8640509d1df..2380374dec6 100644
--- a/test/object-store/sync/flx_sync.cpp
+++ b/test/object-store/sync/flx_sync.cpp
@@ -2092,71 +2092,167 @@ TEST_CASE("flx: send client error", "[sync][flx][app]") {
 TEST_CASE("flx: bootstraps contain all changes", "[sync][flx][app]") {
     FLXSyncTestHarness harness("bootstrap_full_sync");
 
-    SyncTestFile triggered_config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});
-    auto problem_realm = Realm::get_shared_realm(triggered_config);
-
-    // Setup the problem realm by waiting for it to be fully synchronized with an empty query, so the router
-    // on the server should have no new history entries, and then pause the router so it doesn't get any of
-    // the changes we're about to create.
-    wait_for_upload(*problem_realm);
-    wait_for_download(*problem_realm);
-
-    nlohmann::json command_request = {
-        {"command", "PAUSE_ROUTER_SESSION"},
-    };
-    auto resp_body =
-        SyncSession::OnlyForTesting::send_test_command(*problem_realm->sync_session(), command_request.dump()).get();
-    REQUIRE(resp_body == "{}");
-
-    // Put some data into the server, this will be the data that will be in the broker cache.
-    auto bar_obj_id = ObjectId::gen();
-    harness.load_initial_data([&](SharedRealm realm) {
-        CppContext c(realm);
-        Object::create(c, realm, "TopLevel",
-                       std::any(AnyDict{{"_id", bar_obj_id},
-                                        {"queryable_str_field", std::string{"bar"}},
-                                        {"queryable_int_field", static_cast<int64_t>(10)},
-                                        {"non_queryable_field", std::string{"non queryable 2"}}}));
-    });
-
     auto setup_subs = [](SharedRealm& realm) {
         auto table = realm->read_group().get_table("class_TopLevel");
         auto new_query = realm->get_latest_subscription_set().make_mutable_copy();
         new_query.clear();
         auto col = table->get_column_key("queryable_str_field");
         new_query.insert_or_assign(Query(table).equal(col, StringData("bar")).Or().equal(col, StringData("bizz")));
-        std::move(new_query).commit().get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
-        wait_for_advance(*realm);
+        return std::move(new_query).commit();
     };
 
+    auto bar_obj_id = ObjectId::gen();
     auto bizz_obj_id = ObjectId::gen();
-    harness.do_with_new_realm([&](SharedRealm realm) {
-        // first set a subscription to force the creation/cacheing of a broker snapshot on the server.
-        setup_subs(realm);
-        auto table = realm->read_group().get_table("class_TopLevel");
+    auto setup_and_poison_cache = [&] {
+        harness.load_initial_data([&](SharedRealm realm) {
+            CppContext c(realm);
+            Object::create(c, realm, "TopLevel",
+                           std::any(AnyDict{{"_id", bar_obj_id},
+                                            {"queryable_str_field", std::string{"bar"}},
+                                            {"queryable_int_field", static_cast<int64_t>(10)},
+                                            {"non_queryable_field", std::string{"non queryable 2"}}}));
+        });
+
+        harness.do_with_new_realm([&](SharedRealm realm) {
+            // first set a subscription to force the creation/caching of a broker snapshot on the server.
+            setup_subs(realm).get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
+            wait_for_advance(*realm);
+            auto table = realm->read_group().get_table("class_TopLevel");
+            REQUIRE(table->find_primary_key(bar_obj_id));
+
+            // Then create an object that won't be in the cached snapshot - this is the object that if we didn't
+            // wait for a MARK message to come back, we'd miss it in our results.
+            CppContext c(realm);
+            realm->begin_transaction();
+            Object::create(c, realm, "TopLevel",
+                           std::any(AnyDict{{"_id", bizz_obj_id},
+                                            {"queryable_str_field", std::string{"bizz"}},
+                                            {"queryable_int_field", static_cast<int64_t>(15)},
+                                            {"non_queryable_field", std::string{"non queryable 3"}}}));
+            realm->commit_transaction();
+            wait_for_upload(*realm);
+        });
+    };
+
+    SECTION("regular subscription change") {
+        SyncTestFile triggered_config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});
+        std::atomic<bool> saw_truncated_bootstrap{false};
+        triggered_config.sync_config->on_sync_client_event_hook = [&](std::weak_ptr<SyncSession> weak_sess,
+                                                                      const SyncClientHookData& data) {
+            auto sess = weak_sess.lock();
+            if (!sess || data.event != SyncClientHookEvent::BootstrapProcessed || data.query_version != 1) {
+                return SyncClientHookAction::NoAction;
+            }
+
+            auto latest_subs = sess->get_flx_subscription_store()->get_latest();
+            REQUIRE(latest_subs.state() == sync::SubscriptionSet::State::AwaitingMark);
+            REQUIRE(data.num_changesets == 1);
+            auto db = SyncSession::OnlyForTesting::get_db(*sess);
+            auto read_tr = db->start_read();
+            auto table = read_tr->get_table("class_TopLevel");
+            REQUIRE(table->find_primary_key(bar_obj_id));
+            REQUIRE_FALSE(table->find_primary_key(bizz_obj_id));
+            saw_truncated_bootstrap.store(true);
+
+            return SyncClientHookAction::NoAction;
+        };
+        auto problem_realm = Realm::get_shared_realm(triggered_config);
+
+        // Setup the problem realm by waiting for it to be fully synchronized with an empty query, so the router
+        // on the server should have no new history entries, and then pause the router so it doesn't get any of
+        // the changes we're about to create.
+        wait_for_upload(*problem_realm);
+        wait_for_download(*problem_realm);
+
+        nlohmann::json command_request = {
+            {"command", "PAUSE_ROUTER_SESSION"},
+        };
+        auto resp_body =
+            SyncSession::OnlyForTesting::send_test_command(*problem_realm->sync_session(), command_request.dump())
+                .get();
+        REQUIRE(resp_body == "{}");
+
+        // Put some data into the server, this will be the data that will be in the broker cache.
+        setup_and_poison_cache();
+
+        // Setup queries on the problem realm to bootstrap from the cached object. Bootstrapping will also resume
+        // the router, so all we need to do is wait for the subscription set to be complete and notifications to be
+        // processed.
+        setup_subs(problem_realm).get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
+        wait_for_advance(*problem_realm);
+
+        REQUIRE(saw_truncated_bootstrap.load());
+        auto table = problem_realm->read_group().get_table("class_TopLevel");
         REQUIRE(table->find_primary_key(bar_obj_id));
+        REQUIRE(table->find_primary_key(bizz_obj_id));
+    }
 
-        // Then create an object that won't be in the cached snapshot - this is the object that if we didn't
-        // wait for a MARK message to come back, we'd miss it in our results.
-        CppContext c(realm);
-        realm->begin_transaction();
-        Object::create(c, realm, "TopLevel",
-                       std::any(AnyDict{{"_id", bizz_obj_id},
-                                        {"queryable_str_field", std::string{"bizz"}},
-                                        {"queryable_int_field", static_cast<int64_t>(15)},
-                                        {"non_queryable_field", std::string{"non queryable 3"}}}));
-        realm->commit_transaction();
-        wait_for_upload(*realm);
-    });
+    SECTION("disconnect between bootstrap and mark") {
+        SyncTestFile triggered_config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});
+        auto [interrupted_promise, interrupted] = util::make_promise_future<void>();
+        auto shared_promise = std::make_shared<util::Promise<void>>(std::move(interrupted_promise));
+        triggered_config.sync_config->on_sync_client_event_hook = [promise = std::move(shared_promise), &bizz_obj_id,
+                                                                   &bar_obj_id](std::weak_ptr<SyncSession> weak_sess,
+                                                                                const SyncClientHookData& data) {
+            auto sess = weak_sess.lock();
+            if (!sess || data.event != SyncClientHookEvent::BootstrapProcessed || data.query_version != 1) {
+                return SyncClientHookAction::NoAction;
+            }
+
+            auto latest_subs = sess->get_flx_subscription_store()->get_latest();
+            REQUIRE(latest_subs.state() == sync::SubscriptionSet::State::AwaitingMark);
+            REQUIRE(data.num_changesets == 1);
+            auto db = SyncSession::OnlyForTesting::get_db(*sess);
+            auto read_tr = db->start_read();
+            auto table = read_tr->get_table("class_TopLevel");
+            REQUIRE(table->find_primary_key(bar_obj_id));
+            REQUIRE_FALSE(table->find_primary_key(bizz_obj_id));
+
+            sess->close();
+            promise->emplace_value();
+            return SyncClientHookAction::NoAction;
+        };
+        auto problem_realm = Realm::get_shared_realm(triggered_config);
 
-    // Setup queries on the problem realm to bootstrap from the cached object. Bootstrapping will also resume
-    // the router, so all we need to do is wait for the subscription set to be complete and notifications to be
-    // processed.
-    setup_subs(problem_realm);
+        // Setup the problem realm by waiting for it to be fully synchronized with an empty query, so the router
+        // on the server should have no new history entries, and then pause the router so it doesn't get any of
+        // the changes we're about to create.
+        wait_for_upload(*problem_realm);
+        wait_for_download(*problem_realm);
 
-    auto table = problem_realm->read_group().get_table("class_TopLevel");
-    REQUIRE(table->find_primary_key(bar_obj_id));
-    REQUIRE(table->find_primary_key(bizz_obj_id));
+        nlohmann::json command_request = {
+            {"command", "PAUSE_ROUTER_SESSION"},
+        };
+        auto resp_body =
+            SyncSession::OnlyForTesting::send_test_command(*problem_realm->sync_session(), command_request.dump())
+                .get();
+        REQUIRE(resp_body == "{}");
+
+        // Put some data into the server, this will be the data that will be in the broker cache.
+        setup_and_poison_cache();
+
+        // Setup queries on the problem realm to bootstrap from the cached object. Bootstrapping will also resume
+        // the router, so all we need to do is wait for the subscription set to be complete and notifications to be
+        // processed.
+        auto sub_set = setup_subs(problem_realm);
+        auto sub_complete_future = sub_set.get_state_change_notification(sync::SubscriptionSet::State::Complete);
+
+        interrupted.get();
+        problem_realm->sync_session()->shutdown_and_wait();
+        REQUIRE(!sub_complete_future.is_ready());
+        sub_set.refresh();
+        REQUIRE(sub_set.state() == sync::SubscriptionSet::State::AwaitingMark);
+
+        problem_realm->sync_session()->revive_if_needed();
+        sub_complete_future.get();
+        wait_for_advance(*problem_realm);
+
+        sub_set.refresh();
+        REQUIRE(sub_set.state() == sync::SubscriptionSet::State::Complete);
+        auto table = problem_realm->read_group().get_table("class_TopLevel");
+        REQUIRE(table->find_primary_key(bar_obj_id));
+        REQUIRE(table->find_primary_key(bizz_obj_id));
+    }
 }
 
 } // namespace realm::app

From d6238d32373bdd460b26f87e43105ad2ad98c981 Mon Sep 17 00:00:00 2001
From: Jonathan Reams <jbreams@mongodb.com>
Date: Mon, 10 Oct 2022 09:52:47 -0400
Subject: [PATCH 10/21] fix windows warnings

---
 src/realm/object-store/c_api/sync.cpp | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/src/realm/object-store/c_api/sync.cpp b/src/realm/object-store/c_api/sync.cpp
index f1790b7697d..b6608e01534 100644
--- a/src/realm/object-store/c_api/sync.cpp
+++ b/src/realm/object-store/c_api/sync.cpp
@@ -231,6 +231,7 @@ SubscriptionSet::State sub_state_from_c_enum(realm_flx_sync_subscription_set_sta
         case RLM_SYNC_SUBSCRIPTION_UNCOMMITTED:
             return SubscriptionSet::State::Uncommitted;
     }
+    REALM_UNREACHABLE();
 }
 
 realm_flx_sync_subscription_set_state_e sub_state_to_c_enum(SubscriptionSet::State state)
@@ -251,6 +252,7 @@ realm_flx_sync_subscription_set_state_e sub_state_to_c_enum(SubscriptionSet::Sta
         case SubscriptionSet::State::Superseded:
             return RLM_SYNC_SUBSCRIPTION_SUPERSEDED;
     }
+    REALM_UNREACHABLE();
 }
 
 } // namespace

From d95f001a9c248f53c56848bb2c91d2b510f36a4f Mon Sep 17 00:00:00 2001
From: Jonathan Reams <jbreams@mongodb.com>
Date: Thu, 20 Oct 2022 16:12:02 -0400
Subject: [PATCH 11/21] fixes

---
 dependencies.list                   |  2 +-
 src/realm/sync/client.cpp           |  5 ++++-
 src/realm/sync/subscriptions.cpp    |  5 ++---
 src/realm/sync/subscriptions.hpp    | 20 ++++++++++----------
 test/object-store/sync/flx_sync.cpp |  8 ++++++--
 test/test_sync_subscriptions.cpp    |  2 +-
 6 files changed, 24 insertions(+), 18 deletions(-)

diff --git a/dependencies.list b/dependencies.list
index e91627ba9fb..e653892089f 100644
--- a/dependencies.list
+++ b/dependencies.list
@@ -1,4 +1,4 @@
 PACKAGE_NAME=realm-core
 VERSION=12.9.0
 OPENSSL_VERSION=1.1.1n
-MDBREALM_TEST_SERVER_TAG=2022-10-10
+MDBREALM_TEST_SERVER_TAG=2022-10-20
diff --git a/src/realm/sync/client.cpp b/src/realm/sync/client.cpp
index 316dd0f56b8..f42974af1f1 100644
--- a/src/realm/sync/client.cpp
+++ b/src/realm/sync/client.cpp
@@ -815,6 +815,9 @@ void SessionImpl::process_pending_flx_bootstrap()
         progress = *pending_batch.progress;
         changesets_processed += pending_batch.changesets.size();
 
+        REALM_ASSERT(call_debug_hook(SyncClientHookEvent::DownloadMessageIntegrated, progress, query_version,
+                                     batch_state, pending_batch.changesets.size()) == SyncClientHookAction::NoAction);
+
         logger.info("Integrated %1 changesets from pending bootstrap for query version %2, producing client version "
                     "%3. %4 changesets remaining in bootstrap",
                     pending_batch.changesets.size(), pending_batch.query_version, new_version.realm_version,
@@ -953,7 +956,7 @@ SessionWrapper::SessionWrapper(ClientImpl& client, DBRef db, std::shared_ptr<Sub
     REALM_ASSERT(dynamic_cast<ClientReplication*>(m_db->get_replication()));
 
     if (m_flx_subscription_store) {
-        auto versions_info = m_flx_subscription_store->get_active_and_latest_versions();
+        auto versions_info = m_flx_subscription_store->get_version_info();
         m_flx_active_version = versions_info.active;
         m_flx_latest_version = versions_info.latest;
         m_flx_pending_mark_version = versions_info.pending_mark;
diff --git a/src/realm/sync/subscriptions.cpp b/src/realm/sync/subscriptions.cpp
index 1558bcf7fc5..46487b07da1 100644
--- a/src/realm/sync/subscriptions.cpp
+++ b/src/realm/sync/subscriptions.cpp
@@ -390,8 +390,7 @@ void MutableSubscriptionSet::update_state(State new_state, util::Optional<std::s
             m_error_str = std::string{*error_str};
             break;
         case State::Bootstrapping:
-            m_state = new_state;
-            break;
+            [[fallthrough]];
         case State::AwaitingMark:
             m_state = new_state;
             break;
@@ -711,7 +710,7 @@ SubscriptionSet SubscriptionStore::get_active() const
     return SubscriptionSet(weak_from_this(), *tr, res.get_object(0));
 }
 
-SubscriptionStore::VersionInfo SubscriptionStore::get_active_and_latest_versions() const
+SubscriptionStore::VersionInfo SubscriptionStore::get_version_info() const
 {
     auto tr = m_db->start_read();
     auto sub_sets = tr->get_table(m_sub_set_table);
diff --git a/src/realm/sync/subscriptions.hpp b/src/realm/sync/subscriptions.hpp
index 7d7dac3fcac..8e8e207fe52 100644
--- a/src/realm/sync/subscriptions.hpp
+++ b/src/realm/sync/subscriptions.hpp
@@ -90,13 +90,13 @@ class SubscriptionSet {
     /*
      * State diagram:
      *
-     *                    ┌───────────┬─────────►Error─────────────┐
-     *                    │           │                            │
-     *                    │           │                            ▼
+     *                    ┌───────────┬─────────►Error──────────────────────────┐
+     *                    │           │                                         │
+     *                    │           │                                         ▼
      *   Uncommitted──►Pending──►Bootstrapping──►AwaitingMark──►Complete───►Superseded
-     *                    │                                        ▲
-     *                    │                                        │
-     *                    └────────────────────────────────────────┘
+     *                    │                            ▲
+     *                    │                            │
+     *                    └────────────────────────────┘
      *
      */
     enum class State {
@@ -107,9 +107,6 @@ class SubscriptionSet {
         Pending,
         // The server is currently sending the initial state that represents this subscription set to the client.
         Bootstrapping,
-        // The last bootstrap message containing the initial state for this subscription set has been received. The
-        // client is awaiting a mark message to mark this subscription as fully caught up to history.
-        AwaitingMark,
         // This subscription set is the active subscription set that is currently being synchronized with the server.
         Complete,
         // An error occurred while processing this subscription set on the server. Check error_str() for details.
@@ -117,6 +114,9 @@ class SubscriptionSet {
         // The server responded to a later subscription set to this one and this one has been trimmed from the
         // local storage of subscription sets.
         Superseded,
+        // The last bootstrap message containing the initial state for this subscription set has been received. The
+        // client is awaiting a mark message to mark this subscription as fully caught up to history.
+        AwaitingMark,
     };
 
     static constexpr int64_t EmptyVersion = int64_t(-1);
@@ -327,7 +327,7 @@ class SubscriptionStore : public std::enable_shared_from_this<SubscriptionStore>
     };
     // Returns the version number of the current active and latest subscription sets. This function guarantees
     // that the versions will be read from the same underlying transaction and will thus be consistent.
-    VersionInfo get_active_and_latest_versions() const;
+    VersionInfo get_version_info() const;
 
     // To be used internally by the sync client. This returns a mutable view of a subscription set by its
     // version ID. If there is no SubscriptionSet with that version ID, this throws KeyNotFound.
diff --git a/test/object-store/sync/flx_sync.cpp b/test/object-store/sync/flx_sync.cpp
index 20519e03a0f..b4d636a2bed 100644
--- a/test/object-store/sync/flx_sync.cpp
+++ b/test/object-store/sync/flx_sync.cpp
@@ -1194,7 +1194,7 @@ TEST_CASE("flx: interrupted bootstrap restarts/recovers on reconnect", "[sync][f
         options.encryption_key = test_util::crypt_key();
         auto realm = DB::create(sync::make_client_replication(), interrupted_realm_config.path, options);
         auto sub_store = sync::SubscriptionStore::create(realm, [](int64_t) {});
-        auto version_info = sub_store->get_active_and_latest_versions();
+        auto version_info = sub_store->get_version_info();
         REQUIRE(version_info.active == 0);
         REQUIRE(version_info.latest == 1);
         auto latest_subs = sub_store->get_latest();
@@ -1595,7 +1595,7 @@ TEST_CASE("flx: bootstrap batching prevents orphan documents", "[sync][flx][app]
         REQUIRE(top_level->is_empty());
 
         auto sub_store = sync::SubscriptionStore::create(realm, [](int64_t) {});
-        auto version_info = sub_store->get_active_and_latest_versions();
+        auto version_info = sub_store->get_version_info();
         REQUIRE(version_info.latest == 1);
         REQUIRE(version_info.active == 0);
         auto latest_subs = sub_store->get_latest();
@@ -1845,6 +1845,10 @@ TEST_CASE("flx: bootstrap batching prevents orphan documents", "[sync][flx][app]
 
                 auto latest_sub_set = session->get_flx_subscription_store()->get_latest();
                 auto active_sub_set = session->get_flx_subscription_store()->get_active();
+                auto version_info = session->get_flx_subscription_store()->get_version_info();
+                REQUIRE(version_info.pending_mark == active_sub_set.version());
+                REQUIRE(version_info.active == active_sub_set.version());
+                REQUIRE(version_info.latest == latest_sub_set.version());
                 REQUIRE(latest_sub_set.version() == active_sub_set.version());
                 REQUIRE(active_sub_set.state() == sync::SubscriptionSet::State::AwaitingMark);
 
diff --git a/test/test_sync_subscriptions.cpp b/test/test_sync_subscriptions.cpp
index 0297d9ebd8f..33dd58a5744 100644
--- a/test/test_sync_subscriptions.cpp
+++ b/test/test_sync_subscriptions.cpp
@@ -432,7 +432,7 @@ TEST(Sync_SubscriptionStoreInternalSchemaMigration)
     util::File::copy(path.string(), sub_store_path);
     SubscriptionStoreFixture fixture(sub_store_path);
     auto store = SubscriptionStore::create(fixture.db, [](int64_t) {});
-    auto [active_version, latest_version, pending_mark_version] = store->get_active_and_latest_versions();
+    auto [active_version, latest_version, pending_mark_version] = store->get_version_info();
     CHECK_EQUAL(active_version, latest_version);
     auto active = store->get_active();
     CHECK_EQUAL(active.version(), 1);

From 3cc9a78a5f1850fd110a83e8225e0d08ebcc1c22 Mon Sep 17 00:00:00 2001
From: Jonathan Reams <jbreams@mongodb.com>
Date: Thu, 20 Oct 2022 16:17:37 -0400
Subject: [PATCH 12/21] changelog

---
 CHANGELOG.md | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index af566db7591..ac21bd6cd5f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,7 @@
 * <New feature description> (PR [#????](https://github.com/realm/realm-core/pull/????))
 * Improve performance of client reset with automatic recovery and converting top-level tables into embedded tables (PR [#5897](https://github.com/realm/realm-core/pull/5897)).
 * Adding `realm_query_parse_for_set` in the C API ([#5935](https://github.com/realm/realm-core/pull/5935)).
+* Flexible sync will now wait for the server to have sent all pending history after a bootstrap before marking a subscription as Complete. ([#5795](https://github.com/realm/realm-core/pull/5795))
 
 ### Fixed
 * <How do the end-user experience this issue? what was the impact?> ([#????](https://github.com/realm/realm-core/issues/????), since v?.?.?)
@@ -13,6 +14,7 @@
 ### Breaking changes
 * Rename RealmConfig::automatic_handle_backlicks_in_migrations to RealmConfig::automatically_handle_backlinks_in_migrations ([PR #5897](https://github.com/realm/realm-core/pull/5897)).
 * Introduced new callback type realm_return_apikey_list_func_t and realm_return_apikey_func_t in the C-API ([PR #5945](https://github.com/realm/realm-core/pull/5945)).
+* The name of one of the RLM_SYNC_BOOTSTRAPPING enum member in the C api was updated to match the naming convention of the other members in the enum.
 
 ### Compatibility
 * Fileformat: Generates files with format v22. Reads and automatically upgrade from fileformat v5.

From 1197d03f3295345cea4a23820ec8c6627dafe960 Mon Sep 17 00:00:00 2001
From: Jonathan Reams <jbreams@mongodb.com>
Date: Thu, 20 Oct 2022 17:26:58 -0400
Subject: [PATCH 13/21] fix ordering of state enum

---
 src/realm/object-store/c_api/sync.cpp |  2 +-
 src/realm/sync/subscriptions.cpp      | 27 +++++++++++++++++++++++++--
 2 files changed, 26 insertions(+), 3 deletions(-)

diff --git a/src/realm/object-store/c_api/sync.cpp b/src/realm/object-store/c_api/sync.cpp
index b6608e01534..288dd70829b 100644
--- a/src/realm/object-store/c_api/sync.cpp
+++ b/src/realm/object-store/c_api/sync.cpp
@@ -215,7 +215,7 @@ static_assert(realm_sync_error_action_e(ProtocolErrorInfo::Action::ClientResetNo
 
 SubscriptionSet::State sub_state_from_c_enum(realm_flx_sync_subscription_set_state_e value)
 {
-    switch (static_cast<realm_flx_sync_subscription_set_state>(value)) {
+    switch (value) {
         case RLM_SYNC_SUBSCRIPTION_PENDING:
             return SubscriptionSet::State::Pending;
         case RLM_SYNC_SUBSCRIPTION_BOOTSTRAPPING:
diff --git a/src/realm/sync/subscriptions.cpp b/src/realm/sync/subscriptions.cpp
index 46487b07da1..29967975740 100644
--- a/src/realm/sync/subscriptions.cpp
+++ b/src/realm/sync/subscriptions.cpp
@@ -114,6 +114,27 @@ int64_t state_to_storage(SubscriptionSet::State state)
     }
 }
 
+size_t state_to_order(SubscriptionSet::State needle)
+{
+    using State = SubscriptionSet::State;
+    switch (needle) {
+        case State::Uncommitted:
+            return 0;
+        case State::Pending:
+            return 1;
+        case State::Bootstrapping:
+            return 2;
+        case State::AwaitingMark:
+            return 3;
+        case State::Complete:
+            return 4;
+        case State::Error:
+            return 5;
+        case State::Superseded:
+            return 6;
+    }
+}
+
 } // namespace
 
 Subscription::Subscription(const SubscriptionStore* parent, Obj obj)
@@ -461,7 +482,7 @@ util::Future<SubscriptionSet::State> SubscriptionSet::get_state_change_notificat
     if (cur_state == State::Error) {
         return util::Future<State>::make_ready(Status{ErrorCodes::RuntimeError, err_str});
     }
-    else if (cur_state >= notify_when) {
+    else if (state_to_order(cur_state) >= state_to_order(notify_when)) {
         return util::Future<State>::make_ready(cur_state);
     }
 
@@ -486,8 +507,10 @@ void MutableSubscriptionSet::process_notifications()
         return mgr->m_outstanding_requests == 0;
     });
 
+
     for (auto it = mgr->m_pending_notifications.begin(); it != mgr->m_pending_notifications.end();) {
-        if ((it->version == my_version && (new_state == State::Error || new_state >= it->notify_when)) ||
+        if ((it->version == my_version &&
+             (new_state == State::Error || state_to_order(new_state) >= state_to_order(it->notify_when))) ||
             (new_state == State::Complete && it->version < my_version)) {
             to_finish.splice(to_finish.end(), mgr->m_pending_notifications, it++);
         }

From ea5a07c6bdddae69ad68f4a42eac79f06db47f1b Mon Sep 17 00:00:00 2001
From: Jonathan Reams <jbreams@mongodb.com>
Date: Fri, 21 Oct 2022 16:30:57 -0400
Subject: [PATCH 14/21] update changelog

---
 CHANGELOG.md | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9c84ea572b8..20642645ce9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,8 +1,7 @@
 # NEXT RELEASE
 
 ### Enhancements
-* <New feature description> (PR [#????](https://github.com/realm/realm-core/pull/????))
-* None.
+* Flexible sync will now wait for the server to have sent all pending history after a bootstrap before marking a subscription as Complete. ([#5795](https://github.com/realm/realm-core/pull/5795))
 
 ### Fixed
 * <How do the end-user experience this issue? what was the impact?> ([#????](https://github.com/realm/realm-core/issues/????), since v?.?.?)
@@ -10,6 +9,7 @@
 
 ### Breaking changes
 * Websocket errors caused by the client sending a websocket message that is too large (i.e. greater than 16MB) now get reported as a `ProtocolError::limits_exceeded` error with a `ClientReset` requested by the server ([#5209](https://github.com/realm/realm-core/issues/5209)).
+* The name of one of the RLM_SYNC_BOOTSTRAPPING enum member in the C api was updated to match the naming convention of the other members in the enum.
 
 ### Compatibility
 * Fileformat: Generates files with format v22. Reads and automatically upgrade from fileformat v5.
@@ -27,7 +27,6 @@
 ### Enhancements
 * Improve performance of client reset with automatic recovery and converting top-level tables into embedded tables (PR [#5897](https://github.com/realm/realm-core/pull/5897)).
 * Adding `realm_query_parse_for_set` in the C API ([#5935](https://github.com/realm/realm-core/pull/5935)).
-* Flexible sync will now wait for the server to have sent all pending history after a bootstrap before marking a subscription as Complete. ([#5795](https://github.com/realm/realm-core/pull/5795))
 
 ### Fixed
 * Fixed an assertion failure when observing change notifications on a sectioned result, if the first modification was to a linked property that did not cause the state of the sections to change. ([#5912](https://github.com/realm/realm-core/issues/5912), since the introduction of sectioned results in v12.3.0)
@@ -38,7 +37,6 @@
 ### Breaking changes
 * Rename RealmConfig::automatic_handle_backlicks_in_migrations to RealmConfig::automatically_handle_backlinks_in_migrations ([PR #5897](https://github.com/realm/realm-core/pull/5897)).
 * Introduced new callback type realm_return_apikey_list_func_t and realm_return_apikey_func_t in the C-API ([PR #5945](https://github.com/realm/realm-core/pull/5945)).
-* The name of one of the RLM_SYNC_BOOTSTRAPPING enum member in the C api was updated to match the naming convention of the other members in the enum.
 
 ### Compatibility
 * Fileformat: Generates files with format v22. Reads and automatically upgrade from fileformat v5.

From e8ccb4c300fc66a37d5e7526b163eea484b194da Mon Sep 17 00:00:00 2001
From: Jonathan Reams <jbreams@mongodb.com>
Date: Fri, 21 Oct 2022 19:20:45 -0400
Subject: [PATCH 15/21] bump server

---
 dependencies.list | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dependencies.list b/dependencies.list
index 7b275a104b2..50b45b717cc 100644
--- a/dependencies.list
+++ b/dependencies.list
@@ -1,4 +1,4 @@
 PACKAGE_NAME=realm-core
 VERSION=12.10.0
 OPENSSL_VERSION=1.1.1n
-MDBREALM_TEST_SERVER_TAG=2022-10-20
+MDBREALM_TEST_SERVER_TAG=2022-10-21

From 9d910427ea1ed829220c1bf85b3e202a2dd2940a Mon Sep 17 00:00:00 2001
From: Jonathan Reams <jbreams@mongodb.com>
Date: Mon, 24 Oct 2022 16:55:43 -0400
Subject: [PATCH 16/21] fix windows warning

---
 src/realm/sync/subscriptions.cpp | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/realm/sync/subscriptions.cpp b/src/realm/sync/subscriptions.cpp
index 29967975740..aae8998d2e3 100644
--- a/src/realm/sync/subscriptions.cpp
+++ b/src/realm/sync/subscriptions.cpp
@@ -133,6 +133,7 @@ size_t state_to_order(SubscriptionSet::State needle)
         case State::Superseded:
             return 6;
     }
+    REALM_UNREACHABLE();
 }
 
 } // namespace

From 30523f6fb3e2d2c8ff2d8281aba560ec2a262b86 Mon Sep 17 00:00:00 2001
From: Jonathan Reams <jbreams@mongodb.com>
Date: Tue, 25 Oct 2022 16:13:00 -0400
Subject: [PATCH 17/21] remove unnecessary enum conversion logic

---
 src/realm/object-store/c_api/sync.cpp | 63 ++++++++-------------------
 src/realm/sync/subscriptions.cpp      |  9 +---
 2 files changed, 19 insertions(+), 53 deletions(-)

diff --git a/src/realm/object-store/c_api/sync.cpp b/src/realm/object-store/c_api/sync.cpp
index 288dd70829b..e9de48bb27c 100644
--- a/src/realm/object-store/c_api/sync.cpp
+++ b/src/realm/object-store/c_api/sync.cpp
@@ -213,47 +213,19 @@ static_assert(realm_sync_error_action_e(ProtocolErrorInfo::Action::ClientReset)
 static_assert(realm_sync_error_action_e(ProtocolErrorInfo::Action::ClientResetNoRecovery) ==
               RLM_SYNC_ERROR_ACTION_CLIENT_RESET_NO_RECOVERY);
 
-SubscriptionSet::State sub_state_from_c_enum(realm_flx_sync_subscription_set_state_e value)
-{
-    switch (value) {
-        case RLM_SYNC_SUBSCRIPTION_PENDING:
-            return SubscriptionSet::State::Pending;
-        case RLM_SYNC_SUBSCRIPTION_BOOTSTRAPPING:
-            return SubscriptionSet::State::Bootstrapping;
-        case RLM_SYNC_SUBSCRIPTION_AWAITING_MARK:
-            return SubscriptionSet::State::AwaitingMark;
-        case RLM_SYNC_SUBSCRIPTION_COMPLETE:
-            return SubscriptionSet::State::Complete;
-        case RLM_SYNC_SUBSCRIPTION_ERROR:
-            return SubscriptionSet::State::Error;
-        case RLM_SYNC_SUBSCRIPTION_SUPERSEDED:
-            return SubscriptionSet::State::Superseded;
-        case RLM_SYNC_SUBSCRIPTION_UNCOMMITTED:
-            return SubscriptionSet::State::Uncommitted;
-    }
-    REALM_UNREACHABLE();
-}
-
-realm_flx_sync_subscription_set_state_e sub_state_to_c_enum(SubscriptionSet::State state)
-{
-    switch (state) {
-        case SubscriptionSet::State::Pending:
-            return RLM_SYNC_SUBSCRIPTION_PENDING;
-        case SubscriptionSet::State::Bootstrapping:
-            return RLM_SYNC_SUBSCRIPTION_BOOTSTRAPPING;
-        case SubscriptionSet::State::AwaitingMark:
-            return RLM_SYNC_SUBSCRIPTION_AWAITING_MARK;
-        case SubscriptionSet::State::Complete:
-            return RLM_SYNC_SUBSCRIPTION_COMPLETE;
-        case SubscriptionSet::State::Error:
-            return RLM_SYNC_SUBSCRIPTION_ERROR;
-        case SubscriptionSet::State::Uncommitted:
-            return RLM_SYNC_SUBSCRIPTION_UNCOMMITTED;
-        case SubscriptionSet::State::Superseded:
-            return RLM_SYNC_SUBSCRIPTION_SUPERSEDED;
-    }
-    REALM_UNREACHABLE();
-}
+static_assert(realm_flx_sync_subscription_set_state_e(SubscriptionSet::State::Pending) ==
+              RLM_SYNC_SUBSCRIPTION_PENDING);
+static_assert(realm_flx_sync_subscription_set_state_e(SubscriptionSet::State::Bootstrapping) ==
+              RLM_SYNC_SUBSCRIPTION_BOOTSTRAPPING);
+static_assert(realm_flx_sync_subscription_set_state_e(SubscriptionSet::State::AwaitingMark) ==
+              RLM_SYNC_SUBSCRIPTION_AWAITING_MARK);
+static_assert(realm_flx_sync_subscription_set_state_e(SubscriptionSet::State::Complete) ==
+              RLM_SYNC_SUBSCRIPTION_COMPLETE);
+static_assert(realm_flx_sync_subscription_set_state_e(SubscriptionSet::State::Error) == RLM_SYNC_SUBSCRIPTION_ERROR);
+static_assert(realm_flx_sync_subscription_set_state_e(SubscriptionSet::State::Superseded) ==
+              RLM_SYNC_SUBSCRIPTION_SUPERSEDED);
+static_assert(realm_flx_sync_subscription_set_state_e(SubscriptionSet::State::Uncommitted) ==
+              RLM_SYNC_SUBSCRIPTION_UNCOMMITTED);
 
 } // namespace
 
@@ -612,8 +584,8 @@ realm_sync_on_subscription_set_state_change_wait(const realm_flx_sync_subscripti
 {
     REALM_ASSERT(subscription_set != nullptr);
     SubscriptionSet::State state =
-        subscription_set->get_state_change_notification(sub_state_from_c_enum(notify_when)).get();
-    return sub_state_to_c_enum(state);
+        subscription_set->get_state_change_notification(static_cast<SubscriptionSet::State>(notify_when)).get();
+    return static_cast<realm_flx_sync_subscription_set_state_e>(state);
 }
 
 RLM_API bool
@@ -624,12 +596,13 @@ realm_sync_on_subscription_set_state_change_async(const realm_flx_sync_subscript
 {
     REALM_ASSERT(subscription_set != nullptr && callback != nullptr);
     return wrap_err([&]() {
-        auto future_state = subscription_set->get_state_change_notification(sub_state_from_c_enum(notify_when));
+        auto future_state =
+            subscription_set->get_state_change_notification(static_cast<SubscriptionSet::State>(notify_when));
         std::move(future_state)
             .get_async([callback, userdata = SharedUserdata(userdata, FreeUserdata(userdata_free))](
                            const StatusWith<SubscriptionSet::State>& state) -> void {
                 if (state.is_ok())
-                    callback(userdata.get(), sub_state_to_c_enum(state.get_value()));
+                    callback(userdata.get(), static_cast<realm_flx_sync_subscription_set_state_e>(state.get_value()));
                 else
                     callback(userdata.get(), realm_flx_sync_subscription_set_state_e::RLM_SYNC_SUBSCRIPTION_ERROR);
             });
diff --git a/src/realm/sync/subscriptions.cpp b/src/realm/sync/subscriptions.cpp
index aae8998d2e3..f6efb09fdaf 100644
--- a/src/realm/sync/subscriptions.cpp
+++ b/src/realm/sync/subscriptions.cpp
@@ -58,9 +58,6 @@ constexpr static std::string_view c_flx_sub_query_str_field("query");
 using OptionalString = util::Optional<std::string>;
 
 enum class SubscriptionStateForStorage : int64_t {
-    // This subscription set has not been persisted and has not been sent to the server. This state is only valid
-    // for MutableSubscriptionSets
-    Uncommitted = 0,
     // The subscription set has been persisted locally but has not been acknowledged by the server yet.
     Pending = 1,
     // The server is currently sending the initial state that represents this subscription set to the client.
@@ -77,8 +74,6 @@ enum class SubscriptionStateForStorage : int64_t {
 SubscriptionSet::State state_from_storage(int64_t value)
 {
     switch (static_cast<SubscriptionStateForStorage>(value)) {
-        case SubscriptionStateForStorage::Uncommitted:
-            return SubscriptionSet::State::Uncommitted;
         case SubscriptionStateForStorage::Pending:
             return SubscriptionSet::State::Pending;
         case SubscriptionStateForStorage::Bootstrapping:
@@ -90,15 +85,13 @@ SubscriptionSet::State state_from_storage(int64_t value)
         case SubscriptionStateForStorage::Error:
             return SubscriptionSet::State::Error;
         default:
-            REALM_UNREACHABLE();
+            throw std::runtime_error(util::format("Invalid state for SubscriptionSet stored on disk: %1", value));
     }
 }
 
 int64_t state_to_storage(SubscriptionSet::State state)
 {
     switch (state) {
-        case SubscriptionSet::State::Uncommitted:
-            return static_cast<int64_t>(SubscriptionStateForStorage::Uncommitted);
         case SubscriptionSet::State::Pending:
             return static_cast<int64_t>(SubscriptionStateForStorage::Pending);
         case SubscriptionSet::State::Bootstrapping:

From 32ad0aab7297ff2915f89307995f2c4eb2c7905a Mon Sep 17 00:00:00 2001
From: Jonathan Reams <jbreams@mongodb.com>
Date: Tue, 25 Oct 2022 17:20:34 -0400
Subject: [PATCH 18/21] do not load empty MutableSubscriptionSets from disk

---
 src/realm/CMakeLists.txt                      |  1 +
 src/realm/object-store/CMakeLists.txt         |  1 -
 src/realm/object-store/property.hpp           |  3 +--
 src/realm/sync/subscriptions.cpp              | 21 +++++++++++--------
 src/realm/sync/subscriptions.hpp              | 10 ++++++---
 .../{object-store => }/util/tagged_bool.hpp   |  0
 test/object-store/util/test_file.hpp          |  2 +-
 7 files changed, 22 insertions(+), 16 deletions(-)
 rename src/realm/{object-store => }/util/tagged_bool.hpp (100%)

diff --git a/src/realm/CMakeLists.txt b/src/realm/CMakeLists.txt
index 9dbc457579a..27549f9323c 100644
--- a/src/realm/CMakeLists.txt
+++ b/src/realm/CMakeLists.txt
@@ -250,6 +250,7 @@ set(REALM_INSTALL_HEADERS
     util/serializer.hpp
     util/sha_crypto.hpp
     util/span.hpp
+    util/tagged_bool.hpp
     util/terminate.hpp
     util/thread.hpp
     util/to_string.hpp
diff --git a/src/realm/object-store/CMakeLists.txt b/src/realm/object-store/CMakeLists.txt
index 6aca290d650..7e9af655661 100644
--- a/src/realm/object-store/CMakeLists.txt
+++ b/src/realm/object-store/CMakeLists.txt
@@ -83,7 +83,6 @@ set(HEADERS
     util/copyable_atomic.hpp
     util/event_loop_dispatcher.hpp
     util/scheduler.hpp
-    util/tagged_bool.hpp
     util/tagged_string.hpp
     util/uuid.hpp
 
diff --git a/src/realm/object-store/property.hpp b/src/realm/object-store/property.hpp
index 5d806d4ba37..62b9e4f7665 100644
--- a/src/realm/object-store/property.hpp
+++ b/src/realm/object-store/property.hpp
@@ -19,13 +19,12 @@
 #ifndef REALM_PROPERTY_HPP
 #define REALM_PROPERTY_HPP
 
-#include <realm/object-store/util/tagged_bool.hpp>
-
 #include <realm/util/features.h>
 #include <realm/util/assert.hpp>
 // FIXME: keys.hpp is currently pretty heavyweight
 #include <realm/keys.hpp>
 #include <realm/util/optional.hpp>
+#include <realm/util/tagged_bool.hpp>
 
 #include <string>
 
diff --git a/src/realm/sync/subscriptions.cpp b/src/realm/sync/subscriptions.cpp
index f6efb09fdaf..6ac9167ea50 100644
--- a/src/realm/sync/subscriptions.cpp
+++ b/src/realm/sync/subscriptions.cpp
@@ -190,11 +190,14 @@ std::string_view Subscription::query_string() const
     return m_query_string;
 }
 
-SubscriptionSet::SubscriptionSet(std::weak_ptr<const SubscriptionStore> mgr, const Transaction& tr, Obj obj)
+SubscriptionSet::SubscriptionSet(std::weak_ptr<const SubscriptionStore> mgr, const Transaction& tr, Obj obj,
+                                 MakingMutableCopy making_mutable_copy)
     : m_mgr(mgr)
+    , m_cur_version(tr.get_version())
+    , m_version(obj.get_primary_key().get_int())
 {
-    if (obj.is_valid()) {
-        load_from_database(tr, std::move(obj));
+    if (!making_mutable_copy && obj.is_valid()) {
+        load_from_database(std::move(obj));
     }
 }
 
@@ -205,12 +208,10 @@ SubscriptionSet::SubscriptionSet(std::weak_ptr<const SubscriptionStore> mgr, int
 {
 }
 
-void SubscriptionSet::load_from_database(const Transaction& tr, Obj obj)
+void SubscriptionSet::load_from_database(Obj obj)
 {
     auto mgr = get_flx_subscription_store(); // Throws
 
-    m_cur_version = tr.get_version();
-    m_version = obj.get_primary_key().get_int();
     m_state = state_from_storage(obj.get<int64_t>(mgr->m_sub_set_state));
     m_error_str = obj.get<String>(mgr->m_sub_set_error_str);
     m_snapshot_version = static_cast<DB::version_type>(obj.get<int64_t>(mgr->m_sub_set_snapshot_version));
@@ -288,8 +289,9 @@ SubscriptionSet::const_iterator SubscriptionSet::find(const Query& query) const
     });
 }
 
-MutableSubscriptionSet::MutableSubscriptionSet(std::weak_ptr<const SubscriptionStore> mgr, TransactionRef tr, Obj obj)
-    : SubscriptionSet(mgr, *tr, obj)
+MutableSubscriptionSet::MutableSubscriptionSet(std::weak_ptr<const SubscriptionStore> mgr, TransactionRef tr, Obj obj,
+                                               MakingMutableCopy making_mutable_copy)
+    : SubscriptionSet(mgr, *tr, obj, making_mutable_copy)
     , m_tr(std::move(tr))
     , m_obj(std::move(obj))
     , m_old_state(state())
@@ -898,7 +900,8 @@ MutableSubscriptionSet SubscriptionStore::make_mutable_copy(const SubscriptionSe
     auto new_pk = sub_sets->max(sub_sets->get_primary_key_column())->get_int() + 1;
 
     MutableSubscriptionSet new_set_obj(weak_from_this(), std::move(new_tr),
-                                       sub_sets->create_object_with_primary_key(Mixed{new_pk}));
+                                       sub_sets->create_object_with_primary_key(Mixed{new_pk}),
+                                       SubscriptionSet::MakingMutableCopy{true});
     for (const auto& sub : set) {
         new_set_obj.insert_sub(sub);
     }
diff --git a/src/realm/sync/subscriptions.hpp b/src/realm/sync/subscriptions.hpp
index 8e8e207fe52..27937749a91 100644
--- a/src/realm/sync/subscriptions.hpp
+++ b/src/realm/sync/subscriptions.hpp
@@ -26,6 +26,7 @@
 #include "realm/util/future.hpp"
 #include "realm/util/functional.hpp"
 #include "realm/util/optional.hpp"
+#include "realm/util/tagged_bool.hpp"
 
 #include <list>
 #include <set>
@@ -201,11 +202,13 @@ class SubscriptionSet {
     friend class SubscriptionStore;
     struct SupersededTag {
     };
+    using MakingMutableCopy = util::TaggedBool<class MakingMutableCopyTag>;
 
     explicit SubscriptionSet(std::weak_ptr<const SubscriptionStore> mgr, int64_t version, SupersededTag);
-    explicit SubscriptionSet(std::weak_ptr<const SubscriptionStore> mgr, const Transaction& tr, Obj obj);
+    explicit SubscriptionSet(std::weak_ptr<const SubscriptionStore> mgr, const Transaction& tr, Obj obj,
+                             MakingMutableCopy making_mutable_copy = MakingMutableCopy(false));
 
-    void load_from_database(const Transaction& tr, Obj obj);
+    void load_from_database(Obj obj);
 
     // Get a reference to the SubscriptionStore. It may briefly extend the lifetime of the store.
     std::shared_ptr<const SubscriptionStore> get_flx_subscription_store() const;
@@ -276,7 +279,8 @@ class MutableSubscriptionSet : public SubscriptionSet {
 protected:
     friend class SubscriptionStore;
 
-    MutableSubscriptionSet(std::weak_ptr<const SubscriptionStore> mgr, TransactionRef tr, Obj obj);
+    MutableSubscriptionSet(std::weak_ptr<const SubscriptionStore> mgr, TransactionRef tr, Obj obj,
+                           MakingMutableCopy making_mutable_copy = MakingMutableCopy{false});
 
     void insert_sub(const Subscription& sub);
 
diff --git a/src/realm/object-store/util/tagged_bool.hpp b/src/realm/util/tagged_bool.hpp
similarity index 100%
rename from src/realm/object-store/util/tagged_bool.hpp
rename to src/realm/util/tagged_bool.hpp
diff --git a/test/object-store/util/test_file.hpp b/test/object-store/util/test_file.hpp
index a6366d373dc..2ff9fe27e30 100644
--- a/test/object-store/util/test_file.hpp
+++ b/test/object-store/util/test_file.hpp
@@ -20,7 +20,7 @@
 #define REALM_TEST_UTIL_TEST_FILE_HPP
 
 #include <realm/object-store/shared_realm.hpp>
-#include <realm/object-store/util/tagged_bool.hpp>
+#include <realm/util/tagged_bool.hpp>
 
 #include <realm/util/logger.hpp>
 #include <realm/util/optional.hpp>

From 95fa8565fbea4aade8d900ab1863d7d1d5960980 Mon Sep 17 00:00:00 2001
From: Jonathan Reams <jbreams@mongodb.com>
Date: Tue, 25 Oct 2022 17:52:02 -0400
Subject: [PATCH 19/21] fix getting the active subscription when there is none

---
 src/realm/sync/subscriptions.cpp | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/src/realm/sync/subscriptions.cpp b/src/realm/sync/subscriptions.cpp
index 6ac9167ea50..3ce4936b337 100644
--- a/src/realm/sync/subscriptions.cpp
+++ b/src/realm/sync/subscriptions.cpp
@@ -196,7 +196,8 @@ SubscriptionSet::SubscriptionSet(std::weak_ptr<const SubscriptionStore> mgr, con
     , m_cur_version(tr.get_version())
     , m_version(obj.get_primary_key().get_int())
 {
-    if (!making_mutable_copy && obj.is_valid()) {
+    REALM_ASSERT(obj.is_valid());
+    if (!making_mutable_copy) {
         load_from_database(std::move(obj));
     }
 }
@@ -723,8 +724,9 @@ SubscriptionSet SubscriptionStore::get_active() const
                    .equal(m_sub_set_state, state_to_storage(SubscriptionSet::State::AwaitingMark))
                    .find_all(descriptor_ordering);
 
+    // If there is no active subscription yet, return the zero'th subscription.
     if (res.is_empty()) {
-        return SubscriptionSet(weak_from_this(), *tr, Obj{});
+        return SubscriptionSet(weak_from_this(), *tr, sub_sets->get_object_with_primary_key(int64_t(0)));
     }
     return SubscriptionSet(weak_from_this(), *tr, res.get_object(0));
 }

From 550d81c21c7a9a3f0646e2ca6f723de9d6766645 Mon Sep 17 00:00:00 2001
From: Jonathan Reams <jbreams@mongodb.com>
Date: Tue, 1 Nov 2022 18:37:13 -0400
Subject: [PATCH 20/21] remove unneeded case and add more tests

---
 src/realm/sync/client.cpp           | 21 +++++--
 src/realm/sync/config.hpp           |  1 +
 test/object-store/sync/flx_sync.cpp | 95 ++++++++++++++++++++++-------
 3 files changed, 90 insertions(+), 27 deletions(-)

diff --git a/src/realm/sync/client.cpp b/src/realm/sync/client.cpp
index f42974af1f1..5787f7282f4 100644
--- a/src/realm/sync/client.cpp
+++ b/src/realm/sync/client.cpp
@@ -879,7 +879,21 @@ SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, con
     data.progress = progress;
     data.num_changesets = num_changesets;
     data.query_version = query_version;
-    return m_wrapper.m_debug_hook(data);
+
+    auto action = m_wrapper.m_debug_hook(data);
+    switch (action) {
+        case realm::SyncClientHookAction::SuspendWithRetryableError: {
+            SessionErrorInfo err_info(make_error_code(ProtocolError::other_session_error), "hook requested error",
+                                      true);
+            err_info.server_requests_action = ProtocolErrorInfo::Action::Transient;
+
+            auto err_processing_err = receive_error_message(err_info);
+            REALM_ASSERT(!err_processing_err);
+            return SyncClientHookAction::NoAction;
+        }
+        default:
+            return action;
+    }
 }
 
 bool SessionImpl::is_steady_state_download_message(DownloadBatchState batch_state, int64_t query_version)
@@ -1482,11 +1496,6 @@ void SessionWrapper::on_suspended(const SessionErrorInfo& error_info)
 void SessionWrapper::on_resumed()
 {
     m_suspended = false;
-    if (m_flx_subscription_store && m_flx_pending_mark_version != SubscriptionSet::EmptyVersion) {
-        m_sess->logger.debug("Requesting download notification for query version %1 after resume",
-                             m_flx_pending_mark_version);
-        m_sess->request_download_completion_notification();
-    }
     if (m_connection_state_change_listener) {
         ClientImpl::Connection& conn = m_sess->get_connection();
         if (conn.get_state() != ConnectionState::disconnected) {
diff --git a/src/realm/sync/config.hpp b/src/realm/sync/config.hpp
index ee9b404c891..350e4d133b7 100644
--- a/src/realm/sync/config.hpp
+++ b/src/realm/sync/config.hpp
@@ -140,6 +140,7 @@ enum class SyncClientHookEvent {
 enum class SyncClientHookAction {
     NoAction,
     EarlyReturn,
+    SuspendWithRetryableError,
 };
 
 struct SyncClientHookData {
diff --git a/test/object-store/sync/flx_sync.cpp b/test/object-store/sync/flx_sync.cpp
index b4d636a2bed..c2c2e24d15d 100644
--- a/test/object-store/sync/flx_sync.cpp
+++ b/test/object-store/sync/flx_sync.cpp
@@ -2277,28 +2277,27 @@ TEST_CASE("flx: bootstraps contain all changes", "[sync][flx][app]") {
     SECTION("disconnect between bootstrap and mark") {
         SyncTestFile triggered_config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});
         auto [interrupted_promise, interrupted] = util::make_promise_future<void>();
-        auto shared_promise = std::make_shared<util::Promise<void>>(std::move(interrupted_promise));
-        triggered_config.sync_config->on_sync_client_event_hook = [promise = std::move(shared_promise), &bizz_obj_id,
-                                                                   &bar_obj_id](std::weak_ptr<SyncSession> weak_sess,
-                                                                                const SyncClientHookData& data) {
-            auto sess = weak_sess.lock();
-            if (!sess || data.event != SyncClientHookEvent::BootstrapProcessed || data.query_version != 1) {
-                return SyncClientHookAction::NoAction;
-            }
-
-            auto latest_subs = sess->get_flx_subscription_store()->get_latest();
-            REQUIRE(latest_subs.state() == sync::SubscriptionSet::State::AwaitingMark);
-            REQUIRE(data.num_changesets == 1);
-            auto db = SyncSession::OnlyForTesting::get_db(*sess);
-            auto read_tr = db->start_read();
-            auto table = read_tr->get_table("class_TopLevel");
-            REQUIRE(table->find_primary_key(bar_obj_id));
-            REQUIRE_FALSE(table->find_primary_key(bizz_obj_id));
+        triggered_config.sync_config->on_sync_client_event_hook =
+            [promise = util::CopyablePromiseHolder(std::move(interrupted_promise)), &bizz_obj_id,
+             &bar_obj_id](std::weak_ptr<SyncSession> weak_sess, const SyncClientHookData& data) mutable {
+                auto sess = weak_sess.lock();
+                if (!sess || data.event != SyncClientHookEvent::BootstrapProcessed || data.query_version != 1) {
+                    return SyncClientHookAction::NoAction;
+                }
 
-            sess->close();
-            promise->emplace_value();
-            return SyncClientHookAction::NoAction;
-        };
+                auto latest_subs = sess->get_flx_subscription_store()->get_latest();
+                REQUIRE(latest_subs.state() == sync::SubscriptionSet::State::AwaitingMark);
+                REQUIRE(data.num_changesets == 1);
+                auto db = SyncSession::OnlyForTesting::get_db(*sess);
+                auto read_tr = db->start_read();
+                auto table = read_tr->get_table("class_TopLevel");
+                REQUIRE(table->find_primary_key(bar_obj_id));
+                REQUIRE_FALSE(table->find_primary_key(bizz_obj_id));
+
+                sess->close();
+                promise.get_promise().emplace_value();
+                return SyncClientHookAction::NoAction;
+            };
         auto problem_realm = Realm::get_shared_realm(triggered_config);
 
         // Setup the problem realm by waiting for it to be fully synchronized with an empty query, so the router
@@ -2334,6 +2333,60 @@ TEST_CASE("flx: bootstraps contain all changes", "[sync][flx][app]") {
         sub_complete_future.get();
         wait_for_advance(*problem_realm);
 
+        sub_set.refresh();
+        REQUIRE(sub_set.state() == sync::SubscriptionSet::State::Complete);
+        auto table = problem_realm->read_group().get_table("class_TopLevel");
+        REQUIRE(table->find_primary_key(bar_obj_id));
+        REQUIRE(table->find_primary_key(bizz_obj_id));
+    }
+    SECTION("error/suspend between bootstrap and mark") {
+        SyncTestFile triggered_config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});
+        triggered_config.sync_config->on_sync_client_event_hook =
+            [&bizz_obj_id, &bar_obj_id](std::weak_ptr<SyncSession> weak_sess, const SyncClientHookData& data) {
+                auto sess = weak_sess.lock();
+                if (!sess || data.event != SyncClientHookEvent::BootstrapProcessed || data.query_version != 1) {
+                    return SyncClientHookAction::NoAction;
+                }
+
+                auto latest_subs = sess->get_flx_subscription_store()->get_latest();
+                REQUIRE(latest_subs.state() == sync::SubscriptionSet::State::AwaitingMark);
+                REQUIRE(data.num_changesets == 1);
+                auto db = SyncSession::OnlyForTesting::get_db(*sess);
+                auto read_tr = db->start_read();
+                auto table = read_tr->get_table("class_TopLevel");
+                REQUIRE(table->find_primary_key(bar_obj_id));
+                REQUIRE_FALSE(table->find_primary_key(bizz_obj_id));
+
+                return SyncClientHookAction::SuspendWithRetryableError;
+            };
+        auto problem_realm = Realm::get_shared_realm(triggered_config);
+
+        // Setup the problem realm by waiting for it to be fully synchronized with an empty query, so the router
+        // on the server should have no new history entries, and then pause the router so it doesn't get any of
+        // the changes we're about to create.
+        wait_for_upload(*problem_realm);
+        wait_for_download(*problem_realm);
+
+        nlohmann::json command_request = {
+            {"command", "PAUSE_ROUTER_SESSION"},
+        };
+        auto resp_body =
+            SyncSession::OnlyForTesting::send_test_command(*problem_realm->sync_session(), command_request.dump())
+                .get();
+        REQUIRE(resp_body == "{}");
+
+        // Put some data into the server, this will be the data that will be in the broker cache.
+        setup_and_poison_cache();
+
+        // Setup queries on the problem realm to bootstrap from the cached object. Bootstrapping will also resume
+        // the router, so all we need to do is wait for the subscription set to be complete and notifications to be
+        // processed.
+        auto sub_set = setup_subs(problem_realm);
+        auto sub_complete_future = sub_set.get_state_change_notification(sync::SubscriptionSet::State::Complete);
+
+        sub_complete_future.get();
+        wait_for_advance(*problem_realm);
+
         sub_set.refresh();
         REQUIRE(sub_set.state() == sync::SubscriptionSet::State::Complete);
         auto table = problem_realm->read_group().get_table("class_TopLevel");

From 684e59625ac2c6e3a09e82beb7c5ef4dadec914c Mon Sep 17 00:00:00 2001
From: Jonathan Reams <jbreams@mongodb.com>
Date: Wed, 2 Nov 2022 09:37:01 -0400
Subject: [PATCH 21/21] fix changelog

---
 CHANGELOG.md                     | 2 +-
 src/realm/sync/subscriptions.cpp | 1 -
 2 files changed, 1 insertion(+), 2 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 698bc6b7bf2..199e3cc4980 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,7 @@
 ### Fixed
 * <How do the end-user experience this issue? what was the impact?> ([#????](https://github.com/realm/realm-core/issues/????), since v?.?.?)
 * Fix a race condition which could result in "operation cancelled" errors being delivered to async open callbacks rather than the actual sync error which caused things to fail ([PR #5968](https://github.com/realm/realm-core/pull/5968), since the introduction of async open).
+* The name of one of the RLM_SYNC_BOOTSTRAPPING enum member in the C api was updated to match the naming convention of the other members in the enum.
  
 ### Breaking changes
 * None.
@@ -29,7 +30,6 @@
 
 ### Breaking changes
 * Websocket errors caused by the client sending a websocket message that is too large (i.e. greater than 16MB) now get reported as a `ProtocolError::limits_exceeded` error with a `ClientReset` requested by the server ([#5209](https://github.com/realm/realm-core/issues/5209)).
-* The name of one of the RLM_SYNC_BOOTSTRAPPING enum member in the C api was updated to match the naming convention of the other members in the enum.
 
 ### Compatibility
 * Fileformat: Generates files with format v22. Reads and automatically upgrade from fileformat v5.
diff --git a/src/realm/sync/subscriptions.cpp b/src/realm/sync/subscriptions.cpp
index 3ce4936b337..1554b171cf9 100644
--- a/src/realm/sync/subscriptions.cpp
+++ b/src/realm/sync/subscriptions.cpp
@@ -504,7 +504,6 @@ void MutableSubscriptionSet::process_notifications()
         return mgr->m_outstanding_requests == 0;
     });
 
-
     for (auto it = mgr->m_pending_notifications.begin(); it != mgr->m_pending_notifications.end();) {
         if ((it->version == my_version &&
              (new_state == State::Error || state_to_order(new_state) >= state_to_order(it->notify_when))) ||