diff --git a/CMakeLists.txt b/CMakeLists.txt index c1ccd4e03..310adfae8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -30,11 +30,6 @@ if(REALM_CORE_PREFIX AND REALM_ENABLE_SYNC AND NOT REALM_SYNC_PREFIX) message(FATAL_ERROR "REALM_SYNC_PREFIX must be set when specifying REALM_CORE_PREFIX when REALM_ENABLE_SYNC is set.") endif() -set(REALM_ENABLE_SERVER OFF CACHE BOOL "Enable the server-only functionality.") -if(REALM_ENABLE_SERVER AND NOT REALM_ENABLE_SYNC) - message(FATAL_ERROR "REALM_ENABLE_SERVER requires REALM_ENABLE_SYNC.") -endif() - include(RealmCore) use_realm_core("${REALM_ENABLE_SYNC}" "${REALM_CORE_PREFIX}" "${REALM_SYNC_PREFIX}") diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index f9b915632..a1a0d2554 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -118,18 +118,6 @@ if(REALM_ENABLE_SYNC) list(APPEND INCLUDE_DIRS ${ZLIB_INCLUDE_DIRS}) endif() -if(REALM_ENABLE_SERVER) - list(APPEND HEADERS - server/adapter.hpp - server/admin_realm.hpp - server/global_notifier.hpp) - list(APPEND SOURCES - server/adapter.cpp - server/admin_realm.cpp - server/global_notifier.cpp) - list(APPEND INCLUDE_DIRS ../external/json) -endif() - add_library(realm-object-store STATIC ${SOURCES} ${HEADERS}) set_target_properties(realm-object-store PROPERTIES POSITION_INDEPENDENT_CODE 1) target_compile_definitions(realm-object-store PRIVATE ${PLATFORM_DEFINES}) diff --git a/src/server/adapter.cpp b/src/server/adapter.cpp deleted file mode 100644 index 4d9e8e964..000000000 --- a/src/server/adapter.cpp +++ /dev/null @@ -1,687 +0,0 @@ -//////////////////////////////////////////////////////////////////////////// -// -// Copyright 2016 Realm Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -//////////////////////////////////////////////////////////////////////////// - -#include "adapter.hpp" - -#include "admin_realm.hpp" -#include "impl/realm_coordinator.hpp" -#include "object_schema.hpp" -#include "object_store.hpp" - -#include <realm/sync/changeset_cooker.hpp> -#include <realm/sync/changeset_parser.hpp> -#include <realm/impl/transact_log.hpp> -#include <realm/impl/input_stream.hpp> -#include <realm/util/base64.hpp> -#include <realm/global_key.hpp> - -#include <json.hpp> -#include <unordered_map> -#include <unordered_set> - -namespace { -// This needs to appear before `using namespace realm`. After that, -// realm::Schema's operator== becomes a candidate function to call for the -// comparison of array values within json. This occurs in a SFINAE context -// and so should simply be discarded as a candidate, but VC++ 2017 incorrectly -// considers it a hard error. -bool equals(nlohmann::json const& a, nlohmann::json const& b) { - return a == b; -} -} // anonymous namespace - -using namespace realm; - -using Instruction = sync::Instruction; - -namespace { -static PropertyType from_core_type(DataType type) -{ - switch (type) { - case type_Int: return PropertyType::Int; - case type_Float: return PropertyType::Float; - case type_Double: return PropertyType::Double; - case type_Bool: return PropertyType::Bool; - case type_String: return PropertyType::String; - case type_Binary: return PropertyType::Data; - case type_Timestamp: return PropertyType::Date; - case type_OldMixed: return PropertyType::Any; - case type_ObjectId: return PropertyType::ObjectId; - case type_Decimal: return PropertyType::Decimal; - case type_Link: return PropertyType::Object | PropertyType::Nullable; - case type_LinkList: return PropertyType::Object | PropertyType::Array; - case type_OldTable: REALM_ASSERT(false && "Use ObjectSchema::from_core_type if subtables are a possibility"); - default: REALM_UNREACHABLE(); - } -} - -class ChangesetCookerInstructionHandler final : public sync::InstructionHandler { -public: - friend Adapter; - - ChangesetCookerInstructionHandler(const Group &group, util::Logger& logger, util::AppendBuffer<char>& out_buffer) - : m_group(group) - , m_table_info(static_cast<Transaction&>(const_cast<Group&>(m_group))) - , m_logger(logger) - , m_out_buffer(out_buffer) - { - } - - const Group &m_group; - sync::TableInfoCache m_table_info; - util::Logger& m_logger; - util::AppendBuffer<char>& m_out_buffer; - std::unordered_map<std::string, ObjectSchema> m_schema; - - std::unordered_map<std::string, std::unordered_map<GlobalKey, int64_t>> m_int_primaries; - std::unordered_map<std::string, std::unordered_map<GlobalKey, std::string>> m_string_primaries; - std::unordered_map<std::string, std::unordered_set<GlobalKey>> m_null_primaries; - - nlohmann::json m_pending_instruction = nullptr; - - std::string m_selected_object_type; - ConstTableRef m_selected_table; - ObjectSchema *m_selected_object_schema = nullptr; - Property *m_selected_primary = nullptr; - - std::string m_list_property_name; - nlohmann::json m_list_parent_identity; - - ConstTableRef m_list_target_table; - ObjectSchema *m_list_target_object_schema = nullptr; - Property *m_list_target_primary = nullptr; - - void flush() { - if (m_pending_instruction.is_null()) - return; - if (m_out_buffer.size()) - m_out_buffer.append(",", 1); - else - m_out_buffer.append("[", 1); - auto str = m_pending_instruction.dump(); - m_out_buffer.append(str.data(), str.size()); - m_pending_instruction = nullptr; - } - - bool finish() { - flush(); - if (!m_out_buffer.size()) - return false; - m_out_buffer.append("]", 1); - return true; - } - - void add_instruction(Adapter::InstructionType type, - nlohmann::json &&inst = {}, bool collapsible = false, - util::Optional<std::string> object_type = util::none) { - if (!object_type && !m_selected_object_schema) { - return; // FIXME: support objects without schemas - } - flush(); - - inst["type"] = Adapter::instruction_type_string(type); - inst["object_type"] = object_type ? *object_type : m_selected_object_schema->name; - m_pending_instruction = std::move(inst); - if (!collapsible) - flush(); - } - - void add_set_instruction(GlobalKey row, StringData column, nlohmann::json &&value) { - nlohmann::json identity = get_identity(row, *m_selected_table, m_selected_primary); - - // collapse values if inserting/setting values for the last object - if (!m_pending_instruction.is_null()) { - nlohmann::json &last = m_pending_instruction; - if (equals(identity, last["identity"]) && m_selected_object_schema && m_selected_object_schema->name == last["object_type"].get<std::string>()) { - last["values"][column] = value; - return; - } - } - - // if not collapsed create new - add_instruction(Adapter::InstructionType::Set, { - {"identity", std::move(identity)}, - {"values", {{column, value}}} - }, true); - } - - void add_column_instruction(std::string object_type, std::string prop_name, nlohmann::json &&prop) { - if (!m_pending_instruction.is_null()) { - nlohmann::json &last = m_pending_instruction; - if (last["object_type"].get<std::string>() == object_type && ( - last["type"].get<std::string>() == "ADD_TYPE" || - last["type"].get<std::string>() == "ADD_PROPERTIES")) - { - last["properties"][prop_name] = prop; - return; - } - } - - add_instruction(Adapter::InstructionType::AddProperties, {{"properties", - {{prop_name, prop}} - }}, true, object_type); - } - - nlohmann::json get_identity(GlobalKey object_id, const Table& table, Property *primary_key) { - if (primary_key) { - std::string object_type = ObjectStore::object_type_for_table_name(table.get_name()); - - if (is_nullable(primary_key->type)) { - auto& null_primaries = m_null_primaries[object_type]; - auto it = null_primaries.find(object_id); - if (it != null_primaries.end()) - return nullptr; - } - - if (primary_key->type == PropertyType::Int) { - auto& int_primaries = m_int_primaries[object_type]; - auto it = int_primaries.find(object_id); - if (it != int_primaries.end()) { - return it->second; - } - - auto obj_key = table.get_obj_key(object_id); - auto obj = table.get_object(obj_key); - REALM_ASSERT(obj); - if (is_nullable(primary_key->type) && obj.is_null(primary_key->column_key)) { - return nullptr; - } - return obj.get<int64_t>(primary_key->column_key); - } - else if (primary_key->type == PropertyType::String) { - auto& string_primaries = m_string_primaries[object_type]; - auto it = string_primaries.find(object_id); - if (it != string_primaries.end()) { - return it->second; - } - - auto obj_key = table.get_obj_key(object_id); - auto obj = table.get_object(obj_key); - REALM_ASSERT(obj); - StringData value = obj.get<StringData>(primary_key->column_key); - if (value.is_null()) - return nullptr; - return std::string(value); - } - } - - return object_id.to_string(); - } - - void select(std::string const& object_type, ObjectSchema *&out_object_schema, ConstTableRef &out_table, Property *&out_primary) { - out_object_schema = nullptr; - out_primary = nullptr; - out_table = ConstTableRef(); - - if (object_type.empty()) { - return; - } - auto it = m_schema.find(object_type); - if (it == m_schema.end()) { - out_table = ObjectStore::table_for_object_type(m_group, object_type); - if (!out_table) { - return; - } - it = m_schema.emplace(std::piecewise_construct, - std::forward_as_tuple(object_type), - std::forward_as_tuple(m_group, object_type, out_table->get_key())).first; - } - - out_object_schema = &it->second; - if (!out_table) - out_table = ObjectStore::table_for_object_type(m_group, object_type); - out_primary = it->second.primary_key_property(); - } - - std::unordered_map<uint32_t, sync::StringBufferRange> m_interned_strings; - util::StringBuffer m_string_buffer; - - StringData get_string(sync::StringBufferRange range) const - { - return StringData{m_string_buffer.data() + range.offset, range.size}; - } - - StringData get_string(sync::InternString intern_string) const - { - auto it = m_interned_strings.find(intern_string.value); - REALM_ASSERT(it != m_interned_strings.end()); - return get_string(it->second); - } - - void set_intern_string(uint32_t index, sync::StringBufferRange range) override - { - m_interned_strings[index] = range; - } - - sync::StringBufferRange add_string_range(StringData data) override - { - size_t offset = m_string_buffer.size(); - m_string_buffer.append(data.data(), data.size()); - return sync::StringBufferRange{uint32_t(offset), uint32_t(data.size())}; - } - - // No selection needed: - void operator()(const Instruction::SelectTable& instr) - { - m_selected_object_type = get_string(instr.table); - select(m_selected_object_type, m_selected_object_schema, m_selected_table, m_selected_primary); - } - - void operator()(const Instruction::SelectField& instr) - { - REALM_ASSERT(m_selected_object_schema); - - m_list_parent_identity = get_identity(instr.object, *m_selected_table, m_selected_primary); - m_list_property_name = get_string(instr.field); - - std::string link_target_table = get_string(instr.link_target_table); - select(link_target_table, m_list_target_object_schema, m_list_target_table, m_list_target_primary); - } - - void operator()(const Instruction::AddTable& instr) - { - std::string object_type = get_string(instr.table); - if (object_type.size()) { - nlohmann::json dict = {{"properties", nullptr}}; - if (instr.has_primary_key) { - dict["primary_key"] = get_string(instr.primary_key_field); - dict["properties"][get_string(instr.primary_key_field)] = { - {"nullable", instr.primary_key_nullable}, - {"type", string_for_property_type(from_core_type(instr.primary_key_type))} - }; - } - add_instruction(Adapter::InstructionType::AddType, std::move(dict), - true, object_type); - } - } - - void operator()(const Instruction::EraseTable&) - { - REALM_ASSERT(0); - } - - // Must have table selected: - void operator()(const Instruction::CreateObject& instr) - { - if (!m_selected_object_schema) { - m_logger.warn("Adapter: Ignoring CreateObject instruction with no object schema"); - return; // FIXME: Support objects without schemas - } - - nlohmann::json identity; - nlohmann::json values; - - if (instr.has_primary_key) { - if (instr.payload.type == type_Int) { - identity = instr.payload.data.integer; - m_int_primaries[m_selected_object_type][instr.object] = instr.payload.data.integer; - } - else if (instr.payload.type == type_String) { - std::string value = get_string(instr.payload.data.str); - identity = value; - m_string_primaries[m_selected_object_type][instr.object] = value; - } - else if (instr.payload.is_null()) { - identity = nullptr; - m_null_primaries[m_selected_object_type].insert(instr.object); - } - else { - REALM_TERMINATE("Non-integer/non-string primary keys not supported by adapter."); - } - - values[m_selected_primary->name] = identity; - } - else { - identity = instr.object.to_string(); // Use the stringified Object ID - } - add_instruction(Adapter::InstructionType::Insert, { - {"identity", std::move(identity)}, - {"values", std::move(values)} - }, true); - } - - void operator()(const Instruction::EraseObject& instr) - { - if (!m_selected_object_schema) { - m_logger.warn("Adapter: Ignoring EraseObject instruction with no object schema"); - return; // FIXME: Support objects without schemas - } - - add_instruction(Adapter::InstructionType::Delete, { - {"identity", get_identity(instr.object, *m_selected_table, m_selected_primary)} - }); - - if (m_selected_primary) { - auto& int_primaries = m_int_primaries[m_selected_object_type]; - auto& string_primaries = m_string_primaries[m_selected_object_type]; - auto& null_primaries = m_null_primaries[m_selected_object_type]; - - // invalidate caches - int_primaries.erase(instr.object); - string_primaries.erase(instr.object); - null_primaries.erase(instr.object); - } - } - - void operator()(const Instruction::Set& instr) - { - if (!m_selected_object_schema) { - m_logger.warn("Adapter: Ignoring Set instruction with no object schema"); - return; // FIXME: Support objects without schemas - } - - StringData field = get_string(instr.field); - - if (instr.payload.is_null()) { - return add_set_instruction(instr.object, field, nullptr); - } - - switch (instr.payload.type) { - case type_Int: - return add_set_instruction(instr.object, field, instr.payload.data.integer); - case type_Bool: - return add_set_instruction(instr.object, field, instr.payload.data.boolean); - case type_Float: - return add_set_instruction(instr.object, field, instr.payload.data.fnum); - case type_Double: - return add_set_instruction(instr.object, field, instr.payload.data.dnum); - case type_String: - return add_set_instruction(instr.object, field, std::string(get_string(instr.payload.data.str))); - case type_Binary: { - StringData data = get_string(instr.payload.data.str); - auto encoded_size = util::base64_encoded_size(data.size()); - std::vector<char> encoded_data(encoded_size + 1, '\0'); - util::base64_encode(data.data(), data.size(), encoded_data.data(), encoded_data.size()); - return add_set_instruction(instr.object, field, {"data64", encoded_data.data()}); - } - case type_Timestamp: { - Timestamp ts = instr.payload.data.timestamp; - int64_t value = ts.get_seconds() * 1000 + ts.get_nanoseconds() / 1000000; - return add_set_instruction(instr.object, field, {"date", value}); - } - case type_ObjectId: - return add_set_instruction(instr.object, field, instr.payload.data.object_id.to_string()); - case type_Decimal: - return add_set_instruction(instr.object, field, instr.payload.data.decimal.to_string()); - case type_Link: { - ObjectSchema *target_object_schema; - ConstTableRef target_table; - Property *target_primary; - std::string table_name = get_string(instr.payload.data.link.target_table); - select(table_name, target_object_schema, target_table, target_primary); - nlohmann::json value = get_identity(instr.payload.data.link.target, *target_table, target_primary); - return add_set_instruction(instr.object, field, std::move(value)); - } - - - case type_OldTable: - case type_OldMixed: - case type_LinkList: - case type_OldDateTime: - REALM_TERMINATE("Unsupported data type."); - } - } - - void operator()(const Instruction::AddInteger&) - { - // FIXME - REALM_TERMINATE("AddInteger not supported by adapter."); - } - - void operator()(const Instruction::InsertSubstring&) - { - // FIXME - REALM_TERMINATE("InsertSubstring not supported by adapter."); - } - - void operator()(const Instruction::EraseSubstring&) - { - // FIXME - REALM_TERMINATE("EraseSubstring not supported by adapter."); - } - - void operator()(const Instruction::ClearTable&) - { - add_instruction(Adapter::InstructionType::Clear); - } - - void operator()(const Instruction::AddColumn& instr) - { - if (m_selected_object_type.size()) { - if (instr.type == type_Link || instr.type == type_LinkList) { - add_column_instruction(m_selected_object_type, get_string(instr.field), { - {"type", (instr.type == type_Link ? "object" : "list")}, - {"object_type", get_string(instr.link_target_table)} - }); - } - else if (instr.type == type_OldTable) { - // FIXME: Arrays of primitives are not yet supported. - } - else { - add_column_instruction(m_selected_object_type, get_string(instr.field), { - {"type", string_for_property_type(from_core_type(instr.type))}, - {"nullable", instr.nullable} - }); - } - } - } - - void operator()(const Instruction::EraseColumn&) - { - REALM_TERMINATE("EraseColumn not supported by adapter."); - } - - // Must have linklist selected: - void operator()(const Instruction::ArraySet& instr) - { - if (!m_list_property_name.size()) { - m_logger.warn("Adapter: Ignoring ArraySet instruction on unknown list property"); - return; // FIXME - } - - // FIXME: Support arrays of primitives - - add_instruction(Adapter::InstructionType::ListSet, { - {"identity", m_list_parent_identity}, - {"property", m_list_property_name}, - {"list_index", instr.ndx}, - {"object_identity", get_identity(instr.payload.data.link.target, *m_list_target_table, m_list_target_primary)} - }); - } - - void operator()(const Instruction::ArrayInsert& instr) - { - if (!m_list_property_name.size()) { - m_logger.warn("Adapter: Ignoring ArrayInsert instruction on unknown list property"); - return; // FIXME - } - - - // FIXME: Support arrays of primitives - - add_instruction(Adapter::InstructionType::ListInsert, { - {"identity", m_list_parent_identity}, - {"property", m_list_property_name}, - {"list_index", instr.ndx}, - {"object_identity", get_identity(instr.payload.data.link.target, *m_list_target_table, m_list_target_primary)} - }); - } - - void operator()(const Instruction::ArrayMove&) - { - if (!m_list_property_name.size()) - return; // FIXME - - REALM_TERMINATE("ArrayMove not supported by adapter."); - } - - void operator()(const Instruction::ArraySwap&) - { - if (!m_list_property_name.size()) - return; // FIXME - - REALM_TERMINATE("ArraySwap not supported by adapter."); - } - - void operator()(const Instruction::ArrayErase& instr) - { - if (!m_list_property_name.size()) - return; // FIXME - - add_instruction(Adapter::InstructionType::ListErase, { - {"identity", m_list_parent_identity}, - {"property", m_list_property_name}, - {"list_index", instr.ndx}, - }); - } - - void operator()(const Instruction::ArrayClear&) - { - if (!m_list_property_name.size()) - return; // FIXME - - add_instruction(Adapter::InstructionType::ListClear, { - {"identity", m_list_parent_identity}, - {"property", m_list_property_name}, - }); - } - - void operator()(const Instruction& instr) final override - { - instr.visit(*this); - } -}; - -class ChangesetCooker final : public sync::ClientReplication::ChangesetCooker { -public: - ChangesetCooker(util::Logger& logger) : m_logger(logger) { } - - bool cook_changeset(const Group& group, const char* changeset, - std::size_t changeset_size, - util::AppendBuffer<char>& out_buffer) override { - _impl::SimpleNoCopyInputStream stream(changeset, changeset_size); - ChangesetCookerInstructionHandler cooker_handler(group, m_logger, out_buffer); - sync::ChangesetParser().parse(stream, cooker_handler); - return cooker_handler.finish(); - } - -private: - util::Logger& m_logger; -}; - -} // anonymous namespace - -class Adapter::Impl final : public AdminRealmListener { -public: - Impl(std::function<void(std::string)> realm_changed, std::function<bool(const std::string&)> should_watch_realm_predicate, - std::string local_root_dir, SyncConfig sync_config_template); - - Realm::Config get_config(StringData virtual_path, util::Optional<Schema> schema) const; - - using AdminRealmListener::start; - -private: - void register_realm(GlobalKey, StringData virtual_path) override; - void unregister_realm(GlobalKey, StringData) override {} - void error(std::exception_ptr) override {} // FIXME - void download_complete() override {} - - const std::string m_server_base_url; - std::shared_ptr<SyncUser> m_user; - std::function<SyncBindSessionHandler> m_bind_callback; - - const std::unique_ptr<util::Logger> m_logger; - const std::shared_ptr<ChangesetCooker> m_transformer; - - const std::function<void(std::string)> m_realm_changed; - const std::function<bool(const std::string&)> m_should_watch_realm_predicate; - - std::vector<std::shared_ptr<_impl::RealmCoordinator>> m_realms; - -}; - -Adapter::Impl::Impl(std::function<void(std::string)> realm_changed, std::function<bool(const std::string&)> should_watch_realm_predicate, - std::string local_root_dir, SyncConfig sync_config_template) -: AdminRealmListener(std::move(local_root_dir), std::move(sync_config_template)) -, m_logger(SyncManager::shared().make_logger()) -, m_transformer(std::make_shared<ChangesetCooker>(*m_logger)) -, m_realm_changed(std::move(realm_changed)) -, m_should_watch_realm_predicate(std::move(should_watch_realm_predicate)) -{ -} - -Realm::Config Adapter::Impl::get_config(StringData virtual_path, util::Optional<Schema> schema) const { - Realm::Config config = AdminRealmListener::get_config(virtual_path); - if (schema) { - config.schema = std::move(schema); - config.schema_version = 0; - } - config.sync_config->transformer = m_transformer; - return config; -} - -void Adapter::Impl::register_realm(GlobalKey, StringData virtual_path) { - std::string path = virtual_path; - if (!m_should_watch_realm_predicate(path)) - return; - - auto coordinator = _impl::RealmCoordinator::get_coordinator(get_config(path, util::none)); - std::weak_ptr<Impl> weak_self = std::static_pointer_cast<Impl>(shared_from_this()); - coordinator->set_transaction_callback([path = std::move(path), weak_self = std::move(weak_self)](VersionID, VersionID) { - if (auto self = weak_self.lock()) - self->m_realm_changed(path); - }); - m_realms.push_back(coordinator); -} - -Adapter::Adapter(std::function<void(std::string)> realm_changed, std::function<bool(const std::string&)> should_watch_realm_predicate, - std::string local_root_dir, SyncConfig sync_config_template) -: m_impl(std::make_shared<Adapter::Impl>(std::move(realm_changed), std::move(should_watch_realm_predicate), - std::move(local_root_dir), std::move(sync_config_template))) -{ - m_impl->start(); -} - -util::Optional<util::AppendBuffer<char>> Adapter::current(std::string realm_path) { - auto history = realm::sync::make_client_replication(get_config(realm_path, util::none).path); - auto db = DB::create(*history); - - auto progress = history->get_cooked_progress(); - if (progress.changeset_index >= history->get_num_cooked_changesets()) { - return util::none; - } - - util::AppendBuffer<char> buffer; - history->get_cooked_changeset(progress.changeset_index, buffer); - return buffer; -} - -void Adapter::advance(std::string realm_path) { - auto history = realm::sync::make_client_replication(get_config(realm_path, util::none).path); - auto db = DB::create(*history); - - auto progress = history->get_cooked_progress(); - if (progress.changeset_index < history->get_num_cooked_changesets()) { - progress.changeset_index++; - history->set_cooked_progress(progress); - } -} - -Realm::Config Adapter::get_config(std::string path, util::Optional<Schema> schema) { - return m_impl->get_config(path, std::move(schema)); -} diff --git a/src/server/adapter.hpp b/src/server/adapter.hpp deleted file mode 100644 index b52c840f3..000000000 --- a/src/server/adapter.hpp +++ /dev/null @@ -1,81 +0,0 @@ -//////////////////////////////////////////////////////////////////////////// -// -// Copyright 2016 Realm Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -//////////////////////////////////////////////////////////////////////////// - -#ifndef REALM_SYNC_ADAPTER_HPP -#define REALM_SYNC_ADAPTER_HPP - -#include "shared_realm.hpp" -#include "sync/sync_config.hpp" - -#include <regex> - -namespace realm { - -class SyncUser; -class SyncLoggerFactory; - -class Adapter { -public: - Adapter(std::function<void(std::string)> realm_changed, std::function<bool(const std::string&)> should_watch_realm_predicate, - std::string local_root_dir, SyncConfig sync_config_template); - - enum class InstructionType { - Insert, - Delete, - Set, - Clear, - ListSet, - ListInsert, - ListErase, - ListClear, - AddType, - AddProperties, - }; - - static std::string instruction_type_string(InstructionType type) { - switch (type) { - case InstructionType::Insert: return "INSERT"; - case InstructionType::Delete: return "DELETE"; - case InstructionType::Set: return "SET"; - case InstructionType::Clear: return "CLEAR"; - case InstructionType::ListSet: return "LIST_SET"; - case InstructionType::ListInsert: return "LIST_INSERT"; - case InstructionType::ListErase: return "LIST_ERASE"; - case InstructionType::ListClear: return "LIST_CLEAR"; - case InstructionType::AddType: return "ADD_TYPE"; - case InstructionType::AddProperties: return "ADD_PROPERTIES"; - } - REALM_COMPILER_HINT_UNREACHABLE(); - return {}; - } - - util::Optional<util::AppendBuffer<char>> current(std::string realm_path); - void advance(std::string realm_path); - - Realm::Config get_config(std::string path, util::Optional<Schema> schema = util::none); - - void close() { m_impl.reset(); } - -private: - class Impl; - std::shared_ptr<Impl> m_impl; -}; - -} // namespace realm - -#endif // REALM_SYNC_ADAPTER_HPP diff --git a/src/server/admin_realm.cpp b/src/server/admin_realm.cpp deleted file mode 100644 index 4e9f38997..000000000 --- a/src/server/admin_realm.cpp +++ /dev/null @@ -1,188 +0,0 @@ -//////////////////////////////////////////////////////////////////////////// -// -// Copyright 2016 Realm Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -//////////////////////////////////////////////////////////////////////////// - -#include "admin_realm.hpp" - -#include "object_store.hpp" -#include "results.hpp" -#include "object_schema.hpp" -#include "util/event_loop_dispatcher.hpp" - -#include "sync/sync_config.hpp" -#include "sync/sync_manager.hpp" -#include "sync/sync_user.hpp" -#include "sync/sync_session.hpp" - -#include <realm/util/file.hpp> -#include <realm/util/scope_exit.hpp> -#include <realm/util/uri.hpp> -#include <realm/obj.hpp> - -#include <stdexcept> -#include <vector> - -using namespace realm; -using namespace realm::_impl; - -AdminRealmListener::AdminRealmListener(std::string local_root_dir, SyncConfig sync_config_template) -: m_local_root_dir(std::move(local_root_dir)) -, m_sync_config_template(std::move(sync_config_template)) -{ - m_config.path = util::File::resolve("realms.realm", m_local_root_dir); - // We explicitly set the schema to avoid a race condition where the Admin Realm may - // have been created but its schema may not have been uploaded to the server yet. - m_config.schema_mode = SchemaMode::Additive; - m_config.schema = Schema{ - {"RealmFile", { - Property{"path", PropertyType::String, Property::IsPrimary{true}}, - }}, - }; - m_config.schema_version = 0; - m_config.sync_config = std::make_shared<SyncConfig>(m_sync_config_template); - // m_config.sync_config->reference_realm_url += "/__admin"; -} - -void AdminRealmListener::start() -{ - if (m_download_session) { - // If we're already downloading the Realm, don't need to do anything - return; - } - - if (auto realm = m_results.get_realm()) { - // If we've finished downloading the Realm, just re-report all the files listed in it - auto& group = realm->read_group(); - auto& table = *ObjectStore::table_for_object_type(group, "RealmFile"); - auto path_col_key = table.get_column_key("path"); - - for (auto& obj : table) { - register_realm(table.get_object_id(obj.get_key()), obj.get<String>(path_col_key)); - } - return; - } - - std::weak_ptr<AdminRealmListener> weak_self = shared_from_this(); - - m_config.sync_config->error_handler = util::EventLoopDispatcher<void(std::shared_ptr<SyncSession>, SyncError)>([weak_self, this](std::shared_ptr<SyncSession>, SyncError e) { - auto self = weak_self.lock(); - if (!self) - return; - error(std::make_exception_ptr(std::system_error(e.error_code))); - m_download_session.reset(); - }); - - util::EventLoopDispatcher<void(std::error_code)> download_callback([weak_self, this](std::error_code ec) { - auto self = weak_self.lock(); - if (!self) - return; - - auto cleanup = util::make_scope_exit([&]() noexcept { m_download_session.reset(); }); - if (ec) { - if (ec == util::error::operation_aborted) - return; - error(std::make_exception_ptr(std::system_error(ec))); - return; - } - download_complete(); - - auto realm = Realm::get_shared_realm(m_config); - m_results = Results(realm, ObjectStore::table_for_object_type(realm->read_group(), "RealmFile")).sort({{"path", true}}); - - struct Handler { - bool initial_sent = false; - std::weak_ptr<AdminRealmListener> weak_self; - Handler(std::weak_ptr<AdminRealmListener> weak_self) : weak_self(std::move(weak_self)) { } - - void before(CollectionChangeSet const& c) - { - if (c.deletions.empty()) - return; - auto self = weak_self.lock(); - if (!self) - return; - - auto table = self->m_results.get_tableview().get_parent(); - auto path_col_key = table->get_column_key("path"); - for (auto i : c.deletions.as_indexes()) { - auto obj = self->m_results.get(i); - self->unregister_realm(table->get_object_id(obj.get_key()), obj.get<StringData>(path_col_key)); - } - } - - void after(CollectionChangeSet const& c) - { - if (c.insertions.empty() && initial_sent) - return; - - auto self = weak_self.lock(); - if (!self) - return; - if (self->m_results.size() == 0) - return; - - auto table = self->m_results.get_tableview().get_parent(); - auto path_col_key = table->get_column_key("path"); - - if (!initial_sent) { - for (size_t i = 0, size = self->m_results.size(); i < size; ++i) { - auto obj = self->m_results.get(i); - self->register_realm(table->get_object_id(obj.get_key()), obj.get<StringData>(path_col_key)); - } - initial_sent = true; - } - else { - for (auto i : c.insertions.as_indexes()) { - auto obj = self->m_results.get(i); - self->register_realm(table->get_object_id(obj.get_key()), obj.get<StringData>(path_col_key)); - } - } - } - - void error(std::exception_ptr e) - { - if (auto self = weak_self.lock()) - self->error(e); - } - }; - m_notification_token = m_results.add_notification_callback(Handler(std::move(weak_self))); - }); - m_download_session = SyncManager::shared().get_session(m_config.path, *m_config.sync_config); - m_download_session->wait_for_download_completion(std::move(download_callback)); -} - -Realm::Config AdminRealmListener::get_config(StringData virtual_path, StringData id) const { - Realm::Config config; - - std::string file_path = m_local_root_dir + "/realms" + virtual_path.data(); - if (id) { - file_path += std::string("/") + id.data(); - } - file_path += + ".realm"; - for (size_t pos = m_local_root_dir.size(); pos != file_path.npos; pos = file_path.find('/', pos + 1)) { - file_path[pos] = '\0'; - util::try_make_dir(file_path); - file_path[pos] = '/'; - } - - config.path = std::move(file_path); - config.sync_config = std::make_unique<SyncConfig>(m_sync_config_template); - // config.sync_config->reference_realm_url += virtual_path.data(); - config.schema_mode = SchemaMode::Additive; - config.automatic_change_notifications = false; - return config; -} diff --git a/src/server/admin_realm.hpp b/src/server/admin_realm.hpp deleted file mode 100644 index 237ede4d9..000000000 --- a/src/server/admin_realm.hpp +++ /dev/null @@ -1,59 +0,0 @@ -//////////////////////////////////////////////////////////////////////////// -// -// Copyright 2016 Realm Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -//////////////////////////////////////////////////////////////////////////// - -#ifndef REALM_JS_ADMIN_REALM_HPP -#define REALM_JS_ADMIN_REALM_HPP - -#include "results.hpp" -#include "shared_realm.hpp" -#include "sync/sync_config.hpp" - -#include <memory> - -namespace realm { -class SyncUser; -class SyncSession; -namespace sync { -struct GlobalKey; -} - -class AdminRealmListener : public std::enable_shared_from_this<AdminRealmListener> { -public: - AdminRealmListener(std::string local_root_dir, SyncConfig sync_config_template); - - void start(); - - Realm::Config get_config(StringData virtual_path, StringData id = nullptr) const; - - virtual void register_realm(GlobalKey id, StringData virtual_path) = 0; - virtual void unregister_realm(GlobalKey id, StringData virtual_path) = 0; - virtual void download_complete() = 0; - virtual void error(std::exception_ptr) = 0; - -private: - Realm::Config m_config; - const std::string m_local_root_dir; - const SyncConfig m_sync_config_template; - Results m_results; - NotificationToken m_notification_token; - std::shared_ptr<SyncSession> m_download_session; -}; - -} // namespace realm - -#endif // REALM_JS_ADMIN_REALM_HPP diff --git a/src/server/global_notifier.cpp b/src/server/global_notifier.cpp deleted file mode 100644 index 3f1d5d431..000000000 --- a/src/server/global_notifier.cpp +++ /dev/null @@ -1,420 +0,0 @@ -//////////////////////////////////////////////////////////////////////////// -// -// Copyright 2016 Realm Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -//////////////////////////////////////////////////////////////////////////// - -#include "global_notifier.hpp" - -#include "admin_realm.hpp" - -#include "impl/realm_coordinator.hpp" -#include "impl/transact_log_handler.hpp" -#include "object_schema.hpp" -#include "object_store.hpp" -#include "results.hpp" -#include "util/event_loop_dispatcher.hpp" -#include "util/event_loop_signal.hpp" - -#include "sync/sync_manager.hpp" -#include "sync/sync_user.hpp" -#include "sync/sync_session.hpp" - -#include <realm/util/file.hpp> -#include <realm/util/scope_exit.hpp> -#include <realm/global_key.hpp> - -#include <json.hpp> - -#include <condition_variable> -#include <mutex> -#include <queue> -#include <stdexcept> -#include <unordered_map> -#include <utility> -#include <vector> - -using namespace realm; -using namespace realm::_impl; - -namespace realm { -static void to_json(nlohmann::json& j, VersionID v) -{ - j = {{"version", v.version}, {"index", v.index}}; -} -static void from_json(nlohmann::json const& j, VersionID& v) -{ - v.version = j["version"]; - v.index = j["index"]; -} - -static void to_json(nlohmann::json& j, GlobalKey id) -{ - j = id.to_string(); -} -static void from_json(nlohmann::json const& j, GlobalKey& v) -{ - std::string str = j; - v = GlobalKey::from_string(str); -} -} - -class GlobalNotifier::Impl final : public AdminRealmListener { -public: - Impl(std::unique_ptr<Callback>, - std::string local_root_dir, SyncConfig sync_config_template); - - util::Optional<ChangeNotification> next_changed_realm(); - void release_version(GlobalKey id, VersionID old_version, VersionID new_version); - -public: - void register_realm(GlobalKey, StringData virtual_path) override; - void unregister_realm(GlobalKey, StringData) override; - void error(std::exception_ptr err) override { m_target->error(err); } - void download_complete() override { m_target->download_complete(); } - - const std::unique_ptr<util::Logger> m_logger; - const std::unique_ptr<Callback> m_target; - - std::mutex m_work_queue_mutex; - struct RealmToCalculate { - GlobalKey realm_id; - std::string virtual_path; - std::shared_ptr<_impl::RealmCoordinator> coordinator; - TransactionRef transaction; - std::queue<VersionID> versions; - bool pending_deletion = false; - - // constructor to make GCC 4.9 happy - RealmToCalculate(GlobalKey realm_id, std::string virtual_path) - : realm_id(realm_id) - , virtual_path(std::move(virtual_path)) - { - } - }; - std::queue<RealmToCalculate*> m_work_queue; - std::unordered_map<GlobalKey, RealmToCalculate> m_realms; - - struct SignalCallback { - std::weak_ptr<Impl> notifier; - void operator()() - { - if (auto alive = notifier.lock()) { - GlobalNotifier notifier(alive); - alive->m_target->realm_changed(¬ifier); - } - } - }; - - std::shared_ptr<util::EventLoopSignal<SignalCallback>> m_signal; -}; - -GlobalNotifier::Impl::Impl(std::unique_ptr<Callback> async_target, - std::string local_root_dir, SyncConfig sync_config_template) -: AdminRealmListener(local_root_dir, std::move(sync_config_template)) -, m_logger(SyncManager::shared().make_logger()) -, m_target(std::move(async_target)) -{ -} - -void GlobalNotifier::Impl::register_realm(GlobalKey id, StringData path) { - auto info = &m_realms.emplace(id, RealmToCalculate{id, path}).first->second; - m_realms.emplace(id, RealmToCalculate{id, path}); - - auto strid = id.to_string(); - if (!m_target->realm_available(strid, path)) { - m_logger->trace("Global notifier: not watching %1", path); - return; - } - if (info->coordinator) { - m_logger->trace("Global notifier: already watching %1", path); - return; - } - - m_logger->trace("Global notifier: watching %1", path); - auto config = get_config(path, strid); - info->coordinator = _impl::RealmCoordinator::get_coordinator(config); - - std::weak_ptr<Impl> weak_self = std::static_pointer_cast<Impl>(shared_from_this()); - info->coordinator->set_transaction_callback([=, weak_self = std::move(weak_self)](VersionID old_version, VersionID new_version) { - auto self = weak_self.lock(); - if (!self) - return; - - std::lock_guard<std::mutex> l(m_work_queue_mutex); - if (info->transaction) { - m_logger->trace("Global notifier: sync transaction on (%1): Realm already open", info->virtual_path); - } - else { - m_logger->trace("Global notifier: sync transaction on (%1): opening Realm", info->virtual_path); - std::unique_ptr<Group> read_only_group; - auto config = info->coordinator->get_config(); - config.force_sync_history = true; // FIXME: needed? - config.schema = util::none; - info->coordinator->open_with_config(config); - auto group_ptr = info->coordinator->begin_read(old_version); - REALM_ASSERT(std::dynamic_pointer_cast<Transaction>(group_ptr)); - info->transaction = std::static_pointer_cast<Transaction>(group_ptr); - } - info->versions.push(new_version); - if (info->versions.size() == 1) { - m_logger->trace("Global notifier: Signaling main thread"); - m_work_queue.push(info); - m_signal->notify(); - } - }); -} - -void GlobalNotifier::Impl::unregister_realm(GlobalKey id, StringData path) { - auto realm = m_realms.find(id); - if (realm == m_realms.end()) { - m_logger->trace("Global notifier: unwatched Realm at (%1) was deleted", path); - return; - } - - std::lock_guard<std::mutex> l(m_work_queue_mutex); - // Otherwise we need to defer closing the Realm until we're done with our current work. - m_logger->trace("Global notifier: enqueuing deletion of Realm at (%1)", path); - if (realm->second.coordinator) - realm->second.coordinator->set_transaction_callback(nullptr); - realm->second.pending_deletion = true; - - if (realm->second.versions.empty()) { - m_work_queue.push(&realm->second); - m_signal->notify(); - } -} - -void GlobalNotifier::Impl::release_version(GlobalKey id, VersionID old_version, VersionID new_version) -{ - std::lock_guard<std::mutex> l(m_work_queue_mutex); - - auto it = m_realms.find(id); - REALM_ASSERT(it != m_realms.end()); - auto& info = it->second; - - if (info.pending_deletion && old_version == VersionID()) { - m_logger->trace("Global notifier: completing pending deletion of (%1)", info.virtual_path); - if (info.coordinator) { - std::string path = info.coordinator->get_config().path; - m_realms.erase(it); - File::remove(path); - } - else { - m_realms.erase(it); - } - } - else { - Transaction& tr = *info.transaction.get(); - REALM_ASSERT(tr.get_version_of_current_transaction() == old_version); - - REALM_ASSERT(!info.versions.empty() && info.versions.front() == new_version); - info.versions.pop(); - - if (info.versions.empty()) { - info.transaction = nullptr; - m_logger->trace("Global notifier: release version on (%1): no pending versions", info.virtual_path); - - if (info.pending_deletion) { - m_logger->trace("Global notifier: enqueuing deletion notification for (%1)", info.virtual_path); - m_work_queue.push(&info); - } - } - else { - tr.advance_read(new_version); - m_work_queue.push(&info); - m_logger->trace("Global notifier: release version on (%1): enqueuing next version", info.virtual_path); - } - } - - if (!m_work_queue.empty()) { - m_logger->trace("Global notifier: Signaling main thread"); - m_signal->notify(); - } -} - -GlobalNotifier::GlobalNotifier(std::unique_ptr<Callback> async_target, - std::string local_root_dir, SyncConfig sync_config_template) -: m_impl(std::make_shared<GlobalNotifier::Impl>(std::move(async_target), - std::move(local_root_dir), - std::move(sync_config_template))) -{ - std::weak_ptr<GlobalNotifier::Impl> weak_impl = m_impl; - m_impl->m_signal = std::make_shared<util::EventLoopSignal<Impl::SignalCallback>>(Impl::SignalCallback{weak_impl}); -} - -void GlobalNotifier::start() -{ - m_impl->m_logger->trace("Global notifier: start()"); - m_impl->start(); -} - -GlobalNotifier::~GlobalNotifier() = default; - -util::Optional<GlobalNotifier::ChangeNotification> GlobalNotifier::next_changed_realm() -{ - std::lock_guard<std::mutex> l(m_impl->m_work_queue_mutex); - if (m_impl->m_work_queue.empty()) { - m_impl->m_logger->trace("Global notifier: next_changed_realm(): no realms pending"); - return util::none; - } - - auto next = m_impl->m_work_queue.front(); - m_impl->m_work_queue.pop(); - m_impl->m_logger->trace("Global notifier: notifying for realm at %1", next->virtual_path); - - if (next->versions.empty() && next->pending_deletion) { - return ChangeNotification(m_impl, next->virtual_path, next->realm_id); - } - - auto old_version = next->transaction->get_version_of_current_transaction(); - return ChangeNotification(m_impl, next->virtual_path, next->realm_id, - next->coordinator->get_config(), - old_version, next->versions.front()); -} - -GlobalNotifier::Callback& GlobalNotifier::target() -{ - return *m_impl->m_target; -} - -GlobalNotifier::ChangeNotification::ChangeNotification(std::shared_ptr<GlobalNotifier::Impl> notifier, - std::string virtual_path, - GlobalKey realm_id, - Realm::Config config, - VersionID old_version, - VersionID new_version) -: realm_path(std::move(virtual_path)) -, type(Type::Change) -, m_realm_id(realm_id) -, m_config(std::move(config)) -, m_old_version(old_version) -, m_new_version(new_version) -, m_notifier(std::move(notifier)) -{ -} - -GlobalNotifier::ChangeNotification::ChangeNotification(std::shared_ptr<GlobalNotifier::Impl> notifier, - std::string virtual_path, - GlobalKey realm_id) -: realm_path(std::move(virtual_path)) -, type(Type::Delete) -, m_realm_id(realm_id) -, m_notifier(std::move(notifier)) -{ -} - -GlobalNotifier::ChangeNotification::~ChangeNotification() -{ - if (m_notifier) - m_notifier->release_version(m_realm_id, m_old_version, m_new_version); - if (m_old_realm) - m_old_realm->close(); - if (m_new_realm) - m_new_realm->close(); -} - -std::string GlobalNotifier::ChangeNotification::serialize() const -{ - nlohmann::json ret; - ret["virtual_path"] = realm_path; - ret["realm_id"] = m_realm_id; - if (type == Type::Change) { - ret["path"] = m_config.path; - ret["old_version"] = m_old_version; - ret["new_version"] = m_new_version; - ret["is_change"] = true; - } - return ret.dump(); -} - -GlobalNotifier::ChangeNotification::ChangeNotification(std::string const& serialized) -{ - auto parsed = nlohmann::json::parse(serialized); - realm_path = parsed["virtual_path"]; - m_realm_id = parsed["realm_id"]; - - if (!parsed["is_change"].is_null()) { - type = Type::Change; - m_old_version = parsed["old_version"]; - m_new_version = parsed["new_version"]; - - m_config.path = parsed["path"]; - m_config.force_sync_history = true; - m_config.schema_mode = SchemaMode::Additive; - m_config.automatic_change_notifications = false; - } - else { - type = Type::Delete; - } -} - -SharedRealm GlobalNotifier::ChangeNotification::get_old_realm() const -{ - if (const_cast<VersionID&>(m_old_version) == VersionID{}) - return nullptr; - if (m_old_realm) - return m_old_realm; - - m_old_realm = Realm::get_shared_realm(m_config); - Realm::Internal::begin_read(*m_old_realm, m_old_version); - return m_old_realm; -} - -SharedRealm GlobalNotifier::ChangeNotification::get_new_realm() const -{ - if (m_new_realm) - return m_new_realm; - m_new_realm = Realm::get_shared_realm(m_config); - Realm::Internal::begin_read(*m_new_realm, m_new_version); - return m_new_realm; -} - -std::unordered_map<std::string, ObjectChangeSet> const& GlobalNotifier::ChangeNotification::get_changes() const -{ - if (m_have_calculated_changes) - return m_changes; - - Realm::Config config; - config.path = m_config.path; - config.force_sync_history = true; - config.automatic_change_notifications = false; - - auto realm = Realm::get_shared_realm(config); - - Realm::Internal::begin_read(*realm, m_old_version); - Group const& g = realm->read_group(); - - _impl::TransactionChangeInfo info; - info.track_all = true; - _impl::transaction::advance(realm->transaction(), info, m_new_version); - - m_changes.reserve(info.tables.size()); - auto table_keys = g.get_table_keys(); - for (auto table_key : table_keys) { - auto& change = info.tables[table_key.value]; - if (!change.empty()) { - auto name = ObjectStore::object_type_for_table_name(g.get_table_name(table_key)); - if (name) { - m_changes[name] = std::move(change); - } - } - } - - m_have_calculated_changes = true; - return m_changes; -} - -GlobalNotifier::Callback::~Callback() = default; diff --git a/src/server/global_notifier.hpp b/src/server/global_notifier.hpp deleted file mode 100644 index 5699ea06f..000000000 --- a/src/server/global_notifier.hpp +++ /dev/null @@ -1,150 +0,0 @@ -//////////////////////////////////////////////////////////////////////////// -// -// Copyright 2016 Realm Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -//////////////////////////////////////////////////////////////////////////// - -#ifndef REALM_OBJECT_STORE_GLOBAL_NOTIFIER_HPP -#define REALM_OBJECT_STORE_GLOBAL_NOTIFIER_HPP - -#include "object_changeset.hpp" -#include "impl/collection_notifier.hpp" -#include "shared_realm.hpp" -#include "sync/sync_config.hpp" - -namespace realm { -struct GlobalKey; -class SyncUser; - -/// Used to listen for changes across all, or a subset of all Realms on a -/// particular sync server. -class GlobalNotifier { -public: - class Callback; - GlobalNotifier(std::unique_ptr<Callback>, std::string local_root_dir, - SyncConfig sync_config_template); - ~GlobalNotifier(); - - // Returns the target callback - Callback& target(); - - void start(); - - class ChangeNotification; - util::Optional<ChangeNotification> next_changed_realm(); - -private: - class Impl; - std::shared_ptr<Impl> m_impl; - - GlobalNotifier(std::shared_ptr<Impl> impl) : m_impl(std::move(impl)) {} -}; - -class GlobalNotifier::ChangeNotification { -public: - // The virtual server path of the Realm which changed. - std::string realm_path; - - // The kind of change which happened to the Realm. - enum class Type { - Change, - Delete - } type; - - // The Realm which changed, at the version immediately before the changes - // made. `modifications` and `deletions` within the change sets are indices - // in this Realm. - // This will be nullptr for the initial notification of a Realm which - // already existed when the GlobalNotifier was created. - SharedRealm get_old_realm() const; - - // The Realm which changed, at the first version including the changes made. - // `modifications_new` and `insertions` within the change sets are indices - // in this Realm. - SharedRealm get_new_realm() const; - - // The actual changes made, keyed on object name. - // This will be empty if the Realm already existed before the - // GlobalNotifier was started. - std::unordered_map<std::string, ObjectChangeSet> const& get_changes() const; - - ~ChangeNotification(); - - std::string serialize() const; - ChangeNotification(std::string const&); - - ChangeNotification(ChangeNotification&&) = default; - ChangeNotification& operator=(ChangeNotification&&) = default; - ChangeNotification(ChangeNotification const&) = delete; - ChangeNotification& operator=(ChangeNotification const&) = delete; - -private: - ChangeNotification(std::shared_ptr<GlobalNotifier::Impl> notifier, - std::string virtual_path, - GlobalKey realm_id, - Realm::Config config, - VersionID old_version, VersionID new_version); - ChangeNotification(std::shared_ptr<GlobalNotifier::Impl> notifier, - std::string virtual_path, - GlobalKey realm_id); - GlobalKey m_realm_id; - Realm::Config m_config; - VersionID m_old_version; - VersionID m_new_version; - std::shared_ptr<GlobalNotifier::Impl> m_notifier; - mutable std::shared_ptr<Realm> m_old_realm; - mutable std::shared_ptr<Realm> m_new_realm; - mutable std::unordered_map<std::string, ObjectChangeSet> m_changes; - mutable bool m_have_calculated_changes = false; - - ChangeNotification() = default; - - friend class GlobalNotifier; -}; - -class GlobalNotifier::Callback { -public: - virtual ~Callback(); - - /// Called when the initial download of the admin realm is complete and observation is beginning - virtual void download_complete() = 0; - - /// Called when any error occurs within the global notifier - virtual void error(std::exception_ptr) = 0; - - /// Called to determine whether the application wants to listen for changes - /// to a particular Realm. - /// - /// The Realm name that is passed to the callback is hierarchical and takes - /// the form of an absolute path (separated by forward slashes). This is a - /// *virtual path*, i.e, it is not necesarily the file system path of the - /// Realm on the server. - /// - /// If this function returns false, the global notifier will not observe - /// the Realm. - /// - /// \param id A unique identifier for the Realm which will not be reused - /// even if multiple Realms are created for a single virtual path. - /// \param name The name (virtual path) by which the server knows that - /// Realm. - virtual bool realm_available(StringData id, StringData virtual_path) = 0; - - /// Called when a new version is available in an observed Realm. - virtual void realm_changed(GlobalNotifier*) = 0; -}; - -} // namespace realm - -#endif // REALM_OBJECT_STORE_GLOBAL_NOTIFIER_HPP diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 311a6f01d..2cf511c01 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -49,12 +49,6 @@ if(REALM_ENABLE_SYNC) ) endif() -if(REALM_ENABLE_SERVER) - list(APPEND SOURCES - sync/global_notifier.cpp - ) -endif() - add_executable(tests ${SOURCES} ${HEADERS}) target_compile_definitions(tests PRIVATE ${PLATFORM_DEFINES}) diff --git a/tests/sync/global_notifier.cpp b/tests/sync/global_notifier.cpp deleted file mode 100644 index a97938e9c..000000000 --- a/tests/sync/global_notifier.cpp +++ /dev/null @@ -1,272 +0,0 @@ -//////////////////////////////////////////////////////////////////////////// -// -// Copyright 2016 Realm Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -//////////////////////////////////////////////////////////////////////////// - -#include "util/event_loop.hpp" -#include "util/test_file.hpp" -#include "util/test_utils.hpp" - -#include <realm/util/file.hpp> -#include <realm/util/scope_exit.hpp> - -#include <binding_context.hpp> -#include <impl/object_accessor_impl.hpp> -#include <impl/realm_coordinator.hpp> -#include <object_schema.hpp> -#include <property.hpp> -#include <results.hpp> -#include <schema.hpp> -#include <server/admin_realm.hpp> -#include <server/global_notifier.hpp> - -using namespace realm; -using namespace realm::util; - -namespace { - using AnyDict = std::map<std::string, util::Any>; - using AnyVec = std::vector<util::Any>; -} - -struct TestContext : CppContext { - std::map<std::string, AnyDict> defaults; - - using CppContext::CppContext; - TestContext(TestContext& parent, realm::Property const& prop) - : CppContext(parent, prop) - , defaults(parent.defaults) - { } - - void will_change(Object const&, Property const&) {} - void did_change() {} - std::string print(util::Any) { return "not implemented"; } - bool allow_missing(util::Any) { return false; } -}; - -struct TestNotifierCallback : public GlobalNotifier::Callback -{ - TestNotifierCallback(std::function<void()> download_completion_handler, - std::function<void(std::exception_ptr)> error_handler, - std::function<bool(StringData, StringData)> realm_available_handler, - std::function<void(GlobalNotifier*)> realm_changed_handler) - : m_download_completion_handler(download_completion_handler) - , m_error_handler(error_handler) - , m_realm_available_handler(realm_available_handler) - , m_realm_changed_handler(realm_changed_handler) - { - } - - /// Called when the initial download of the admin realm is complete and observation is beginning - void download_complete() - { - m_download_completion_handler(); - } - - /// Called when any error occurs within the global notifier - void error(std::exception_ptr e) - { - m_error_handler(e); - } - - /// Called to determine whether the application wants to listen for changes - /// to a particular Realm. - /// - /// The Realm name that is passed to the callback is hierarchical and takes - /// the form of an absolute path (separated by forward slashes). This is a - /// *virtual path*, i.e, it is not necesarily the file system path of the - /// Realm on the server. - /// - /// If this function returns false, the global notifier will not observe - /// the Realm. - /// - /// \param id A unique identifier for the Realm which will not be reused - /// even if multiple Realms are created for a single virtual path. - /// \param name The name (virtual path) by which the server knows that - /// Realm. - bool realm_available(StringData id, StringData virtual_path) - { - return m_realm_available_handler(id, virtual_path); - } - - /// Called when a new version is available in an observed Realm. - void realm_changed(GlobalNotifier* notifier) - { - m_realm_changed_handler(notifier); - } - -private: - std::function<void()> m_download_completion_handler; - std::function<void(std::exception_ptr)> m_error_handler; - std::function<bool(StringData, StringData)> m_realm_available_handler; - std::function<void(GlobalNotifier*)> m_realm_changed_handler; -}; - -TEST_CASE("global_notifier: basics", "[sync][global_notifier]") { - _impl::RealmCoordinator::assert_no_open_realms(); - - SECTION("listened to realms trigger notification") { - SyncServer server(false); - std::string realm_name = "listened_to"; - std::string table_name = "class_object"; - std::string value_col_name = "value"; - std::string object_name = "object"; - std::string object_table_name = "class_object"; - - // Add an object to the Realm so that notifications are needed - auto make_object = [&server, &object_name, &object_table_name, &value_col_name](std::string realm_name, int64_t value) { - SyncTestFile config(server, realm_name); - config.schema = Schema { - {object_name, { - {value_col_name, PropertyType::Int, Property::IsPrimary{true}}, - }}, - }; - auto write_realm = Realm::get_shared_realm(config); - wait_for_download(*write_realm); - write_realm->begin_transaction(); - write_realm->read_group().get_table(object_table_name)->create_object_with_primary_key(value); - write_realm->commit_transaction(); - wait_for_upload(*write_realm); - }; - - auto notify_gn_of_realm_change = [&](std::string path) { - SyncTestFile admin_config(server, "__admin"); - // See AdminRealmListener, in practice this will be updated by ROS - admin_config.schema = Schema{ - {"RealmFile", { - Property{"path", PropertyType::String, Property::IsPrimary{true}}, - Property{"counter", PropertyType::Int}, - }}, - }; - auto admin_realm = Realm::get_shared_realm(admin_config); - wait_for_download(*admin_realm); - admin_realm->begin_transaction(); - auto table = admin_realm->read_group().get_table("class_RealmFile"); - auto path_col = table->get_column_key("path"); - auto count_col = table->get_column_key("counter"); - auto existing_obj_key = table->find_first_string(path_col, path); - if (existing_obj_key) { - auto existing_obj = table->get_object(existing_obj_key); - existing_obj.set(count_col, existing_obj.get<int64_t>(count_col) + 1); - } - else { - auto obj = table->create_object_with_primary_key(path); - obj.set(count_col, 0); - } - admin_realm->commit_transaction(); - wait_for_upload(*admin_realm); - }; - - SECTION("notifications across two transactions are merged before reported") { - std::atomic<size_t> triggered_download(0); - std::atomic<size_t> triggered_realm_notification(0); - std::atomic<size_t> triggered_realm_change(0); - - std::unique_ptr<TestNotifierCallback> callback = std::make_unique<TestNotifierCallback>( - [&triggered_download]() { - triggered_download++; - }, - [](std::exception_ptr) { - }, - [&triggered_realm_notification](StringData /*id*/, StringData /*virtual_path*/) { - triggered_realm_notification++; - return true; - }, - [&](GlobalNotifier* gn) { - REQUIRE(gn != nullptr); - triggered_realm_change++; - }); - - SyncTestFile gn_config_template(server, ""); - // gn_config_template.sync_config->reference_realm_url = server.base_url(); - GlobalNotifier global_notifier(std::move(callback), server.local_root_dir(), *gn_config_template.sync_config); - REQUIRE(triggered_download.load() == 0); - global_notifier.start(); - server.start(); - - { - auto next_change = global_notifier.next_changed_realm(); - REQUIRE(!next_change); - REQUIRE(triggered_realm_notification.load() == 0); - REQUIRE(triggered_realm_change.load() == 0); - } - - // add two objects, in different transactions - constexpr int64_t initial_value = 100; - make_object(realm_name, initial_value); - constexpr int64_t second_value = 200; - make_object(realm_name, second_value); - - EventLoop::main().run_until([&] { return triggered_download.load() > 0; }); - - notify_gn_of_realm_change(std::string("/") + realm_name); - EventLoop::main().run_until([&] { return triggered_realm_notification.load() > 0; }); - EventLoop::main().run_until([&] { return triggered_realm_change.load() > 0; }); - - { - auto next_change = global_notifier.next_changed_realm(); - REQUIRE(bool(next_change)); - REQUIRE(next_change->realm_path == (std::string("/") + realm_name)); - REQUIRE(next_change->type == GlobalNotifier::ChangeNotification::Type::Change); - - auto changes = next_change->get_changes(); - REQUIRE(!changes.empty()); - REQUIRE(changes.size() == 1); - REQUIRE(changes.find(object_name) != changes.end()); - REQUIRE(changes[object_name].insertions_size() == 2); - REQUIRE(changes[object_name].modifications_size() == 0); - REQUIRE(changes[object_name].deletions_size() == 0); - - { - auto old_realm = next_change->get_old_realm(); - REQUIRE(!old_realm->read_group().has_table(table_name)); - } - { - auto new_realm = next_change->get_new_realm(); - auto object_table = new_realm->read_group().get_table(table_name); - REQUIRE(object_table); - REQUIRE(object_table->size() == 2); - auto value_col_key = object_table->get_column_key(value_col_name); - - REQUIRE(bool(object_table->find_first_int(value_col_key, initial_value))); - REQUIRE(bool(object_table->find_first_int(value_col_key, second_value))); - - REQUIRE(changes[object_name].get_insertions().size() == 2); - for (auto insertion : changes[object_name].get_insertions()) { - ObjKey key(insertion); - REQUIRE(bool(key)); - REQUIRE(object_table->get_object(key)); - int64_t value = object_table->get_object(key).get<int64_t>(value_col_name); - REQUIRE((value == initial_value || value == second_value)); - } - // no modifications on inserted objects, but the below loop at least checks for compile errors - REQUIRE(changes[object_name].get_modifications().size() == 0); - for (auto modification : changes[object_name].get_modifications()) { - ObjKey key(modification.first); - REQUIRE(bool(key)); - REQUIRE(object_table->get_object(key)); - int64_t value = object_table->get_object(key).get<int64_t>(value_col_name); - REQUIRE((value == initial_value || value == second_value)); - } - auto deletions = changes[object_name].get_deletions(); - REQUIRE(deletions.size() == 0); - REQUIRE(deletions.begin() == deletions.end()); - } - next_change = global_notifier.next_changed_realm(); - REQUIRE(!bool(next_change)); - } - } - } -} diff --git a/workflow/build.sh b/workflow/build.sh index 928f24304..b6b8709d4 100755 --- a/workflow/build.sh +++ b/workflow/build.sh @@ -30,7 +30,7 @@ if [ "${flavor}" = "android" ]; then fi if [ "${sync}" = "sync" ]; then - cmake_flags="${cmake_flags} -DREALM_ENABLE_SYNC=1 -DREALM_ENABLE_SERVER=1 -DOPENSSL_ROOT_DIR=${OPENSSL_ROOT_DIR}" + cmake_flags="${cmake_flags} -DREALM_ENABLE_SYNC=1 -DOPENSSL_ROOT_DIR=${OPENSSL_ROOT_DIR}" fi cmake ${cmake_flags} ${extra_flags} .. diff --git a/workflow/test_coverage.sh b/workflow/test_coverage.sh index 7a42e543a..05cdfefbe 100755 --- a/workflow/test_coverage.sh +++ b/workflow/test_coverage.sh @@ -27,7 +27,7 @@ cd coverage.build cmake_flags="" if [ "${sync}" = "sync" ]; then - cmake_flags="${cmake_flags} -DREALM_ENABLE_SYNC=1 -DREALM_ENABLE_SERVER=1 -DOPENSSL_ROOT_DIR=${OPENSSL_ROOT_DIR}" + cmake_flags="${cmake_flags} -DREALM_ENABLE_SYNC=1 -DOPENSSL_ROOT_DIR=${OPENSSL_ROOT_DIR}" fi cmake ${cmake_flags} -DCMAKE_BUILD_TYPE=Coverage -DDEPENDENCIES_FILE="dependencies${deps_suffix}.list" ..