Skip to content

Commit

Permalink
Added output to ASSERTS and moved session history to unordered_set
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael Wilkerson-Barker committed Jun 12, 2023
1 parent aca0feb commit 011a17d
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 138 deletions.
134 changes: 72 additions & 62 deletions src/realm/sync/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,7 @@ void SessionImpl::force_close()
void SessionImpl::on_connection_state_changed(ConnectionState state,
const util::Optional<SessionErrorInfo>& error_info)
{
// Only used to report errors back to the SyncSession while the Session is active
if (m_state == SessionImpl::Active) {
m_wrapper.on_connection_state_changed(state, error_info); // Throws
}
Expand All @@ -761,42 +762,42 @@ void SessionImpl::on_connection_state_changed(ConnectionState state,
const std::string& SessionImpl::get_virt_path() const noexcept
{
// Can only be called if the session is active or being activated
REALM_ASSERT(m_state == State::Active || m_state == State::Unactivated);
REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
return m_wrapper.m_virt_path;
}

const std::string& SessionImpl::get_realm_path() const noexcept
{
// Can only be called if the session is active or being activated
REALM_ASSERT(m_state == State::Active || m_state == State::Unactivated);
REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
return m_wrapper.m_db->get_path();
}

DBRef SessionImpl::get_db() const noexcept
{
// Can only be called if the session is active or being activated
REALM_ASSERT(m_state == State::Active || m_state == State::Unactivated);
REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
return m_wrapper.m_db;
}

SyncTransactReporter* SessionImpl::get_transact_reporter() noexcept
{
// Can only be called if the session is active or being activated
REALM_ASSERT(m_state == State::Active || m_state == State::Unactivated);
REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
return &m_wrapper;
}

ClientReplication& SessionImpl::access_realm()
{
// Can only be called if the session is active or being activated
REALM_ASSERT(m_state == State::Active || m_state == State::Unactivated);
REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
return m_wrapper.get_replication();
}

util::Optional<ClientReset>& SessionImpl::get_client_reset_config() noexcept
{
// Can only be called if the session is active or being activated
REALM_ASSERT(m_state == State::Active || m_state == State::Unactivated);
REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
return m_wrapper.m_client_reset_config;
}

Expand Down Expand Up @@ -921,7 +922,7 @@ bool SessionImpl::process_flx_bootstrap_message(const SyncProgress& progress, Do
if (hook_action == SyncClientHookAction::EarlyReturn) {
return true;
}
REALM_ASSERT(hook_action == SyncClientHookAction::NoAction);
REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);

if (batch_state == DownloadBatchState::MoreToCome) {
return true;
Expand All @@ -944,7 +945,8 @@ void SessionImpl::process_pending_flx_bootstrap()
if (!m_is_flx_sync_session || m_state != State::Active) {
return;
}
REALM_ASSERT(m_state == SessionImpl::Active); // Should never be called if session is not active
// Should never be called if session is not active
REALM_ASSERT_EX(m_state == SessionImpl::Active, m_state);
auto bootstrap_store = m_wrapper.get_flx_pending_bootstrap_store();
if (!bootstrap_store->has_pending()) {
return;
Expand Down Expand Up @@ -999,7 +1001,7 @@ void SessionImpl::process_pending_flx_bootstrap()

auto action = call_debug_hook(SyncClientHookEvent::DownloadMessageIntegrated, progress, query_version,
batch_state, pending_batch.changesets.size());
REALM_ASSERT(action == SyncClientHookAction::NoAction);
REALM_ASSERT_EX(action == SyncClientHookAction::NoAction, action);

logger.info("Integrated %1 changesets from pending bootstrap for query version %2, producing client version "
"%3 in %4 ms. %5 changesets remaining in bootstrap",
Expand All @@ -1016,7 +1018,7 @@ void SessionImpl::process_pending_flx_bootstrap()
auto action = call_debug_hook(SyncClientHookEvent::BootstrapProcessed, progress, query_version,
DownloadBatchState::LastInBatch, changesets_processed);
// NoAction/EarlyReturn are both valid no-op actions to take here.
REALM_ASSERT(action == SyncClientHookAction::NoAction || action == SyncClientHookAction::EarlyReturn);
REALM_ASSERT_EX(action == SyncClientHookAction::NoAction || action == SyncClientHookAction::EarlyReturn, action);
}

void SessionImpl::on_new_flx_subscription_set(int64_t new_version)
Expand Down Expand Up @@ -1049,14 +1051,14 @@ void SessionImpl::on_flx_sync_progress(int64_t version, DownloadBatchState batch
SubscriptionStore* SessionImpl::get_flx_subscription_store()
{
// Should never be called if session is not active
REALM_ASSERT(m_state == State::Active);
REALM_ASSERT_EX(m_state == State::Active, m_state);
return m_wrapper.get_flx_subscription_store();
}

MigrationStore* SessionImpl::get_migration_store()
{
// Should never be called if session is not active
REALM_ASSERT(m_state == State::Active);
REALM_ASSERT_EX(m_state == State::Active, m_state);
return m_wrapper.get_migration_store();
}

Expand All @@ -1071,7 +1073,7 @@ void SessionImpl::on_flx_sync_version_complete(int64_t version)
SyncClientHookAction SessionImpl::call_debug_hook(const SyncClientHookData& data)
{
// Should never be called if session is not active
REALM_ASSERT(m_state == State::Active);
REALM_ASSERT_EX(m_state == State::Active, m_state);

// Make sure we don't call the debug hook recursively.
if (m_wrapper.m_in_debug_hook) {
Expand All @@ -1090,7 +1092,7 @@ SyncClientHookAction SessionImpl::call_debug_hook(const SyncClientHookData& data
err_info.server_requests_action = ProtocolErrorInfo::Action::Transient;

auto err_processing_err = receive_error_message(err_info);
REALM_ASSERT(!err_processing_err);
REALM_ASSERT_EX(!err_processing_err, err_processing_err.message());
return SyncClientHookAction::EarlyReturn;
}
case realm::SyncClientHookAction::TriggerReconnect: {
Expand Down Expand Up @@ -1146,7 +1148,7 @@ SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, con
bool SessionImpl::is_steady_state_download_message(DownloadBatchState batch_state, int64_t query_version)
{
// Should never be called if session is not active
REALM_ASSERT(m_state == State::Active);
REALM_ASSERT_EX(m_state == State::Active, m_state);
if (batch_state == DownloadBatchState::SteadyState) {
return true;
}
Expand Down Expand Up @@ -1200,6 +1202,9 @@ util::Future<std::string> SessionImpl::send_test_command(std::string body)

// ################ SessionWrapper ################

// The SessionWrapper class is held by a sync::Session (which is owned by the SyncSession instance) and
// provides a link to the ClientImpl::Session that creates and receives messages with the server with
// the ClientImpl::Connection that owns the ClientImpl::Session.
SessionWrapper::SessionWrapper(ClientImpl& client, DBRef db, std::shared_ptr<SubscriptionStore> flx_sub_store,
std::shared_ptr<MigrationStore> migration_store, Session::Config config)
: m_client{client}
Expand Down Expand Up @@ -1402,22 +1407,24 @@ void SessionWrapper::nonsync_transact_notify(version_type new_version)
// Thread safety required
REALM_ASSERT(m_initiated);

if (REALM_LIKELY(!m_finalized && !m_force_closed)) {
util::bind_ptr<SessionWrapper> self{this};
m_client.post([self = std::move(self), new_version](Status status) {
if (status == ErrorCodes::OperationAborted)
return;
else if (!status.is_ok())
throw Exception(status);

REALM_ASSERT(self->m_actualized);
if (REALM_UNLIKELY(!self->m_sess))
return; // Already finalized
SessionImpl& sess = *self->m_sess;
sess.recognize_sync_version(new_version); // Throws
self->report_progress(); // Throws
}); // Throws
if (REALM_UNLIKELY(m_finalized || m_force_closed)) {
return;
}

util::bind_ptr<SessionWrapper> self{this};
m_client.post([self = std::move(self), new_version](Status status) {
if (status == ErrorCodes::OperationAborted)
return;
else if (!status.is_ok())
throw Exception(status);

REALM_ASSERT(self->m_actualized);
if (REALM_UNLIKELY(!self->m_sess))
return; // Already finalized
SessionImpl& sess = *self->m_sess;
sess.recognize_sync_version(new_version); // Throws
self->report_progress(); // Throws
}); // Throws
}


Expand All @@ -1426,23 +1433,25 @@ void SessionWrapper::cancel_reconnect_delay()
// Thread safety required
REALM_ASSERT(m_initiated);

if (REALM_LIKELY(!m_finalized && !m_force_closed)) {
util::bind_ptr<SessionWrapper> self{this};
m_client.post([self = std::move(self)](Status status) {
if (status == ErrorCodes::OperationAborted)
return;
else if (!status.is_ok())
throw Exception(status);

REALM_ASSERT(self->m_actualized);
if (REALM_UNLIKELY(!self->m_sess))
return; // Already finalized
SessionImpl& sess = *self->m_sess;
sess.cancel_resumption_delay(); // Throws
ClientImpl::Connection& conn = sess.get_connection();
conn.cancel_reconnect_delay(); // Throws
}); // Throws
if (REALM_UNLIKELY(m_finalized || m_force_closed)) {
return;
}

util::bind_ptr<SessionWrapper> self{this};
m_client.post([self = std::move(self)](Status status) {
if (status == ErrorCodes::OperationAborted)
return;
else if (!status.is_ok())
throw Exception(status);

REALM_ASSERT(self->m_actualized);
if (REALM_UNLIKELY(!self->m_sess))
return; // Already finalized
SessionImpl& sess = *self->m_sess;
sess.cancel_resumption_delay(); // Throws
ClientImpl::Connection& conn = sess.get_connection();
conn.cancel_reconnect_delay(); // Throws
}); // Throws
}

void SessionWrapper::async_wait_for(bool upload_completion, bool download_completion,
Expand Down Expand Up @@ -1666,23 +1675,24 @@ void SessionWrapper::actualize(ServerEndpoint endpoint)

void SessionWrapper::force_close()
{
if (!m_force_closed && !m_finalized) {
REALM_ASSERT(m_actualized);
REALM_ASSERT(m_sess);
m_force_closed = true;

ClientImpl::Connection& conn = m_sess->get_connection();
conn.initiate_session_deactivation(m_sess); // Throws

// Delete the pending bootstrap store since it uses a reference to the logger in m_sess
m_flx_pending_bootstrap_store.reset();
// Clear the subscription and migration store refs since they are owned by SyncSession
m_flx_subscription_store.reset();
m_migration_store.reset();
m_sess = nullptr;
// Everything is being torn down, no need to report connection state anymore
m_connection_state_change_listener = {};
if (m_force_closed || m_finalized) {
return;
}
REALM_ASSERT(m_actualized);
REALM_ASSERT(m_sess);
m_force_closed = true;

ClientImpl::Connection& conn = m_sess->get_connection();
conn.initiate_session_deactivation(m_sess); // Throws

// Delete the pending bootstrap store since it uses a reference to the logger in m_sess
m_flx_pending_bootstrap_store.reset();
// Clear the subscription and migration store refs since they are owned by SyncSession
m_flx_subscription_store.reset();
m_migration_store.reset();
m_sess = nullptr;
// Everything is being torn down, no need to report connection state anymore
m_connection_state_change_listener = {};
}

// Must be called from event loop thread
Expand Down
13 changes: 13 additions & 0 deletions src/realm/sync/client_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,19 @@ struct SessionErrorInfo : public ProtocolErrorInfo {

enum class ConnectionState { disconnected, connecting, connected };

inline std::ostream& operator<<(std::ostream& os, ConnectionState state)
{
switch (state) {
case ConnectionState::disconnected:
return os << "Disconnected";
case ConnectionState::connecting:
return os << "Connecting";
case ConnectionState::connected:
return os << "Connected";
}
REALM_TERMINATE("Invalid ConnectionState value");
}

} // namespace realm::sync

#endif // REALM_SYNC_CLIENT_BASE_HPP
15 changes: 15 additions & 0 deletions src/realm/sync/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,21 @@ enum class SyncClientHookAction {
TriggerReconnect,
};

inline std::ostream& operator<<(std::ostream& os, SyncClientHookAction action)
{
switch (action) {
case SyncClientHookAction::NoAction:
return os << "NoAction";
case SyncClientHookAction::EarlyReturn:
return os << "EarlyReturn";
case SyncClientHookAction::SuspendWithRetryableError:
return os << "SuspendWithRetryableError";
case SyncClientHookAction::TriggerReconnect:
return os << "TriggerReconnect";
}
REALM_TERMINATE("Invalid SyncClientHookAction value");
}

struct SyncClientHookData {
SyncClientHookEvent event;
sync::SyncProgress progress;
Expand Down
Loading

0 comments on commit 011a17d

Please sign in to comment.