Skip to content

Commit

Permalink
Guarantee that bootstraps will be applied if the full bootstrap is
Browse files Browse the repository at this point in the history
received.
  • Loading branch information
jbreams committed May 11, 2022
1 parent ce55b77 commit 95c4fad
Show file tree
Hide file tree
Showing 16 changed files with 895 additions and 75 deletions.
16 changes: 13 additions & 3 deletions src/realm/object-store/sync/sync_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
//
////////////////////////////////////////////////////////////////////////////

#include "realm/sync/client_base.hpp"
#include <realm/object-store/sync/sync_session.hpp>

#include <realm/object-store/impl/realm_coordinator.hpp>
Expand Down Expand Up @@ -656,9 +657,18 @@ void SyncSession::create_sync_session()
session_config.ssl_verify_callback = m_config.ssl_verify_callback;
session_config.proxy_config = m_config.proxy_config;
if (m_config.on_download_message_received_hook) {
session_config.on_download_message_received_hook = [hook = m_config.on_download_message_received_hook,
anchor = weak_from_this()] {
hook(anchor);
session_config.on_download_message_received_hook =
[hook = m_config.on_download_message_received_hook, anchor = weak_from_this()](
const sync::SyncProgress& progress, int64_t query_version, sync::DownloadBatchState batch_state) {
hook(anchor, progress, query_version, batch_state);
};
}
if (m_config.on_bootstrap_message_processed_hook) {
session_config.on_bootstrap_message_processed_hook =
[hook = m_config.on_bootstrap_message_processed_hook,
anchor = weak_from_this()](const sync::SyncProgress& progress, int64_t query_version,
sync::DownloadBatchState batch_state) -> bool {
return hook(anchor, progress, query_version, batch_state);
};
}

Expand Down
1 change: 1 addition & 0 deletions src/realm/object-store/sync/sync_session.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#ifndef REALM_OS_SYNC_SESSION_HPP
#define REALM_OS_SYNC_SESSION_HPP

#include "realm/sync/client_base.hpp"
#include <realm/object-store/feature_checks.hpp>
#include <realm/object-store/sync/generic_network_transport.hpp>
#include <realm/sync/config.hpp>
Expand Down
2 changes: 2 additions & 0 deletions src/realm/sync/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ set(SYNC_SOURCES
noinst/client_reset_operation.cpp
noinst/client_reset_recovery.cpp
noinst/compact_changesets.cpp
noinst/pending_bootstrap_store.cpp
noinst/protocol_codec.cpp
noinst/sync_metadata_schema.cpp
changeset_encoder.cpp
Expand Down Expand Up @@ -60,6 +61,7 @@ set(NOINST_HEADERS
noinst/client_reset_recovery.hpp
noinst/compact_changesets.hpp
noinst/integer_codec.hpp
noinst/pending_bootstrap_store.hpp
noinst/protocol_codec.hpp
noinst/root_certs.hpp
noinst/sync_metadata_schema.hpp
Expand Down
136 changes: 127 additions & 9 deletions src/realm/sync/client.cpp
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@

#include <memory>
#include <tuple>
#include <atomic>

#include "realm/sync/client_base.hpp"
#include "realm/sync/protocol.hpp"
#include "realm/util/optional.hpp"
#include <realm/sync/client.hpp>
#include <realm/sync/config.hpp>
#include <realm/sync/noinst/client_history_impl.hpp>
#include <realm/sync/noinst/client_impl_base.hpp>
#include <realm/sync/noinst/pending_bootstrap_store.hpp>
#include <realm/sync/subscriptions.hpp>
#include <realm/util/bind_ptr.hpp>
#include <realm/util/circular_buffer.hpp>
Expand Down Expand Up @@ -175,6 +181,7 @@ class SessionWrapper final : public util::AtomicRefCountBase, public SyncTransac

bool has_flx_subscription_store() const;
SubscriptionStore* get_flx_subscription_store();
PendingBootstrapStore* get_flx_pending_bootstrap_store();

void set_sync_transact_handler(util::UniqueFunction<SyncTransactCallback>);
void set_progress_handler(util::UniqueFunction<ProgressHandler>);
Expand All @@ -201,7 +208,8 @@ class SessionWrapper final : public util::AtomicRefCountBase, public SyncTransac
void finalize_before_actualization() noexcept;

// Overriding member function in SyncTransactReporter
void report_sync_transact(VersionID, VersionID) override final;
void report_sync_transact(VersionID, VersionID)
override;

void on_new_flx_subscription_set(int64_t new_version);

Expand Down Expand Up @@ -237,12 +245,14 @@ class SessionWrapper final : public util::AtomicRefCountBase, public SyncTransac
util::UniqueFunction<ProgressHandler> m_progress_handler;
util::UniqueFunction<ConnectionStateChangeListener> m_connection_state_change_listener;

std::function<void()> m_on_download_message_received_hook;
std::function<void(const SyncProgress&, int64_t, DownloadBatchState)> m_on_download_message_received_hook;
std::function<bool(const SyncProgress&, int64_t, DownloadBatchState)> m_on_bootstrap_message_processed_hook;

std::shared_ptr<SubscriptionStore> m_flx_subscription_store;
int64_t m_flx_active_version = 0;
int64_t m_flx_last_seen_version = 0;
int64_t m_flx_latest_version = 0;
std::unique_ptr<PendingBootstrapStore> m_flx_pending_bootstrap_store;

bool m_initiated = false;

Expand Down Expand Up @@ -625,7 +635,6 @@ void ClientImpl::remove_connection(ClientImpl::Connection& conn) noexcept
}



// ################ SessionImpl ################


Expand Down Expand Up @@ -719,6 +728,101 @@ void SessionImpl::on_resumed()
}


bool SessionImpl::process_flx_bootstrap_message(const SyncProgress& progress, DownloadBatchState batch_state,
int64_t query_version, const ReceivedChangesets& received_changesets)
{
if (!m_is_flx_sync_session) {
return false;
}

// If this is a steady state DOWNLOAD, no need for special handling.
if (batch_state == DownloadBatchState::LastInBatch && query_version == m_wrapper.m_flx_active_version) {
return false;
}

auto bootstrap_store = m_wrapper.get_flx_pending_bootstrap_store();
util::Optional<SyncProgress> maybe_progress;
if (batch_state == DownloadBatchState::LastInBatch) {
maybe_progress = progress;
}
bootstrap_store->add_batch(query_version, std::move(maybe_progress), received_changesets);

if (batch_state == DownloadBatchState::MoreToCome) {
on_flx_sync_progress(query_version, batch_state);
if (m_wrapper.m_on_bootstrap_message_processed_hook) {
m_wrapper.m_on_bootstrap_message_processed_hook(progress, query_version, batch_state);
}

return true;
}

if (m_wrapper.m_on_bootstrap_message_processed_hook &&
!m_wrapper.m_on_bootstrap_message_processed_hook(progress, query_version, batch_state)) {
return true;
}
process_pending_flx_bootstrap();

return true;
}


void SessionImpl::process_pending_flx_bootstrap()
{
constexpr size_t batch_size_in_bytes = 1024 * 1024;
if (!m_is_flx_sync_session) {
return;
}
auto bootstrap_store = m_wrapper.get_flx_pending_bootstrap_store();
if (!bootstrap_store->has_pending()) {
return;
}
auto& history = access_realm().get_history();
VersionInfo new_version;
DownloadCursor download_cursor;
int64_t query_version = -1;
try {
while (bootstrap_store->has_pending()) {
auto pending_batch = bootstrap_store->peek_pending(batch_size_in_bytes);
if (!pending_batch.progress) {
logger.info("Incomplete pending bootstrap found for query version %1", pending_batch.query_version);
bootstrap_store->clear();
return;
}

auto batch_state =
pending_batch.remaining > 0 ? DownloadBatchState::MoreToCome : DownloadBatchState::LastInBatch;
uint64_t downloadable_bytes = 0;
query_version = pending_batch.query_version;

if (batch_state == DownloadBatchState::LastInBatch) {
update_progress(*pending_batch.progress);
}

history.integrate_server_changesets(
*pending_batch.progress, &downloadable_bytes, pending_batch.changesets.data(),
pending_batch.changesets.size(), new_version, batch_state, logger, [&](const TransactionRef& tr) {
bootstrap_store->pop_front_pending(tr, pending_batch.changesets.size());
});
download_cursor = pending_batch.progress->download;

logger.info(
"Integrated %1 changesets from pending bootstrap for query version %2, producing client version "
"%3. %4 changesets remaining in bootstrap",
pending_batch.changesets.size(), pending_batch.query_version, new_version.realm_version,
pending_batch.remaining);
}
on_changesets_integrated(new_version.realm_version, download_cursor, DownloadBatchState::LastInBatch);
}
catch (const IntegrationException& e) {
on_integration_failure(e, DownloadBatchState::LastInBatch);
}

REALM_ASSERT_3(query_version, !=, -1);
on_flx_sync_progress(query_version, DownloadBatchState::LastInBatch);
m_wrapper.on_sync_progress();
}


void SessionImpl::on_new_flx_subscription_set(int64_t new_version)
{
// If m_state == State::Active then we know that we haven't sent an UNBIND message and all we need to
Expand All @@ -745,12 +849,13 @@ SubscriptionStore* SessionImpl::get_flx_subscription_store()
return m_wrapper.get_flx_subscription_store();
}

void SessionImpl::receive_download_message_hook()
void SessionImpl::receive_download_message_hook(const SyncProgress& progress, int64_t query_version,
DownloadBatchState batch_state)
{
if (REALM_LIKELY(!m_wrapper.m_on_download_message_received_hook)) {
return;
}
m_wrapper.m_on_download_message_received_hook();
m_wrapper.m_on_download_message_received_hook(progress, query_version, batch_state);
}

// ################ SessionWrapper ################
Expand All @@ -775,6 +880,7 @@ SessionWrapper::SessionWrapper(ClientImpl& client, DBRef db, std::shared_ptr<Sub
, m_client_reset_config{std::move(config.client_reset_config)}
, m_proxy_config{config.proxy_config} // Throws
, m_on_download_message_received_hook(std::move(config.on_download_message_received_hook))
, m_on_bootstrap_message_processed_hook(config.on_bootstrap_message_processed_hook)
, m_flx_subscription_store(std::move(flx_sub_store))
{
REALM_ASSERT(m_db);
Expand Down Expand Up @@ -848,6 +954,10 @@ void SessionWrapper::on_flx_sync_progress(int64_t new_version, DownloadBatchStat
SubscriptionSet::State new_state;
switch (batch_state) {
case DownloadBatchState::LastInBatch:
if (m_flx_active_version == new_version) {
return;
}

m_flx_last_seen_version = new_version;
m_flx_active_version = new_version;
new_state = SubscriptionSet::State::Complete;
Expand All @@ -872,6 +982,11 @@ SubscriptionStore* SessionWrapper::get_flx_subscription_store()
return m_flx_subscription_store.get();
}

PendingBootstrapStore* SessionWrapper::get_flx_pending_bootstrap_store()
{
REALM_ASSERT(m_initiated);
return m_flx_pending_bootstrap_store.get();
}

inline void SessionWrapper::set_sync_transact_handler(util::UniqueFunction<SyncTransactCallback> handler)
{
Expand Down Expand Up @@ -951,7 +1066,6 @@ void SessionWrapper::cancel_reconnect_delay()
m_client.get_service().post(std::move(handler)); // Throws
}


void SessionWrapper::async_wait_for(bool upload_completion, bool download_completion,
WaitOperCompletionHandler handler)
{
Expand Down Expand Up @@ -1117,11 +1231,15 @@ void SessionWrapper::actualize(ServerEndpoint endpoint)
conn.update_connect_info(m_http_request_path_prefix, m_signed_access_token); // Throws
std::unique_ptr<SessionImpl> sess_2 = std::make_unique<SessionImpl>(*this, conn); // Throws
SessionImpl& sess = *sess_2;
sess.logger.detail("Binding '%1' to '%2'", m_db->get_path(), m_virt_path); // Throws
conn.activate_session(std::move(sess_2)); // Throws
if (sync_mode == SyncServerMode::FLX) {
m_flx_pending_bootstrap_store = std::make_unique<PendingBootstrapStore>(m_db, &sess.logger);
}

m_actualized = true;
sess.logger.detail("Binding '%1' to '%2'", m_db->get_path(), m_virt_path); // Throws
m_sess = &sess;
conn.activate_session(std::move(sess_2)); // Throws

m_actualized = true;
}
catch (...) {
if (was_created)
Expand Down
6 changes: 5 additions & 1 deletion src/realm/sync/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,11 @@ class Session {
// Will be called after a download message is received and validated by
// the client but befefore it's been transformed or applied. To be used in
// testing only.
std::function<void()> on_download_message_received_hook;
std::function<void(const sync::SyncProgress&, int64_t, sync::DownloadBatchState)>
on_download_message_received_hook;
/// Will be called after each download message is integrated. For use in testing.
std::function<bool(const sync::SyncProgress&, int64_t, sync::DownloadBatchState)>
on_bootstrap_message_processed_hook;
};

/// \brief Start a new session for the specified client-side Realm.
Expand Down
10 changes: 8 additions & 2 deletions src/realm/sync/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <realm/util/assert.hpp>
#include <realm/util/optional.hpp>
#include <realm/sync/noinst/client_reset.hpp>
#include "realm/sync/protocol.hpp"

#include <functional>
#include <memory>
Expand Down Expand Up @@ -53,7 +54,7 @@ enum class SimplifiedProtocolError {
namespace sync {
using port_type = std::uint_fast16_t;
enum class ProtocolError;
}
} // namespace sync

SimplifiedProtocolError get_simplified_error(sync::ProtocolError err);

Expand Down Expand Up @@ -167,7 +168,12 @@ struct SyncConfig {

// Will be called after a download message is received and validated by the client but befefore it's been
// transformed or applied. To be used in testing only.
std::function<void(std::weak_ptr<SyncSession>)> on_download_message_received_hook;
std::function<void(std::weak_ptr<SyncSession>, const sync::SyncProgress&, int64_t, sync::DownloadBatchState)>
on_download_message_received_hook;
// Will be called after each bootstrap message is added to the pending bootstrap store, but before_frozen
// processing a finalized bootstrap. For testing only.
std::function<bool(std::weak_ptr<SyncSession>, const sync::SyncProgress&, int64_t, sync::DownloadBatchState)>
on_bootstrap_message_processed_hook;

explicit SyncConfig(std::shared_ptr<SyncUser> user, bson::Bson partition);
explicit SyncConfig(std::shared_ptr<SyncUser> user, std::string partition);
Expand Down
7 changes: 6 additions & 1 deletion src/realm/sync/noinst/client_history_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
//
////////////////////////////////////////////////////////////////////////////

#include "realm/util/functional.hpp"
#include <realm/sync/noinst/client_history_impl.hpp>

#include <realm/util/compression.hpp>
Expand Down Expand Up @@ -322,7 +323,7 @@ void ClientHistory::find_uploadable_changesets(UploadCursor& upload_progress, ve
const auto sync_history_size = arrays.changesets.size();
const auto sync_history_base_version = rt->get_version() - sync_history_size;

std::size_t accum_byte_size_soft_limit = 0x20000; // 128 KB
std::size_t accum_byte_size_soft_limit = 0x20000; // 128 KB
std::size_t accum_byte_size_hard_limit = 16777216; // server-imposed limit
std::size_t accum_byte_size = 0;

Expand Down Expand Up @@ -379,6 +380,7 @@ void ClientHistory::integrate_server_changesets(const SyncProgress& progress,
const RemoteChangeset* incoming_changesets,
std::size_t num_changesets, VersionInfo& version_info,
DownloadBatchState batch_state, util::Logger& logger,
util::UniqueFunction<void(const TransactionRef&)> run_in_write_tr,
SyncTransactReporter* transact_reporter)
{
REALM_ASSERT(num_changesets != 0);
Expand Down Expand Up @@ -503,6 +505,9 @@ void ClientHistory::integrate_server_changesets(const SyncProgress& progress,
if (batch_state == DownloadBatchState::LastInBatch) {
update_sync_progress(progress, downloadable_bytes, transact); // Throws
}
if (run_in_write_tr) {
run_in_write_tr(transact);
}

version_type new_version = transact->commit_and_continue_as_read().version; // Throws

Expand Down
2 changes: 2 additions & 0 deletions src/realm/sync/noinst/client_history_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#ifndef REALM_NOINST_CLIENT_HISTORY_IMPL_HPP
#define REALM_NOINST_CLIENT_HISTORY_IMPL_HPP

#include "realm/util/functional.hpp"
#include <realm/util/optional.hpp>
#include <realm/sync/client_base.hpp>
#include <realm/sync/history.hpp>
Expand Down Expand Up @@ -249,6 +250,7 @@ class ClientHistory final : public _impl::History, public TransformHistory {
void integrate_server_changesets(const SyncProgress& progress, const std::uint_fast64_t* downloadable_bytes,
const RemoteChangeset* changesets, std::size_t num_changesets,
VersionInfo& new_version, DownloadBatchState download_type, util::Logger&,
util::UniqueFunction<void(const TransactionRef&)> run_in_write_tr,
SyncTransactReporter* transact_reporter = nullptr);

static void get_upload_download_bytes(DB*, std::uint_fast64_t&, std::uint_fast64_t&, std::uint_fast64_t&,
Expand Down
Loading

0 comments on commit 95c4fad

Please sign in to comment.