Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure interrupted FLX bootstraps get restarted after re-connecting #5466

Merged
merged 7 commits into from
May 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* C API client reset callbacks don't leak the `realm_t` parameter. ([#5464](https://github.com/realm/realm-core/pull/5464))
* The sync client may have sent a corrupted upload cursor leading to a fatal error from the server due to an uninitialized variable. ([#5460](https://github.com/realm/realm-core/pull/5460), since v11.14.0)
* The realm_async_open_task_start() in C API was not really useful as the received realm reference could not be transferred to another thread. ([#5465](https://github.com/realm/realm-core/pull/5465), since v11.5.0)
* FLX sync would not correctly resume syncing if a bootstrap was interrupted ([#5466](https://github.com/realm/realm-core/pull/5466), since v11.8.0)

### Breaking changes
* Extra `realm_free_userdata_func_t` parameter added on some realm_config_set_... functions in the C API. The userdata will be freed when the config object is freed.
Expand Down
6 changes: 6 additions & 0 deletions src/realm/object-store/sync/sync_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,12 @@ void SyncSession::create_sync_session()
session_config.ssl_trust_certificate_path = m_config.ssl_trust_certificate_path;
session_config.ssl_verify_callback = m_config.ssl_verify_callback;
session_config.proxy_config = m_config.proxy_config;
if (m_config.on_download_message_received_hook) {
session_config.on_download_message_received_hook = [hook = m_config.on_download_message_received_hook,
anchor = weak_from_this()] {
hook(anchor);
};
}

{
std::string sync_route = m_sync_manager->sync_route();
Expand Down
11 changes: 11 additions & 0 deletions src/realm/sync/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,8 @@ class SessionWrapper final : public util::AtomicRefCountBase, public SyncTransac
util::UniqueFunction<ProgressHandler> m_progress_handler;
util::UniqueFunction<ConnectionStateChangeListener> m_connection_state_change_listener;

std::function<void()> m_on_download_message_received_hook;

std::shared_ptr<SubscriptionStore> m_flx_subscription_store;
int64_t m_flx_active_version = 0;
int64_t m_flx_last_seen_version = 0;
Expand Down Expand Up @@ -743,6 +745,14 @@ SubscriptionStore* SessionImpl::get_flx_subscription_store()
return m_wrapper.get_flx_subscription_store();
}

void SessionImpl::receive_download_message_hook()
{
if (REALM_LIKELY(!m_wrapper.m_on_download_message_received_hook)) {
return;
}
m_wrapper.m_on_download_message_received_hook();
}

// ################ SessionWrapper ################

SessionWrapper::SessionWrapper(ClientImpl& client, DBRef db, std::shared_ptr<SubscriptionStore> flx_sub_store,
Expand All @@ -764,6 +774,7 @@ SessionWrapper::SessionWrapper(ClientImpl& client, DBRef db, std::shared_ptr<Sub
, m_signed_access_token{std::move(config.signed_user_token)}
, m_client_reset_config{std::move(config.client_reset_config)}
, m_proxy_config{config.proxy_config} // Throws
, m_on_download_message_received_hook(std::move(config.on_download_message_received_hook))
, m_flx_subscription_store(std::move(flx_sub_store))
{
REALM_ASSERT(m_db);
Expand Down
5 changes: 5 additions & 0 deletions src/realm/sync/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,11 @@ class Session {
///
/// This feature exists exclusively for testing purposes at this time.
bool simulate_integration_error = false;

// Will be called after a download message is received and validated by
// the client but befefore it's been transformed or applied. To be used in
// testing only.
std::function<void()> on_download_message_received_hook;
};

/// \brief Start a new session for the specified client-side Realm.
Expand Down
4 changes: 4 additions & 0 deletions src/realm/sync/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ struct SyncConfig {
std::function<void(std::shared_ptr<Realm> before_frozen, std::shared_ptr<Realm> after, bool did_recover)>
notify_after_client_reset;

// Will be called after a download message is received and validated by the client but befefore it's been
// transformed or applied. To be used in testing only.
std::function<void(std::weak_ptr<SyncSession>)> on_download_message_received_hook;
Copy link
Collaborator

@danieltabacaru danieltabacaru May 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you really need the SyncSession here? It seems to be used only in tests. Can you use realm->sync_session() instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So there's a chicken/egg problm here where you need to define this hook before you have a valid realm, so you have to capture the realm by reference, but the hook may get called after the realm is destroyed in some cases. This caused a problem with ASAN. So I had this pass the sync session as a weak_ptr here as the easiest memory-correct way to access the sync session.


explicit SyncConfig(std::shared_ptr<SyncUser> user, bson::Bson partition);
explicit SyncConfig(std::shared_ptr<SyncUser> user, std::string partition);
explicit SyncConfig(std::shared_ptr<SyncUser> user, const char* partition);
Expand Down
29 changes: 14 additions & 15 deletions src/realm/sync/noinst/client_impl_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1010,7 +1010,7 @@ void Connection::read_or_write_error(std::error_code ec)
{
m_reconnect_info.m_reason = ConnectionTerminationReason::read_or_write_error;
bool is_fatal = false;
close_due_to_client_side_error(ec, is_fatal); // Throws
close_due_to_client_side_error(ec, is_fatal); // Throws
}


Expand Down Expand Up @@ -1111,7 +1111,7 @@ void Connection::disconnect(const SessionErrorInfo& info)
m_sending = false;

report_connection_state_change(ConnectionState::disconnected, info); // Throws
initiate_reconnect_wait(); // Throws
initiate_reconnect_wait(); // Throws
}

bool Connection::is_flx_sync_connection() const noexcept
Expand Down Expand Up @@ -1637,8 +1637,8 @@ void Session::send_bind_message()
// Discard the token since it's ignored by the server.
std::string empty_access_token{};
protocol.make_bind_message(protocol_version, out, session_ident, path, empty_access_token, need_client_file_ident,
is_subserver); // Throws
m_conn.initiate_write_message(out, this); // Throws
is_subserver); // Throws
m_conn.initiate_write_message(out, this); // Throws

m_bind_message_sent = true;

Expand Down Expand Up @@ -1683,7 +1683,7 @@ void Session::send_ident_message()
m_progress.latest_server_version.salt); // Throws
protocol.make_pbs_ident_message(out, session_ident, m_client_file_ident, m_progress); // Throws
}
m_conn.initiate_write_message(out, this); // Throws
m_conn.initiate_write_message(out, this); // Throws

m_ident_message_sent = true;

Expand Down Expand Up @@ -2063,6 +2063,8 @@ void Session::receive_download_message(const SyncProgress& progress, std::uint_f
}
}

receive_download_message_hook();

if (batch_state == DownloadBatchState::LastInBatch) {
update_progress(progress); // Throws
}
Expand Down Expand Up @@ -2146,31 +2148,30 @@ std::error_code Session::receive_query_error_message(int error_code, std::string

// The caller (Connection) must discard the session if the session has become
// deactivated upon return.
std::error_code Session::receive_error_message(int error_code, const ProtocolErrorInfo& info)
std::error_code Session::receive_error_message(int error_code_int, const ProtocolErrorInfo& info)
{
logger.info("Received: ERROR \"%1\" (error_code=%2, try_again=%3, recovery_disabled=%4)", info.message,
error_code, info.try_again, info.client_reset_recovery_is_disabled); // Throws
error_code_int, info.try_again, info.client_reset_recovery_is_disabled); // Throws

bool legal_at_this_time = (m_bind_message_sent && !m_error_message_received && !m_unbound_message_received);
if (REALM_UNLIKELY(!legal_at_this_time)) {
logger.error("Illegal message at this time");
return ClientError::bad_message_order;
}

bool known_error_code = bool(get_protocol_error_message(error_code));
bool known_error_code = bool(get_protocol_error_message(error_code_int));
if (REALM_UNLIKELY(!known_error_code)) {
logger.error("Unknown error code"); // Throws
return ClientError::bad_error_code;
}
ProtocolError error_code_2 = ProtocolError(error_code);
if (REALM_UNLIKELY(!is_session_level_error(error_code_2))) {
ProtocolError error_code = ProtocolError(error_code_int);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really don't like when we have variables that end in _2. This variable renaming is just to try to make the variable names match their purpose.

if (REALM_UNLIKELY(!is_session_level_error(error_code))) {
logger.error("Not a session level error code"); // Throws
return ClientError::bad_error_code;
}

REALM_ASSERT(!m_suspended);
REALM_ASSERT(m_state == Active || m_state == Deactivating);

logger.debug("Suspended"); // Throws

m_error_message_received = true;
Expand All @@ -2194,10 +2195,8 @@ std::error_code Session::receive_error_message(int error_code, const ProtocolErr
// Notify the application of the suspension of the session if the session is
// still in the Active state
if (m_state == Active) {
m_conn.one_less_active_unsuspended_session(); // Throws
std::error_code ec = make_error_code(error_code_2);
SessionErrorInfo error_info(info, ec);
on_suspended(error_info); // Throws
m_conn.one_less_active_unsuspended_session(); // Throws
on_suspended({info, make_error_code(error_code)}); // Throws
}

if (info.try_again) {
Expand Down
1 change: 1 addition & 0 deletions src/realm/sync/noinst/client_impl_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1057,6 +1057,7 @@ class ClientImpl::Session {
bool check_received_sync_progress(const SyncProgress&, int&) noexcept;
void check_for_upload_completion();
void check_for_download_completion();
void receive_download_message_hook();

friend class Connection;
};
Expand Down
4 changes: 4 additions & 0 deletions src/realm/sync/subscriptions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -678,7 +678,11 @@ SubscriptionStore::get_next_pending_version(int64_t last_query_version, DB::vers
descriptor_ordering.append_sort(SortDescriptor{{{sub_sets->get_primary_key_column()}}, {true}});
auto res = sub_sets->where()
.greater(sub_sets->get_primary_key_column(), last_query_version)
.group()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the crux of the whole bug. When you're first firing up the sync client, the last_query_version will be zero which should include all the non-complete/non-error subscription sets, but didn't. This meant the sync client would never follow up the IDENT with a QUERY message and never make progress.

.equal(m_sub_set_state, static_cast<int64_t>(SubscriptionSet::State::Pending))
.Or()
.equal(m_sub_set_state, static_cast<int64_t>(SubscriptionSet::State::Bootstrapping))
.end_group()
.greater_equal(m_sub_set_snapshot_version, static_cast<int64_t>(after_client_version))
.find_all(descriptor_ordering);

Expand Down
104 changes: 103 additions & 1 deletion test/object-store/sync/flx_sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
#include "flx_sync_harness.hpp"
#include "util/test_file.hpp"
#include "realm/object-store/impl/object_accessor_impl.hpp"
#include "realm/sync/config.hpp"
#include "realm/sync/protocol.hpp"
#include "realm/sync/noinst/client_history_impl.hpp"
#include <realm/sync/noinst/server/access_token.hpp>

namespace realm::app {
Expand All @@ -35,7 +37,16 @@ const Schema g_minimal_schema{
{"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
}},
};
}

const Schema g_large_array_schema{
ObjectSchema("TopLevel",
{
{"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
{"queryable_int_field", PropertyType::Int | PropertyType::Nullable},
{"list_of_strings", PropertyType::Array | PropertyType::String},
}),
};
} // namespace

TEST_CASE("flx: connect to FLX-enabled app", "[sync][flx][app]") {
FLXSyncTestHarness harness("basic_flx_connect");
Expand Down Expand Up @@ -255,6 +266,97 @@ TEST_CASE("flx: query on non-queryable field results in query error message", "[
});
}

TEST_CASE("flx: interrupted bootstrap restarts/recovers on reconnect", "[sync][flx][app]") {
FLXSyncTestHarness harness("flx_bootstrap_batching", {g_large_array_schema, {"queryable_int_field"}});

// First we need to seed the server with objects that are large and complex enough that they get broken
// into multiple download messages.
//
// The server will break up changesets and download messages when they contain more than 1000 instructions
// and are bigger than 1MB respectively.
//
// So this generates 5 objects each with 1000+ instructions that are each 1MB+ big. This should result in
// 3 download messages total with one changeset each for the bootstrap download messages.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it's 5 download messages

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So there are two download messages for subscription set zero (the schema instructions) and then 5 total for the entire sync session. The trace output for after the session resumes is below:

Connection[3]: Session[3]: Sending: IDENT(client_file_ident=4, client_file_ident_salt=8478468785466499577, scan_server_version=5, scan_client_version=5, latest_server_version=5, latest_server_version_salt=2271452601256551051, query_version: 0 query_size: 2, query: "{}")
Connection[3]: Session[3]: Sending: MARK(request_ident=1)
Connection[3]: Session[3]: Received: MARK(request_ident=1)
Connection[3]: Session[3]: Sending: QUERY(query_version=1, query_size=30, query="{"TopLevel":"(TRUEPREDICATE)"}"
Connection[3]: Session[3]: Sending: UPLOAD(progress_client_version=15, progress_server_version=5, locked_server_version=5, num_changesets=0)
Connection[3]: Download message compression: is_body_compressed = true, compressed_body_size=17203, uncompressed_body_size=2146234
Connection[3]: Received: DOWNLOAD CHANGESET(server_version=5, client_version=5, origin_timestamp=232116755366, origin_file_ident=1, original_changeset_size=2146199, changeset_size=2146199)
Connection[3]: Changeset (parsed):
InternStrings   0="TopLevel", 1="list_of_strings", 2="queryable_int_field"
EraseObject     path=TopLevel[ObjectId{627a6011c0fcbd05b4fa6736}]
CreateObject    path=TopLevel[ObjectId{627a6011c0fcbd05b4fa6736}]
Update          path=TopLevel[ObjectId{627a6011c0fcbd05b4fa6736}].queryable_int_field, value=Int(0), default=0
EraseObject     path=TopLevel[ObjectId{627a6011c0fcbd05b4fa6737}]
CreateObject    path=TopLevel[ObjectId{627a6011c0fcbd05b4fa6737}]
Update          path=TopLevel[ObjectId{627a6011c0fcbd05b4fa6737}].queryable_int_field, value=Int(5), default=0

Connection[3]: Session[3]: Received: DOWNLOAD(download_server_version=5, download_client_version=5, latest_server_version=5, latest_server_version_salt=2271452601256551051, upload_client_version=11, upload_server_version=5, downloadable_bytes=0, last_in_batch=false, query_version=1, num_changesets=1, ...)
Connection[3]: Session[3]: sync::create_object_with_primary_key(group, get_table("class_TopLevel"), 627a6011c0fcbd05b4fa6736);
Connection[3]: Session[3]: sync::create_object_with_primary_key(group, get_table("class_TopLevel"), 627a6011c0fcbd05b4fa6737);
Connection[3]: Session[3]: 1 remote changeset integrated, producing client version 16
Connection[3]: Download message compression: is_body_compressed = true, compressed_body_size=17205, uncompressed_body_size=2146234
Connection[3]: Received: DOWNLOAD CHANGESET(server_version=5, client_version=5, origin_timestamp=232116755366, origin_file_ident=1, original_changeset_size=2146199, changeset_size=2146199)
Connection[3]: Changeset (parsed):
InternStrings   0="TopLevel", 1="list_of_strings", 2="queryable_int_field"
EraseObject     path=TopLevel[ObjectId{627a6011c0fcbd05b4fa6738}]
CreateObject    path=TopLevel[ObjectId{627a6011c0fcbd05b4fa6738}]
Update          path=TopLevel[ObjectId{627a6011c0fcbd05b4fa6738}].queryable_int_field, value=Int(10), default=0
EraseObject     path=TopLevel[ObjectId{627a6011c0fcbd05b4fa6739}]
CreateObject    path=TopLevel[ObjectId{627a6011c0fcbd05b4fa6739}]
Update          path=TopLevel[ObjectId{627a6011c0fcbd05b4fa6739}].queryable_int_field, value=Int(15), default=0

Connection[3]: Session[3]: Received: DOWNLOAD(download_server_version=5, download_client_version=5, latest_server_version=5, latest_server_version_salt=2271452601256551051, upload_client_version=11, upload_server_version=5, downloadable_bytes=0, last_in_batch=false, query_version=1, num_changesets=1, ...)
Connection[3]: Session[3]: sync::create_object_with_primary_key(group, get_table("class_TopLevel"), 627a6011c0fcbd05b4fa6738);
Connection[3]: Session[3]: sync::create_object_with_primary_key(group, get_table("class_TopLevel"), 627a6011c0fcbd05b4fa6739);
Connection[3]: Session[3]: 1 remote changeset integrated, producing client version 18
Connection[3]: Download message compression: is_body_compressed = true, compressed_body_size=8713, uncompressed_body_size=1073160
Connection[3]: Received: DOWNLOAD CHANGESET(server_version=5, client_version=5, origin_timestamp=232116755366, origin_file_ident=1, original_changeset_size=1073125, changeset_size=1073125)
Connection[3]: Changeset (parsed):
InternStrings   0="TopLevel", 1="list_of_strings", 2="queryable_int_field"
EraseObject     path=TopLevel[ObjectId{627a6011c0fcbd05b4fa673a}]
CreateObject    path=TopLevel[ObjectId{627a6011c0fcbd05b4fa673a}]
Update          path=TopLevel[ObjectId{627a6011c0fcbd05b4fa673a}].queryable_int_field, value=Int(20), default=0

Connection[3]: Session[3]: Received: DOWNLOAD(download_server_version=5, download_client_version=5, latest_server_version=5, latest_server_version_salt=2271452601256551051, upload_client_version=11, upload_server_version=5, downloadable_bytes=0, last_in_batch=true, query_version=1, num_changesets=1, ...)
Connection[3]: Session[3]: sync::create_object_with_primary_key(group, get_table("class_TopLevel"), 627a6011c0fcbd05b4fa673a);
Connection[3]: Session[3]: 1 remote changeset integrated, producing client version 19

std::vector<ObjectId> obj_ids_at_end;
harness.load_initial_data([&](SharedRealm realm) {
CppContext c(realm);
for (int i = 0; i < 5; ++i) {
auto id = ObjectId::gen();
auto obj = Object::create(c, realm, "TopLevel",
util::Any(AnyDict{{"_id", id},
{"list_of_strings", AnyVector{}},
{"queryable_int_field", static_cast<int64_t>(i * 5)}}));
List str_list(obj, realm->schema().find("TopLevel")->property_for_name("list_of_strings"));
for (int j = 0; j < 1024; ++j) {
str_list.add(c, util::Any(std::string(1024, 'a' + (j % 26))));
}

obj_ids_at_end.push_back(id);
}
});
SyncTestFile interrupted_realm_config(harness.app()->current_user(), harness.schema(),
SyncConfig::FLXSyncEnabled{});
interrupted_realm_config.cache = false;

{
SharedRealm realm;
auto [interrupted_promise, interrupted] = util::make_promise_future<void>();
Realm::Config config = interrupted_realm_config;
config.sync_config->on_download_message_received_hook =
[download_msg_counter = int(0),
promise = std::make_shared<util::Promise<void>>(std::move(interrupted_promise))](
std::weak_ptr<SyncSession> weak_session) mutable {
auto session = weak_session.lock();
// We interrupt on the 5th download message, which should be 2/3rd of the way through the
// bootstrap. The first two download messages are for exchanging schema instructions and then
// two messages of actual data.
if (!session || ++download_msg_counter != 5) {
return;
}

session->close();
promise->emplace_value();
};

realm = Realm::get_shared_realm(config);
{
auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
auto table = realm->read_group().get_table("class_TopLevel");
mut_subs.insert_or_assign(Query(table));
std::move(mut_subs).commit();
}

interrupted.get();
}

{
auto realm = DB::create(sync::make_client_replication(), interrupted_realm_config.path);
auto sub_store = sync::SubscriptionStore::create(realm, [](int64_t) {});
REQUIRE(sub_store->get_active_and_latest_versions() == std::pair<int64_t, int64_t>{0, 1});
auto latest_subs = sub_store->get_latest();
REQUIRE(latest_subs.state() == sync::SubscriptionSet::State::Bootstrapping);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This section is checking whether we've actually set this test up correctly and that there is a SubscriptionSet that has started bootstrapping but hasn't completed yet.

REQUIRE(latest_subs.size() == 1);
REQUIRE(latest_subs.at(0).object_class_name() == "TopLevel");
}

auto realm = Realm::get_shared_realm(interrupted_realm_config);
auto table = realm->read_group().get_table("class_TopLevel");
realm->get_latest_subscription_set().get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

without the changes in subscriptions.cpp, this test would hang forever here.

wait_for_upload(*realm);
wait_for_download(*realm);

realm->refresh();
REQUIRE(table->size() == obj_ids_at_end.size());
for (auto& id : obj_ids_at_end) {
REQUIRE(table->find_primary_key(Mixed{id}));
}

auto active_subs = realm->get_active_subscription_set();
auto latest_subs = realm->get_latest_subscription_set();
REQUIRE(active_subs.version() == latest_subs.version());
REQUIRE(active_subs.version() == int64_t(1));
}

TEST_CASE("flx: dev mode uploads schema before query change", "[sync][flx][app]") {
FLXSyncTestHarness::ServerSchema server_schema;
auto default_schema = FLXSyncTestHarness::default_server_schema();
Expand Down
40 changes: 39 additions & 1 deletion test/test_sync_subscriptions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ TEST(Sync_SubscriptionStoreNotifications)
CHECK_EQUAL(std::move(fut).get(), SubscriptionSet::State::Complete);
}

TEST(Sync_RefreshSubscriptionSetInvalidSubscriptionStore)
TEST(Sync_SubscriptionStoreRefreshSubscriptionSetInvalid)
{
SHARED_GROUP_TEST_PATH(sub_store_path)
SubscriptionStoreFixture fixture(sub_store_path);
Expand Down Expand Up @@ -451,4 +451,42 @@ TEST(Sync_SubscriptionStoreInternalSchemaMigration)
CHECK(!versions.get_version_for(tr, "non_existent_table"));
}

TEST(Sync_SubscriptionStoreNextPendingVersion)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just a unit test for the changes in subscriptions.cpp.

{
SHARED_GROUP_TEST_PATH(sub_store_path)
SubscriptionStoreFixture fixture(sub_store_path);
auto store = SubscriptionStore::create(fixture.db, [](int64_t) {});

auto mut_sub_set = store->get_latest().make_mutable_copy();
auto sub_set = std::move(mut_sub_set).commit();
auto complete_set = sub_set.version();

mut_sub_set = sub_set.make_mutable_copy();
sub_set = std::move(mut_sub_set).commit();
auto bootstrapping_set = sub_set.version();

mut_sub_set = sub_set.make_mutable_copy();
sub_set = std::move(mut_sub_set).commit();
auto pending_set = sub_set.version();

mut_sub_set = store->get_mutable_by_version(complete_set);
mut_sub_set.update_state(SubscriptionSet::State::Complete);
std::move(mut_sub_set).commit();

mut_sub_set = store->get_mutable_by_version(bootstrapping_set);
mut_sub_set.update_state(SubscriptionSet::State::Bootstrapping);
std::move(mut_sub_set).commit();

auto pending_version = store->get_next_pending_version(0, DB::version_type{});
CHECK(pending_version);
CHECK_EQUAL(pending_version->query_version, bootstrapping_set);

pending_version = store->get_next_pending_version(bootstrapping_set, DB::version_type{});
CHECK(pending_set);
CHECK_EQUAL(pending_version->query_version, pending_set);

pending_version = store->get_next_pending_version(pending_set, DB::version_type{});
CHECK(!pending_version);
}

} // namespace realm::sync