Skip to content

Commit

Permalink
Support some converting FLX realms to bundled/local realms (#6076)
Browse files Browse the repository at this point in the history
  • Loading branch information
jbreams authored Dec 15, 2022
1 parent 9e95ff2 commit 10f3ee0
Show file tree
Hide file tree
Showing 5 changed files with 191 additions and 23 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### Enhancements
* <New feature description> (PR [#????](https://github.com/realm/realm-core/pull/????))
* Upgrade OpenSSL from 1.1.1n to 3.0.7. ([#6097](https://github.com/realm/realm-core/pull/6097))
* Converting flexible sync realms to bundled and local realms is now supported ([#6076](https://github.com/realm/realm-core/pull/6076))
* Compensating write errors are now surfaced to the SDK/user after the compensating write has been applied in a download message ([#6095](https://github.com/realm/realm-core/pull/6095)).

### Fixed
Expand Down
36 changes: 24 additions & 12 deletions src/realm/object-store/shared_realm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,22 +194,24 @@ std::shared_ptr<SyncSession> Realm::sync_session() const

sync::SubscriptionSet Realm::get_latest_subscription_set()
{
// If there is a subscription store, then return the latest set
if (auto flx_sub_store = m_coordinator->sync_session()->get_flx_subscription_store()) {
return flx_sub_store->get_latest();
if (!m_config.sync_config || !m_config.sync_config->flx_sync_requested) {
throw std::runtime_error("Flexible sync is not enabled");
}
// Otherwise, throw runtime_error
throw std::runtime_error("Flexible sync is not enabled");
// If there is a subscription store, then return the active set
auto flx_sub_store = m_coordinator->sync_session()->get_flx_subscription_store();
REALM_ASSERT(flx_sub_store);
return flx_sub_store->get_latest();
}

sync::SubscriptionSet Realm::get_active_subscription_set()
{
// If there is a subscription store, then return the active set
if (auto flx_sub_store = m_coordinator->sync_session()->get_flx_subscription_store()) {
return flx_sub_store->get_active();
if (!m_config.sync_config || !m_config.sync_config->flx_sync_requested) {
throw std::runtime_error("Flexible sync is not enabled");
}
// Otherwise, throw runtime_error
throw std::runtime_error("Flexible sync is not enabled");
// If there is a subscription store, then return the active set
auto flx_sub_store = m_coordinator->sync_session()->get_flx_subscription_store();
REALM_ASSERT(flx_sub_store);
return flx_sub_store->get_active();
}
#endif

Expand Down Expand Up @@ -1091,9 +1093,19 @@ void Realm::convert(const Config& config, bool merge_into_existing)
verify_thread();

#if REALM_ENABLE_SYNC
if (config.sync_config && config.sync_config->flx_sync_requested) {
throw std::logic_error("Realm cannot be converted if flexible sync is enabled");
auto src_is_flx_sync = m_config.sync_config && m_config.sync_config->flx_sync_requested;
auto dst_is_flx_sync = config.sync_config && config.sync_config->flx_sync_requested;
auto dst_is_pbs_sync = config.sync_config && !config.sync_config->flx_sync_requested;

if (dst_is_flx_sync && !src_is_flx_sync) {
throw std::logic_error(
"Realm cannot be converted to a flexible sync realm unless flexible sync is already enabled");
}
if (dst_is_pbs_sync && src_is_flx_sync) {
throw std::logic_error(
"Realm cannot be converted from a flexible sync realm to a partition based sync realm");
}

#endif

if (merge_into_existing && util::File::exists(config.path)) {
Expand Down
6 changes: 0 additions & 6 deletions test/object-store/realm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1125,12 +1125,6 @@ TEST_CASE("SharedRealm: convert") {
REQUIRE(sync_realm->read_group().get_table("class_object")->size() == 1);
}

SECTION("cannot convert from local realm to flx sync") {
SyncTestFile sync_config(tsm.app()->current_user(), schema, SyncConfig::FLXSyncEnabled{});
auto local_realm = Realm::get_shared_realm(local_config1);
REQUIRE_THROWS(local_realm->convert(sync_config));
}

SECTION("can copy a local realm to a local realm") {
auto local_realm1 = Realm::get_shared_realm(local_config1);
local_realm1->begin_transaction();
Expand Down
159 changes: 159 additions & 0 deletions test/object-store/sync/flx_sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2479,6 +2479,165 @@ TEST_CASE("flx: bootstraps contain all changes", "[sync][flx][app]") {
}
}

TEST_CASE("flx: convert flx sync realm to bundled realm", "[app][flx][sync]") {
static auto foo_obj_id = ObjectId::gen();
static auto bar_obj_id = ObjectId::gen();
static auto bizz_obj_id = ObjectId::gen();
static std::optional<FLXSyncTestHarness> harness;
if (!harness) {
harness.emplace("bundled_flx_realms");
harness->load_initial_data([&](SharedRealm realm) {
CppContext c(realm);
Object::create(c, realm, "TopLevel",
std::any(AnyDict{{"_id", foo_obj_id},
{"queryable_str_field", "foo"s},
{"queryable_int_field", static_cast<int64_t>(5)},
{"non_queryable_field", "non queryable 1"s}}));
Object::create(c, realm, "TopLevel",
std::any(AnyDict{{"_id", bar_obj_id},
{"queryable_str_field", "bar"s},
{"queryable_int_field", static_cast<int64_t>(10)},
{"non_queryable_field", "non queryable 2"s}}));
});
}

SECTION("flx to flx (should succeed)") {
create_user_and_log_in(harness->app());
SyncTestFile target_config(harness->app()->current_user(), harness->schema(), SyncConfig::FLXSyncEnabled{});
harness->do_with_new_realm([&](SharedRealm realm) {
auto table = realm->read_group().get_table("class_TopLevel");
auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
mut_subs.insert_or_assign(Query(table).greater(table->get_column_key("queryable_int_field"), 5));
auto subs = std::move(mut_subs).commit();

subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
wait_for_advance(*realm);

realm->convert(target_config);
});

auto target_realm = Realm::get_shared_realm(target_config);

target_realm->begin_transaction();
CppContext c(target_realm);
Object::create(c, target_realm, "TopLevel",
std::any(AnyDict{{"_id", bizz_obj_id},
{"queryable_str_field", "bizz"s},
{"queryable_int_field", static_cast<int64_t>(15)},
{"non_queryable_field", "non queryable 3"s}}));
target_realm->commit_transaction();

wait_for_upload(*target_realm);
wait_for_download(*target_realm);

auto latest_subs = target_realm->get_active_subscription_set();
auto table = target_realm->read_group().get_table("class_TopLevel");
REQUIRE(latest_subs.size() == 1);
REQUIRE(latest_subs.at(0).object_class_name == "TopLevel");
REQUIRE(latest_subs.at(0).query_string ==
Query(table).greater(table->get_column_key("queryable_int_field"), 5).get_description());

REQUIRE(table->size() == 2);
REQUIRE(table->find_primary_key(bar_obj_id));
REQUIRE(table->find_primary_key(bizz_obj_id));
REQUIRE_FALSE(table->find_primary_key(foo_obj_id));
}

SECTION("flx to local (should succeed)") {
TestFile target_config;

harness->do_with_new_realm([&](SharedRealm realm) {
auto table = realm->read_group().get_table("class_TopLevel");
auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
mut_subs.insert_or_assign(Query(table).greater(table->get_column_key("queryable_int_field"), 5));
auto subs = std::move(mut_subs).commit();

subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
wait_for_advance(*realm);

target_config.schema = realm->schema();
target_config.schema_version = realm->schema_version();
realm->convert(target_config);
});

auto target_realm = Realm::get_shared_realm(target_config);
REQUIRE_THROWS(target_realm->get_active_subscription_set());

auto table = target_realm->read_group().get_table("class_TopLevel");
REQUIRE(table->size() == 2);
REQUIRE(table->find_primary_key(bar_obj_id));
REQUIRE(table->find_primary_key(bizz_obj_id));
REQUIRE_FALSE(table->find_primary_key(foo_obj_id));
}

SECTION("flx to pbs (should fail to convert)") {
create_user_and_log_in(harness->app());
SyncTestFile target_config(harness->app()->current_user(), "12345"s, harness->schema());
harness->do_with_new_realm([&](SharedRealm realm) {
auto table = realm->read_group().get_table("class_TopLevel");
auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
mut_subs.insert_or_assign(Query(table).greater(table->get_column_key("queryable_int_field"), 5));
auto subs = std::move(mut_subs).commit();

subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
wait_for_advance(*realm);

REQUIRE_THROWS(realm->convert(target_config));
});
}

SECTION("pbs to flx (should fail to convert)") {
create_user_and_log_in(harness->app());
SyncTestFile target_config(harness->app()->current_user(), harness->schema(), SyncConfig::FLXSyncEnabled{});

auto pbs_app_config = minimal_app_config(harness->app()->base_url(), "pbs_to_flx_convert", harness->schema());

TestAppSession pbs_app_session(create_app(pbs_app_config));
SyncTestFile source_config(pbs_app_session.app()->current_user(), "54321"s, pbs_app_config.schema);
auto realm = Realm::get_shared_realm(source_config);

realm->begin_transaction();
CppContext c(realm);
Object::create(c, realm, "TopLevel",
std::any(AnyDict{{"_id", foo_obj_id},
{"queryable_str_field", "foo"s},
{"queryable_int_field", static_cast<int64_t>(5)},
{"non_queryable_field", "non queryable 1"s}}));
realm->commit_transaction();

REQUIRE_THROWS(realm->convert(target_config));
}

SECTION("local to flx (should fail to convert)") {
TestFile source_config;
source_config.schema = harness->schema();
source_config.schema_version = 1;

auto realm = Realm::get_shared_realm(source_config);
auto foo_obj_id = ObjectId::gen();

realm->begin_transaction();
CppContext c(realm);
Object::create(c, realm, "TopLevel",
std::any(AnyDict{{"_id", foo_obj_id},
{"queryable_str_field", "foo"s},
{"queryable_int_field", static_cast<int64_t>(5)},
{"non_queryable_field", "non queryable 1"s}}));
realm->commit_transaction();

create_user_and_log_in(harness->app());
SyncTestFile target_config(harness->app()->current_user(), harness->schema(), SyncConfig::FLXSyncEnabled{});

REQUIRE_THROWS(realm->convert(target_config));
}

// Add new sections before this
SECTION("teardown") {
harness->app()->sync_manager()->wait_for_sessions_to_terminate();
harness.reset();
}
}

TEST_CASE("flx: compensating write errors get re-sent across sessions", "[sync][flx][app]") {
AppCreateConfig::FLXSyncRole role;
role.name = "compensating_write_perms";
Expand Down
12 changes: 7 additions & 5 deletions test/object-store/util/baas_admin_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,12 @@ class BaasRuleBuilder {
public:
using IncludePropCond = util::UniqueFunction<bool(const Property&)>;
BaasRuleBuilder(const Schema& schema, const Property& partition_key, const std::string& service_name,
const std::string& db_name)
const std::string& db_name, bool is_flx_sync)
: m_schema(schema)
, m_partition_key(partition_key)
, m_mongo_service_name(service_name)
, m_mongo_db_name(db_name)
, m_is_flx_sync(is_flx_sync)
{
}

Expand All @@ -91,6 +92,7 @@ class BaasRuleBuilder {
const Property& m_partition_key;
const std::string& m_mongo_service_name;
const std::string& m_mongo_db_name;
const bool m_is_flx_sync;
nlohmann::json m_relationships;
std::vector<std::string> m_current_path;
};
Expand All @@ -99,7 +101,7 @@ nlohmann::json BaasRuleBuilder::object_schema_to_jsonschema(const ObjectSchema&
const IncludePropCond& include_prop, bool clear_path)
{
nlohmann::json required = nlohmann::json::array();
nlohmann::json properties;
nlohmann::json properties = nlohmann::json::object();
for (const auto& prop : obj_schema.persisted_properties) {
if (include_prop && !include_prop(prop)) {
continue;
Expand Down Expand Up @@ -181,13 +183,12 @@ nlohmann::json BaasRuleBuilder::object_schema_to_baas_schema(const ObjectSchema&

auto schema_json = object_schema_to_jsonschema(obj_schema, include_prop, true);
auto& prop_sub_obj = schema_json["properties"];
if (!prop_sub_obj.contains(m_partition_key.name)) {
if (!prop_sub_obj.contains(m_partition_key.name) && !m_is_flx_sync) {
prop_sub_obj.emplace(m_partition_key.name, property_to_jsonschema(m_partition_key, include_prop));
if (!is_nullable(m_partition_key.type)) {
schema_json["required"].push_back(m_partition_key.name);
}
}
std::string test = schema_json.dump();
return {
{"schema", schema_json},
{"metadata", nlohmann::json::object({{"database", m_mongo_db_name},
Expand Down Expand Up @@ -1034,7 +1035,8 @@ AppSession create_app(const AppCreateConfig& config)
// partition key, then add the rest of the properties. This ensures that the
// targest of links exist before adding the links.
std::vector<std::pair<std::string, const ObjectSchema*>> object_schema_to_create;
BaasRuleBuilder rule_builder(config.schema, config.partition_key, mongo_service_name, config.mongo_dbname);
BaasRuleBuilder rule_builder(config.schema, config.partition_key, mongo_service_name, config.mongo_dbname,
static_cast<bool>(config.flx_sync_config));
for (const auto& obj_schema : config.schema) {
auto schema_to_create = rule_builder.object_schema_to_baas_schema(obj_schema, pk_and_queryable_only);
auto schema_create_resp = schemas.post_json(schema_to_create);
Expand Down

0 comments on commit 10f3ee0

Please sign in to comment.