diff --git a/CHANGELOG.md b/CHANGELOG.md index eed1cca9850..5a11b5618bd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,7 +6,7 @@ ### Fixed * ([#????](https://github.com/realm/realm-core/issues/????), since v?.?.?) -* None. +* Fix SessionWrapper use-after-free crash when tearing down sessions when using session multiplexing ([#6656](https://github.com/realm/realm-core/issues/6656), since v13.9.3) ### Breaking changes * None. @@ -51,7 +51,6 @@ * Fix the query parser, it needs to copy a list of arguments and own the memory. This will prevent errors like getting a different result from a query, if the list is modified after its creation and before the execution of the query itself. In the worst case scenario, if the memory is freed before the query is executed, this could lead to crashes, especially for string and binary data types. ([#6674](https://github.com/realm/realm-core/pull/6674), since v12.5.0) * Fixed a potential crash when opening the realm after failing to download a fresh FLX realm during an automatic client reset ([#6494](https://github.com/realm/realm-core/issues/6494), since v12.3.0) - ### Breaking changes * None. diff --git a/src/realm/sync/client.cpp b/src/realm/sync/client.cpp index 55dd56ec337..049c01503d4 100644 --- a/src/realm/sync/client.cpp +++ b/src/realm/sync/client.cpp @@ -292,6 +292,9 @@ class SessionWrapper final : public util::AtomicRefCountBase, public SyncTransac bool m_suspended = false; + // Has the SessionWrapper been finalized? + bool m_finalized = false; + // Set to true when the first DOWNLOAD message is received to indicate that // the byte-level download progress parameters can be considered reasonable // reliable. Before that, a lot of time may have passed, so our record of @@ -740,49 +743,72 @@ void ClientImpl::remove_connection(ClientImpl::Connection& conn) noexcept void SessionImpl::force_close() { - m_wrapper.force_close(); + // Allow force_close() if session is active or hasn't been activated yet. + if (m_state == SessionImpl::Active || m_state == SessionImpl::Unactivated) { + m_wrapper.force_close(); + } } void SessionImpl::on_connection_state_changed(ConnectionState state, const util::Optional& error_info) { - m_wrapper.on_connection_state_changed(state, error_info); // Throws + // Only used to report errors back to the SyncSession while the Session is active + if (m_state == SessionImpl::Active) { + m_wrapper.on_connection_state_changed(state, error_info); // Throws + } } const std::string& SessionImpl::get_virt_path() const noexcept { + // Can only be called if the session is active or being activated + REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state); return m_wrapper.m_virt_path; } const std::string& SessionImpl::get_realm_path() const noexcept { + // Can only be called if the session is active or being activated + REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state); return m_wrapper.m_db->get_path(); } DBRef SessionImpl::get_db() const noexcept { + // Can only be called if the session is active or being activated + REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state); return m_wrapper.m_db; } SyncTransactReporter* SessionImpl::get_transact_reporter() noexcept { + // Can only be called if the session is active or being activated + REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state); return &m_wrapper; } ClientReplication& SessionImpl::access_realm() { + // Can only be called if the session is active or being activated + REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state); return m_wrapper.get_replication(); } util::Optional& SessionImpl::get_client_reset_config() noexcept { + // Can only be called if the session is active or being activated + REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state); return m_wrapper.m_client_reset_config; } void SessionImpl::initiate_integrate_changesets(std::uint_fast64_t downloadable_bytes, DownloadBatchState batch_state, const SyncProgress& progress, const ReceivedChangesets& changesets) { + // Ignore the call if the session is not active + if (m_state != State::Active) { + return; + } + try { bool simulate_integration_error = (m_wrapper.m_simulate_integration_error && !changesets.empty()); if (simulate_integration_error) { @@ -811,36 +837,56 @@ void SessionImpl::initiate_integrate_changesets(std::uint_fast64_t downloadable_ void SessionImpl::on_upload_completion() { - m_wrapper.on_upload_completion(); // Throws + // Ignore the call if the session is not active + if (m_state == State::Active) { + m_wrapper.on_upload_completion(); // Throws + } } void SessionImpl::on_download_completion() { - m_wrapper.on_download_completion(); // Throws + // Ignore the call if the session is not active + if (m_state == State::Active) { + m_wrapper.on_download_completion(); // Throws + } } void SessionImpl::on_suspended(const SessionErrorInfo& error_info) { - m_wrapper.on_suspended(error_info); // Throws + // Ignore the call if the session is not active + if (m_state == State::Active) { + m_wrapper.on_suspended(error_info); // Throws + } } void SessionImpl::on_resumed() { - m_wrapper.on_resumed(); // Throws + // Ignore the call if the session is not active + if (m_state == State::Active) { + m_wrapper.on_resumed(); // Throws + } } void SessionImpl::handle_pending_client_reset_acknowledgement() { - m_wrapper.handle_pending_client_reset_acknowledgement(); + // Ignore the call if the session is not active + if (m_state == State::Active) { + m_wrapper.handle_pending_client_reset_acknowledgement(); + } } bool SessionImpl::process_flx_bootstrap_message(const SyncProgress& progress, DownloadBatchState batch_state, int64_t query_version, const ReceivedChangesets& received_changesets) { + // Ignore the call if the session is not active + if (m_state != State::Active) { + return false; + } + if (is_steady_state_download_message(batch_state, query_version)) { return false; } @@ -876,7 +922,7 @@ bool SessionImpl::process_flx_bootstrap_message(const SyncProgress& progress, Do if (hook_action == SyncClientHookAction::EarlyReturn) { return true; } - REALM_ASSERT(hook_action == SyncClientHookAction::NoAction); + REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action); if (batch_state == DownloadBatchState::MoreToCome) { return true; @@ -895,9 +941,12 @@ bool SessionImpl::process_flx_bootstrap_message(const SyncProgress& progress, Do void SessionImpl::process_pending_flx_bootstrap() { - if (!m_is_flx_sync_session) { + // Ignore the call if not a flx session or session is not active + if (!m_is_flx_sync_session || m_state != State::Active) { return; } + // Should never be called if session is not active + REALM_ASSERT_EX(m_state == SessionImpl::Active, m_state); auto bootstrap_store = m_wrapper.get_flx_pending_bootstrap_store(); if (!bootstrap_store->has_pending()) { return; @@ -952,7 +1001,7 @@ void SessionImpl::process_pending_flx_bootstrap() auto action = call_debug_hook(SyncClientHookEvent::DownloadMessageIntegrated, progress, query_version, batch_state, pending_batch.changesets.size()); - REALM_ASSERT(action == SyncClientHookAction::NoAction); + REALM_ASSERT_EX(action == SyncClientHookAction::NoAction, action); logger.info("Integrated %1 changesets from pending bootstrap for query version %2, producing client version " "%3 in %4 ms. %5 changesets remaining in bootstrap", @@ -969,7 +1018,7 @@ void SessionImpl::process_pending_flx_bootstrap() auto action = call_debug_hook(SyncClientHookEvent::BootstrapProcessed, progress, query_version, DownloadBatchState::LastInBatch, changesets_processed); // NoAction/EarlyReturn are both valid no-op actions to take here. - REALM_ASSERT(action == SyncClientHookAction::NoAction || action == SyncClientHookAction::EarlyReturn); + REALM_ASSERT_EX(action == SyncClientHookAction::NoAction || action == SyncClientHookAction::EarlyReturn, action); } void SessionImpl::on_new_flx_subscription_set(int64_t new_version) @@ -985,31 +1034,47 @@ void SessionImpl::on_new_flx_subscription_set(int64_t new_version) void SessionImpl::on_flx_sync_error(int64_t version, std::string_view err_msg) { - m_wrapper.on_flx_sync_error(version, err_msg); + // Ignore the call if the session is not active + if (m_state == State::Active) { + m_wrapper.on_flx_sync_error(version, err_msg); + } } void SessionImpl::on_flx_sync_progress(int64_t version, DownloadBatchState batch_state) { - m_wrapper.on_flx_sync_progress(version, batch_state); + // Ignore the call if the session is not active + if (m_state == State::Active) { + m_wrapper.on_flx_sync_progress(version, batch_state); + } } SubscriptionStore* SessionImpl::get_flx_subscription_store() { + // Should never be called if session is not active + REALM_ASSERT_EX(m_state == State::Active, m_state); return m_wrapper.get_flx_subscription_store(); } MigrationStore* SessionImpl::get_migration_store() { + // Should never be called if session is not active + REALM_ASSERT_EX(m_state == State::Active, m_state); return m_wrapper.get_migration_store(); } void SessionImpl::on_flx_sync_version_complete(int64_t version) { - m_wrapper.on_flx_sync_version_complete(version); + // Ignore the call if the session is not active + if (m_state == State::Active) { + m_wrapper.on_flx_sync_version_complete(version); + } } SyncClientHookAction SessionImpl::call_debug_hook(const SyncClientHookData& data) { + // Should never be called if session is not active + REALM_ASSERT_EX(m_state == State::Active, m_state); + // Make sure we don't call the debug hook recursively. if (m_wrapper.m_in_debug_hook) { return SyncClientHookAction::NoAction; @@ -1027,7 +1092,7 @@ SyncClientHookAction SessionImpl::call_debug_hook(const SyncClientHookData& data err_info.server_requests_action = ProtocolErrorInfo::Action::Transient; auto err_processing_err = receive_error_message(err_info); - REALM_ASSERT(!err_processing_err); + REALM_ASSERT_EX(!err_processing_err, err_processing_err.message()); return SyncClientHookAction::EarlyReturn; } case realm::SyncClientHookAction::TriggerReconnect: { @@ -1046,6 +1111,9 @@ SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, con if (REALM_LIKELY(!m_wrapper.m_debug_hook)) { return SyncClientHookAction::NoAction; } + if (REALM_UNLIKELY(m_state != State::Active)) { + return SyncClientHookAction::NoAction; + } SyncClientHookData data; data.event = event; @@ -1062,6 +1130,9 @@ SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, con if (REALM_LIKELY(!m_wrapper.m_debug_hook)) { return SyncClientHookAction::NoAction; } + if (REALM_UNLIKELY(m_state != State::Active)) { + return SyncClientHookAction::NoAction; + } SyncClientHookData data; data.event = event; @@ -1076,6 +1147,8 @@ SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, con bool SessionImpl::is_steady_state_download_message(DownloadBatchState batch_state, int64_t query_version) { + // Should never be called if session is not active + REALM_ASSERT_EX(m_state == State::Active, m_state); if (batch_state == DownloadBatchState::SteadyState) { return true; } @@ -1094,6 +1167,10 @@ bool SessionImpl::is_steady_state_download_message(DownloadBatchState batch_stat util::Future SessionImpl::send_test_command(std::string body) { + if (m_state != State::Active) { + return Status{ErrorCodes::RuntimeError, "Cannot send a test command for a session that is not active"}; + } + try { auto json_body = nlohmann::json::parse(body.begin(), body.end()); if (auto it = json_body.find("command"); it == json_body.end() || !it->is_string()) { @@ -1111,10 +1188,9 @@ util::Future SessionImpl::send_test_command(std::string body) auto pf = util::make_promise_future(); get_client().post([this, promise = std::move(pf.promise), body = std::move(body)](Status status) mutable { - if (status == ErrorCodes::OperationAborted) - return; - else if (!status.is_ok()) - throw Exception(status); + // Includes operation_aborted + if (!status.is_ok()) + promise.set_error(status); auto id = ++m_last_pending_test_command_ident; m_pending_test_commands.push_back(PendingTestCommand{id, std::move(body), std::move(promise)}); @@ -1126,6 +1202,9 @@ util::Future SessionImpl::send_test_command(std::string body) // ################ SessionWrapper ################ +// The SessionWrapper class is held by a sync::Session (which is owned by the SyncSession instance) and +// provides a link to the ClientImpl::Session that creates and receives messages with the server with +// the ClientImpl::Connection that owns the ClientImpl::Session. SessionWrapper::SessionWrapper(ClientImpl& client, DBRef db, std::shared_ptr flx_sub_store, std::shared_ptr migration_store, Session::Config config) : m_client{client} @@ -1173,6 +1252,7 @@ SessionWrapper::~SessionWrapper() noexcept inline ClientReplication& SessionWrapper::get_replication() noexcept { + REALM_ASSERT(m_db); return static_cast(*m_replication); } @@ -1192,6 +1272,7 @@ void SessionWrapper::on_new_flx_subscription_set(int64_t new_version) if (!m_initiated) { return; } + REALM_ASSERT(!m_finalized); auto self = util::bind_ptr(this); m_client.post([new_version, self = std::move(self)](Status status) { @@ -1212,6 +1293,7 @@ void SessionWrapper::on_new_flx_subscription_set(int64_t new_version) void SessionWrapper::on_flx_sync_error(int64_t version, std::string_view err_msg) { + REALM_ASSERT(!m_finalized); REALM_ASSERT(m_flx_latest_version != 0); REALM_ASSERT(m_flx_latest_version >= version); @@ -1222,6 +1304,7 @@ void SessionWrapper::on_flx_sync_error(int64_t version, std::string_view err_msg void SessionWrapper::on_flx_sync_version_complete(int64_t version) { + REALM_ASSERT(!m_finalized); m_flx_last_seen_version = version; m_flx_active_version = version; } @@ -1231,6 +1314,7 @@ void SessionWrapper::on_flx_sync_progress(int64_t new_version, DownloadBatchStat if (!has_flx_subscription_store()) { return; } + REALM_ASSERT(!m_finalized); REALM_ASSERT(new_version >= m_flx_last_seen_version); REALM_ASSERT(new_version >= m_flx_active_version); REALM_ASSERT(batch_state != DownloadBatchState::SteadyState); @@ -1271,16 +1355,19 @@ void SessionWrapper::on_flx_sync_progress(int64_t new_version, DownloadBatchStat SubscriptionStore* SessionWrapper::get_flx_subscription_store() { + REALM_ASSERT(!m_finalized); return m_flx_subscription_store.get(); } PendingBootstrapStore* SessionWrapper::get_flx_pending_bootstrap_store() { + REALM_ASSERT(!m_finalized); return m_flx_pending_bootstrap_store.get(); } MigrationStore* SessionWrapper::get_migration_store() { + REALM_ASSERT(!m_finalized); return m_migration_store.get(); } @@ -1320,6 +1407,10 @@ void SessionWrapper::nonsync_transact_notify(version_type new_version) // Thread safety required REALM_ASSERT(m_initiated); + if (REALM_UNLIKELY(m_finalized || m_force_closed)) { + return; + } + util::bind_ptr self{this}; m_client.post([self = std::move(self), new_version](Status status) { if (status == ErrorCodes::OperationAborted) @@ -1342,6 +1433,10 @@ void SessionWrapper::cancel_reconnect_delay() // Thread safety required REALM_ASSERT(m_initiated); + if (REALM_UNLIKELY(m_finalized || m_force_closed)) { + return; + } + util::bind_ptr self{this}; m_client.post([self = std::move(self)](Status status) { if (status == ErrorCodes::OperationAborted) @@ -1364,6 +1459,7 @@ void SessionWrapper::async_wait_for(bool upload_completion, bool download_comple { REALM_ASSERT(upload_completion || download_completion); REALM_ASSERT(m_initiated); + REALM_ASSERT(!m_finalized); util::bind_ptr self{this}; m_client.post([self = std::move(self), handler = std::move(handler), upload_completion, @@ -1406,6 +1502,7 @@ bool SessionWrapper::wait_for_upload_complete_or_client_stopped() { // Thread safety required REALM_ASSERT(m_initiated); + REALM_ASSERT(!m_finalized); std::int_fast64_t target_mark; { @@ -1449,6 +1546,7 @@ bool SessionWrapper::wait_for_download_complete_or_client_stopped() { // Thread safety required REALM_ASSERT(m_initiated); + REALM_ASSERT(!m_finalized); std::int_fast64_t target_mark; { @@ -1492,6 +1590,7 @@ void SessionWrapper::refresh(std::string signed_access_token) { // Thread safety required REALM_ASSERT(m_initiated); + REALM_ASSERT(!m_finalized); m_client.post([self = util::bind_ptr(this), token = std::move(signed_access_token)](Status status) { if (status == ErrorCodes::OperationAborted) @@ -1527,6 +1626,9 @@ void SessionWrapper::actualize(ServerEndpoint endpoint) { REALM_ASSERT(!m_actualized); REALM_ASSERT(!m_sess); + // Cannot be actualized if it's already been finalized or force closed + REALM_ASSERT(!m_finalized); + REALM_ASSERT(!m_force_closed); m_db->claim_sync_agent(); auto sync_mode = endpoint.server_mode; @@ -1573,6 +1675,9 @@ void SessionWrapper::actualize(ServerEndpoint endpoint) void SessionWrapper::force_close() { + if (m_force_closed || m_finalized) { + return; + } REALM_ASSERT(m_actualized); REALM_ASSERT(m_sess); m_force_closed = true; @@ -1582,7 +1687,11 @@ void SessionWrapper::force_close() // Delete the pending bootstrap store since it uses a reference to the logger in m_sess m_flx_pending_bootstrap_store.reset(); + // Clear the subscription and migration store refs since they are owned by SyncSession + m_flx_subscription_store.reset(); + m_migration_store.reset(); m_sess = nullptr; + // Everything is being torn down, no need to report connection state anymore m_connection_state_change_listener = {}; } @@ -1591,6 +1700,13 @@ void SessionWrapper::finalize() { REALM_ASSERT(m_actualized); + // Already finalized? + if (m_finalized) { + return; + } + + m_finalized = true; + if (!m_force_closed) { REALM_ASSERT(m_sess); ClientImpl::Connection& conn = m_sess->get_connection(); @@ -1598,6 +1714,9 @@ void SessionWrapper::finalize() // Delete the pending bootstrap store since it uses a reference to the logger in m_sess m_flx_pending_bootstrap_store.reset(); + // Clear the subscription and migration store refs since they are owned by SyncSession + m_flx_subscription_store.reset(); + m_migration_store.reset(); m_sess = nullptr; } @@ -1634,6 +1753,7 @@ void SessionWrapper::finalize() // Called with a lock on `m_client.m_mutex`. inline void SessionWrapper::finalize_before_actualization() noexcept { + REALM_ASSERT(!m_sess); m_actualized = true; m_force_closed = true; } @@ -1641,6 +1761,7 @@ inline void SessionWrapper::finalize_before_actualization() noexcept inline void SessionWrapper::report_sync_transact(VersionID old_version, VersionID new_version) { + REALM_ASSERT(!m_finalized); if (m_sync_transact_handler) m_sync_transact_handler(old_version, new_version); // Throws } @@ -1648,6 +1769,7 @@ inline void SessionWrapper::report_sync_transact(VersionID old_version, VersionI inline void SessionWrapper::on_sync_progress() { + REALM_ASSERT(!m_finalized); m_reliable_download_progress = true; report_progress(); // Throws } @@ -1655,6 +1777,7 @@ inline void SessionWrapper::on_sync_progress() void SessionWrapper::on_upload_completion() { + REALM_ASSERT(!m_finalized); while (!m_upload_completion_handlers.empty()) { auto handler = std::move(m_upload_completion_handlers.back()); m_upload_completion_handlers.pop_back(); @@ -1707,6 +1830,7 @@ void SessionWrapper::on_download_completion() void SessionWrapper::on_suspended(const SessionErrorInfo& error_info) { + REALM_ASSERT(!m_finalized); m_suspended = true; if (m_connection_state_change_listener) { m_connection_state_change_listener(ConnectionState::disconnected, error_info); // Throws @@ -1716,6 +1840,7 @@ void SessionWrapper::on_suspended(const SessionErrorInfo& error_info) void SessionWrapper::on_resumed() { + REALM_ASSERT(!m_finalized); m_suspended = false; if (m_connection_state_change_listener) { ClientImpl::Connection& conn = m_sess->get_connection(); @@ -1740,6 +1865,7 @@ void SessionWrapper::on_connection_state_changed(ConnectionState state, void SessionWrapper::report_progress() { + REALM_ASSERT(!m_finalized); REALM_ASSERT(m_sess); if (!m_progress_handler) @@ -1775,8 +1901,7 @@ void SessionWrapper::report_progress() util::Future SessionWrapper::send_test_command(std::string body) { if (!m_sess) { - return util::Future::make_ready( - Status{ErrorCodes::RuntimeError, "session must be activated to send a test command"}); + return Status{ErrorCodes::RuntimeError, "session must be activated to send a test command"}; } return m_sess->send_test_command(std::move(body)); @@ -1784,6 +1909,8 @@ util::Future SessionWrapper::send_test_command(std::string body) void SessionWrapper::handle_pending_client_reset_acknowledgement() { + REALM_ASSERT(!m_finalized); + auto pending_reset = [&] { auto ft = m_db->start_frozen(); return _impl::client_reset::has_pending_reset(ft); @@ -1865,6 +1992,7 @@ ClientImpl::Connection::Connection(ClientImpl& client, connection_ident_type ide , m_ssl_verify_callback{std::move(ssl_verify_callback)} // DEPRECATED , m_proxy_config{std::move(proxy_config)} // DEPRECATED , m_reconnect_info{reconnect_info} + , m_session_history{} , m_ident{ident} , m_server_endpoint{std::move(endpoint)} , m_authorization_header_name{authorization_header_name} // DEPRECATED diff --git a/src/realm/sync/client_base.hpp b/src/realm/sync/client_base.hpp index 0812483f4f3..939fc88f0a8 100644 --- a/src/realm/sync/client_base.hpp +++ b/src/realm/sync/client_base.hpp @@ -320,6 +320,19 @@ struct SessionErrorInfo : public ProtocolErrorInfo { enum class ConnectionState { disconnected, connecting, connected }; +inline std::ostream& operator<<(std::ostream& os, ConnectionState state) +{ + switch (state) { + case ConnectionState::disconnected: + return os << "Disconnected"; + case ConnectionState::connecting: + return os << "Connecting"; + case ConnectionState::connected: + return os << "Connected"; + } + REALM_TERMINATE("Invalid ConnectionState value"); +} + } // namespace realm::sync #endif // REALM_SYNC_CLIENT_BASE_HPP diff --git a/src/realm/sync/config.hpp b/src/realm/sync/config.hpp index f16b81c7e0d..9f557a2d9e8 100644 --- a/src/realm/sync/config.hpp +++ b/src/realm/sync/config.hpp @@ -142,6 +142,21 @@ enum class SyncClientHookAction { TriggerReconnect, }; +inline std::ostream& operator<<(std::ostream& os, SyncClientHookAction action) +{ + switch (action) { + case SyncClientHookAction::NoAction: + return os << "NoAction"; + case SyncClientHookAction::EarlyReturn: + return os << "EarlyReturn"; + case SyncClientHookAction::SuspendWithRetryableError: + return os << "SuspendWithRetryableError"; + case SyncClientHookAction::TriggerReconnect: + return os << "TriggerReconnect"; + } + REALM_TERMINATE("Invalid SyncClientHookAction value"); +} + struct SyncClientHookData { SyncClientHookEvent event; sync::SyncProgress progress; diff --git a/src/realm/sync/noinst/client_impl_base.cpp b/src/realm/sync/noinst/client_impl_base.cpp index 771bed2cbea..9bf9f38662e 100644 --- a/src/realm/sync/noinst/client_impl_base.cpp +++ b/src/realm/sync/noinst/client_impl_base.cpp @@ -303,6 +303,7 @@ void Connection::activate() void Connection::activate_session(std::unique_ptr sess) { + REALM_ASSERT(sess); REALM_ASSERT(&sess->m_conn == this); REALM_ASSERT(!m_force_closed); Session& sess_2 = *sess; @@ -310,6 +311,8 @@ void Connection::activate_session(std::unique_ptr sess) auto p = m_sessions.emplace(ident, std::move(sess)); // Throws bool was_inserted = p.second; REALM_ASSERT(was_inserted); + // Save the session ident to the historical list of session idents + m_session_history.insert(ident); sess_2.activate(); // Throws if (m_state == ConnectionState::connected) { bool fast_reconnect = false; @@ -321,16 +324,20 @@ void Connection::activate_session(std::unique_ptr sess) void Connection::initiate_session_deactivation(Session* sess) { + REALM_ASSERT(sess); REALM_ASSERT(&sess->m_conn == this); REALM_ASSERT(m_num_active_sessions); - if (REALM_UNLIKELY(--m_num_active_sessions == 0)) { - if (m_activated && m_state == ConnectionState::disconnected) - m_on_idle->trigger(); - } + // Since the client may be waiting for m_num_active_sessions to reach 0 + // in stop_and_wait() (on a separate thread), deactivate Session before + // decrementing the num active sessions value. sess->initiate_deactivation(); // Throws if (sess->m_state == Session::Deactivated) { finish_session_deactivation(sess); } + if (REALM_UNLIKELY(--m_num_active_sessions == 0)) { + if (m_activated && m_state == ConnectionState::disconnected) + m_on_idle->trigger(); + } } @@ -384,6 +391,7 @@ void ClientImpl::Connection::finish_session_deactivation(Session* sess) REALM_ASSERT(sess->m_state == Session::Deactivated); auto ident = sess->m_ident; m_sessions.erase(ident); + m_session_history.erase(ident); } void Connection::force_close() @@ -398,7 +406,7 @@ void Connection::force_close() voluntary_disconnect(); } - REALM_ASSERT(m_state == ConnectionState::disconnected); + REALM_ASSERT_EX(m_state == ConnectionState::disconnected, m_state); if (m_reconnect_delay_in_progress || m_disconnect_delay_in_progress) { m_reconnect_disconnect_timer.reset(); m_reconnect_delay_in_progress = false; @@ -710,7 +718,7 @@ void Connection::initiate_reconnect_wait() // Finally, deduct the time that has already passed since the last // connection attempt. milliseconds_type now = monotonic_clock_now(); - REALM_ASSERT(now >= m_reconnect_info.m_time_point); + REALM_ASSERT_3(now, >=, m_reconnect_info.m_time_point); milliseconds_type time_since_delay_start = now - m_reconnect_info.m_time_point; if (time_since_delay_start < delay) remaining_delay = delay - time_since_delay_start; @@ -857,7 +865,7 @@ void Connection::initiate_reconnect() is_flx_sync_connection() ? get_flx_websocket_protocol_prefix() : get_pbs_websocket_protocol_prefix(); int min = get_oldest_supported_protocol_version(); int max = get_current_protocol_version(); - REALM_ASSERT(min <= max); + REALM_ASSERT_3(min, <=, max); // List protocol version in descending order to ensure that the server // selects the highest possible version. for (int version = max; version >= min; --version) { @@ -911,7 +919,7 @@ void Connection::handle_connect_wait(Status status) throw Exception(status); } - REALM_ASSERT(m_state == ConnectionState::connecting); + REALM_ASSERT_EX(m_state == ConnectionState::connecting, m_state); m_reconnect_info.m_reason = ConnectionTerminationReason::sync_connect_timeout; logger.info("Connect timeout"); // Throws constexpr bool try_again = true; @@ -953,7 +961,7 @@ void Connection::handle_connection_established() void Connection::schedule_urgent_ping() { - REALM_ASSERT(m_state != ConnectionState::disconnected); + REALM_ASSERT_EX(m_state != ConnectionState::disconnected, m_state); if (m_ping_delay_in_progress) { m_heartbeat_timer.reset(); m_ping_delay_in_progress = false; @@ -962,7 +970,7 @@ void Connection::schedule_urgent_ping() initiate_ping_delay(now); // Throws return; } - REALM_ASSERT(m_state == ConnectionState::connecting || m_waiting_for_pong); + REALM_ASSERT_EX(m_state == ConnectionState::connecting || m_waiting_for_pong, m_state); if (!m_send_ping) m_minimize_next_ping_delay = true; } @@ -987,7 +995,7 @@ void Connection::initiate_ping_delay(milliseconds_type now) milliseconds_type randomized_deduction = distr(m_client.get_random()); delay -= randomized_deduction; // Deduct the time spent waiting for PONG - REALM_ASSERT(now >= m_pong_wait_started_at); + REALM_ASSERT_3(now, >=, m_pong_wait_started_at); milliseconds_type spent_time = now - m_pong_wait_started_at; if (spent_time < delay) { delay -= spent_time; @@ -1094,7 +1102,7 @@ void Connection::handle_write_message() void Connection::send_next_message() { - REALM_ASSERT(m_state == ConnectionState::connected); + REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state); REALM_ASSERT(!m_sending_session); REALM_ASSERT(!m_sending); if (m_send_ping) { @@ -1105,7 +1113,7 @@ void Connection::send_next_message() // The state of being connected is not supposed to be able to change // across this loop thanks to the "no callback reentrance" guarantee // provided by Websocket::async_write_text(), and friends. - REALM_ASSERT(m_state == ConnectionState::connected); + REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state); Session& sess = *m_sessions_enlisted_to_send.front(); m_sessions_enlisted_to_send.pop_front(); @@ -1209,7 +1217,7 @@ void Connection::handle_disconnect_wait(Status status) m_disconnect_delay_in_progress = false; - REALM_ASSERT(m_state != ConnectionState::disconnected); + REALM_ASSERT_EX(m_state != ConnectionState::disconnected, m_state); if (m_num_active_unsuspended_sessions == 0) { if (m_client.m_connection_linger_time > 0) logger.detail("Linger time expired"); // Throws @@ -1375,16 +1383,34 @@ void Connection::receive_pong(milliseconds_type timestamp) m_client.m_roundtrip_time_handler(m_previous_ping_rtt); // Throws } +Session* Connection::find_and_validate_session(session_ident_type session_ident, std::string_view message) noexcept +{ + if (session_ident == 0) { + return nullptr; + } + + auto* sess = get_session(session_ident); + if (REALM_LIKELY(sess)) { + return sess; + } + // Check the history to see if the message received was for a previous session + if (auto it = m_session_history.find(session_ident); it == m_session_history.end()) { + logger.error("Bad session identifier in %1 message, session_ident = %2", message, session_ident); + close_due_to_protocol_error(ClientError::bad_session_ident); // Throws + } + else { + logger.error("Received %1 message for closed session, session_ident = %2", message, + session_ident); // Throws + } + return nullptr; +} void Connection::receive_error_message(const ProtocolErrorInfo& info, session_ident_type session_ident) { Session* sess = nullptr; if (session_ident != 0) { - sess = get_session(session_ident); + sess = find_and_validate_session(session_ident, "ERROR"); if (REALM_UNLIKELY(!sess)) { - logger.error("Bad session identifier in ERROR message, session_ident = %1", - session_ident); // Throws - close_due_to_protocol_error(ClientError::bad_session_ident); // Throws return; } std::error_code ec = sess->receive_error_message(info); // Throws @@ -1432,13 +1458,12 @@ void Connection::receive_query_error_message(int raw_error_code, std::string_vie return close_due_to_protocol_error(ClientError::bad_protocol_from_server); } - auto session = get_session(session_ident); - if (!session) { - logger.error("Bad session identifier in QUERY_ERROR mesage, session_ident = %1", session_ident); // throws - return close_due_to_protocol_error(ClientError::bad_session_ident); // throws + Session* sess = find_and_validate_session(session_ident, "QUERY_ERROR"); + if (REALM_UNLIKELY(!sess)) { + return; } - if (auto ec = session->receive_query_error_message(raw_error_code, message, query_version)) { + if (auto ec = sess->receive_query_error_message(raw_error_code, message, query_version)) { close_due_to_protocol_error(ec); } } @@ -1446,11 +1471,8 @@ void Connection::receive_query_error_message(int raw_error_code, std::string_vie void Connection::receive_ident_message(session_ident_type session_ident, SaltedFileIdent client_file_ident) { - Session* sess = get_session(session_ident); + Session* sess = find_and_validate_session(session_ident, "IDENT"); if (REALM_UNLIKELY(!sess)) { - logger.error("Bad session identifier in IDENT message, session_ident = %1", - session_ident); // Throws - close_due_to_protocol_error(ClientError::bad_session_ident); // Throws return; } @@ -1464,11 +1486,8 @@ void Connection::receive_download_message(session_ident_type session_ident, cons DownloadBatchState batch_state, const ReceivedChangesets& received_changesets) { - Session* sess = get_session(session_ident); + Session* sess = find_and_validate_session(session_ident, "DOWNLOAD"); if (REALM_UNLIKELY(!sess)) { - logger.error("Bad session identifier in DOWNLOAD message, session_ident = %1", - session_ident); // Throws - close_due_to_protocol_error(ClientError::bad_session_ident); // Throws return; } @@ -1478,10 +1497,8 @@ void Connection::receive_download_message(session_ident_type session_ident, cons void Connection::receive_mark_message(session_ident_type session_ident, request_ident_type request_ident) { - Session* sess = get_session(session_ident); + Session* sess = find_and_validate_session(session_ident, "MARK"); if (REALM_UNLIKELY(!sess)) { - logger.error("Bad session identifier (%1) in MARK message", session_ident); // Throws - close_due_to_protocol_error(ClientError::bad_session_ident); // Throws return; } @@ -1493,11 +1510,8 @@ void Connection::receive_mark_message(session_ident_type session_ident, request_ void Connection::receive_unbound_message(session_ident_type session_ident) { - Session* sess = get_session(session_ident); + Session* sess = find_and_validate_session(session_ident, "UNBOUND"); if (REALM_UNLIKELY(!sess)) { - logger.error("Bad session identifier in UNBOUND message, session_ident = %1", - session_ident); // Throws - close_due_to_protocol_error(ClientError::bad_session_ident); // Throws return; } @@ -1516,11 +1530,8 @@ void Connection::receive_unbound_message(session_ident_type session_ident) void Connection::receive_test_command_response(session_ident_type session_ident, request_ident_type request_ident, std::string_view body) { - Session* sess = get_session(session_ident); + Session* sess = find_and_validate_session(session_ident, "TEST_COMMAND"); if (REALM_UNLIKELY(!sess)) { - logger.error("Bad session identifier in TEST_COMMAND response message, session_ident = %1", - session_ident); // Throws - close_due_to_protocol_error(ClientError::bad_session_ident); // Throws return; } @@ -1571,7 +1582,7 @@ void Connection::handle_protocol_error(ClientProtocol::Error error) // state. void Connection::enlist_to_send(Session* sess) { - REALM_ASSERT(m_state == ConnectionState::connected); + REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state); m_sessions_enlisted_to_send.push_back(sess); // Throws if (!m_sending) send_next_message(); // Throws @@ -1589,7 +1600,7 @@ std::string Connection::get_active_appservices_connection_id() void Session::cancel_resumption_delay() { - REALM_ASSERT(m_state == Active); + REALM_ASSERT_EX(m_state == Active, m_state); if (!m_suspended) return; @@ -1626,7 +1637,7 @@ void Session::gather_pending_compensating_writes(util::Span changeset m_pending_compensating_write_errors.front().compensating_write_server_version <= changesets.back().version) { auto& cur_error = m_pending_compensating_write_errors.front(); - REALM_ASSERT(cur_error.compensating_write_server_version >= changesets.front().version); + REALM_ASSERT_3(cur_error.compensating_write_server_version, >=, changesets.front().version); out->push_back(std::move(cur_error)); m_pending_compensating_write_errors.pop_front(); } @@ -1683,7 +1694,7 @@ void Session::integrate_changesets(ClientReplication& repl, const SyncProgress& void Session::on_integration_failure(const IntegrationException& error) { - REALM_ASSERT(m_state == Active); + REALM_ASSERT_EX(m_state == Active, m_state); REALM_ASSERT(!m_client_error && !m_error_to_send); logger.error("Failed to integrate downloaded changesets: %1", error.what()); @@ -1706,8 +1717,8 @@ void Session::on_integration_failure(const IntegrationException& error) void Session::on_changesets_integrated(version_type client_version, const SyncProgress& progress) { - REALM_ASSERT(m_state == Active); - REALM_ASSERT(progress.download.server_version >= m_download_progress.server_version); + REALM_ASSERT_EX(m_state == Active, m_state); + REALM_ASSERT_3(progress.download.server_version, >=, m_download_progress.server_version); m_download_progress = progress.download; bool upload_progressed = (progress.upload.client_version > m_progress.upload.client_version); m_progress = progress; @@ -1741,7 +1752,7 @@ void Session::on_changesets_integrated(version_type client_version, const SyncPr Session::~Session() { - // REALM_ASSERT(m_state == Unactivated || m_state == Deactivated); + // REALM_ASSERT_EX(m_state == Unactivated || m_state == Deactivated, m_state); } @@ -1756,7 +1767,7 @@ std::string Session::make_logger_prefix(session_ident_type ident) void Session::activate() { - REALM_ASSERT(m_state == Unactivated); + REALM_ASSERT_EX(m_state == Unactivated, m_state); logger.debug("Activating"); // Throws @@ -1798,7 +1809,7 @@ void Session::activate() m_upload_progress = m_progress.upload; m_last_version_selected_for_upload = m_upload_progress.client_version; m_download_progress = m_progress.download; - REALM_ASSERT(m_last_version_available >= m_progress.upload.client_version); + REALM_ASSERT_3(m_last_version_available, >=, m_progress.upload.client_version); logger.debug("last_version_available = %1", m_last_version_available); // Throws logger.debug("progress_server_version = %1", m_progress.download.server_version); // Throws @@ -1831,7 +1842,7 @@ void Session::activate() // deactivated upon return. void Session::initiate_deactivation() { - REALM_ASSERT(m_state == Active); + REALM_ASSERT_EX(m_state == Active, m_state); logger.debug("Initiating deactivation"); // Throws @@ -1864,7 +1875,7 @@ void Session::initiate_deactivation() void Session::complete_deactivation() { - REALM_ASSERT(m_state == Deactivating); + REALM_ASSERT_EX(m_state == Deactivating, m_state); m_state = Deactivated; logger.debug("Deactivation completed"); // Throws @@ -1878,7 +1889,7 @@ void Session::complete_deactivation() // deactivated upon return. void Session::send_message() { - REALM_ASSERT(m_state == Active || m_state == Deactivating); + REALM_ASSERT_EX(m_state == Active || m_state == Deactivating, m_state); REALM_ASSERT(m_enlisted_to_send); m_enlisted_to_send = false; if (m_state == Deactivating || m_error_message_received || m_suspended) { @@ -1975,8 +1986,8 @@ void Session::send_message() return send_query_change_message(); // throws } - REALM_ASSERT(m_upload_progress.client_version <= m_upload_target_version); - REALM_ASSERT(m_upload_target_version <= m_last_version_available); + REALM_ASSERT_3(m_upload_progress.client_version, <=, m_upload_target_version); + REALM_ASSERT_3(m_upload_target_version, <=, m_last_version_available); if (m_allow_upload && (m_upload_target_version > m_upload_progress.client_version)) { return send_upload_message(); // Throws } @@ -1985,7 +1996,7 @@ void Session::send_message() void Session::send_bind_message() { - REALM_ASSERT(m_state == Active); + REALM_ASSERT_EX(m_state == Active, m_state); session_ident_type session_ident = m_ident; bool need_client_file_ident = !have_client_file_ident(); @@ -2008,7 +2019,7 @@ void Session::send_bind_message() json_data_dump = bind_json_data.dump(); } logger.debug( - "Sending: BIND(session_ident=%1, need_client_file_ident=%2 is_subserver=%3 json_data=\"%4\")", + "Sending: BIND(session_ident=%1, need_client_file_ident=%2, is_subserver=%3, json_data=\"%4\")", session_ident, need_client_file_ident, is_subserver, json_data_dump); } protocol.make_flx_bind_message(protocol_version, out, session_ident, bind_json_data, empty_access_token, @@ -2016,7 +2027,7 @@ void Session::send_bind_message() } else { std::string server_path = get_virt_path(); - logger.debug("Sending: BIND(session_ident=%1, need_client_file_ident=%2 is_subserver=%3 server_path=%4)", + logger.debug("Sending: BIND(session_ident=%1, need_client_file_ident=%2, is_subserver=%3, server_path=%4)", session_ident, need_client_file_ident, is_subserver, server_path); protocol.make_pbs_bind_message(protocol_version, out, session_ident, server_path, empty_access_token, need_client_file_ident, is_subserver); // Throws @@ -2034,7 +2045,7 @@ void Session::send_bind_message() void Session::send_ident_message() { - REALM_ASSERT(m_state == Active); + REALM_ASSERT_EX(m_state == Active, m_state); REALM_ASSERT(m_bind_message_sent); REALM_ASSERT(!m_unbind_message_sent); REALM_ASSERT(have_client_file_ident()); @@ -2049,7 +2060,7 @@ void Session::send_ident_message() const auto active_query_body = active_query_set.to_ext_json(); logger.debug("Sending: IDENT(client_file_ident=%1, client_file_ident_salt=%2, " "scan_server_version=%3, scan_client_version=%4, latest_server_version=%5, " - "latest_server_version_salt=%6, query_version: %7 query_size: %8, query: \"%9\")", + "latest_server_version_salt=%6, query_version=%7, query_size=%8, query=\"%9\")", m_client_file_ident.ident, m_client_file_ident.salt, m_progress.download.server_version, m_progress.download.last_integrated_client_version, m_progress.latest_server_version.version, m_progress.latest_server_version.salt, active_query_set.version(), active_query_body.size(), @@ -2077,11 +2088,11 @@ void Session::send_ident_message() void Session::send_query_change_message() { - REALM_ASSERT(m_state == Active); + REALM_ASSERT_EX(m_state == Active, m_state); REALM_ASSERT(m_ident_message_sent); REALM_ASSERT(!m_unbind_message_sent); REALM_ASSERT(m_pending_flx_sub_set); - REALM_ASSERT(m_pending_flx_sub_set->query_version > m_last_sent_flx_query_version); + REALM_ASSERT_3(m_pending_flx_sub_set->query_version, >, m_last_sent_flx_query_version); if (REALM_UNLIKELY(get_client().is_dry_run())) { return; @@ -2106,10 +2117,10 @@ void Session::send_query_change_message() void Session::send_upload_message() { - REALM_ASSERT(m_state == Active); + REALM_ASSERT_EX(m_state == Active, m_state); REALM_ASSERT(m_ident_message_sent); REALM_ASSERT(!m_unbind_message_sent); - REALM_ASSERT(m_upload_target_version > m_upload_progress.client_version); + REALM_ASSERT_3(m_upload_target_version, >, m_upload_progress.client_version); if (REALM_UNLIKELY(get_client().is_dry_run())) return; @@ -2231,10 +2242,10 @@ void Session::send_upload_message() void Session::send_mark_message() { - REALM_ASSERT(m_state == Active); + REALM_ASSERT_EX(m_state == Active, m_state); REALM_ASSERT(m_ident_message_sent); REALM_ASSERT(!m_unbind_message_sent); - REALM_ASSERT(m_target_download_mark > m_last_download_mark_sent); + REALM_ASSERT_3(m_target_download_mark, >, m_last_download_mark_sent); request_ident_type request_ident = m_target_download_mark; logger.debug("Sending: MARK(request_ident=%1)", request_ident); // Throws @@ -2254,7 +2265,7 @@ void Session::send_mark_message() void Session::send_unbind_message() { - REALM_ASSERT(m_state == Deactivating || m_error_message_received || m_suspended); + REALM_ASSERT_EX(m_state == Deactivating || m_error_message_received || m_suspended, m_state); REALM_ASSERT(m_bind_message_sent); REALM_ASSERT(!m_unbind_message_sent); @@ -2272,7 +2283,7 @@ void Session::send_unbind_message() void Session::send_json_error_message() { - REALM_ASSERT(m_state == Active); + REALM_ASSERT_EX(m_state == Active, m_state); REALM_ASSERT(m_ident_message_sent); REALM_ASSERT(!m_unbind_message_sent); REALM_ASSERT(m_error_to_send); @@ -2301,7 +2312,7 @@ void Session::send_json_error_message() void Session::send_test_command_message() { - REALM_ASSERT(m_state == Active); + REALM_ASSERT_EX(m_state == Active, m_state); auto it = std::find_if(m_pending_test_commands.begin(), m_pending_test_commands.end(), [](const PendingTestCommand& command) { @@ -2329,8 +2340,8 @@ std::error_code Session::receive_ident_message(SaltedFileIdent client_file_ident client_file_ident.salt); // Throws // Ignore the message if the deactivation process has been initiated, - // because in that case, the associated Realm must not be accessed any - // longer. + // because in that case, the associated Realm and SessionWrapper must + // not be accessed any longer. if (m_state != Active) return std::error_code{}; // Success @@ -2390,10 +2401,8 @@ std::error_code Session::receive_ident_message(SaltedFileIdent client_file_ident bool has_pending_client_reset = false; repl.get_history().get_status(m_last_version_available, client_file_ident, m_progress, &has_pending_client_reset); // Throws - REALM_ASSERT_EX(m_client_file_ident.ident == client_file_ident.ident, m_client_file_ident.ident, - client_file_ident.ident); - REALM_ASSERT_EX(m_client_file_ident.salt == client_file_ident.salt, m_client_file_ident.salt, - client_file_ident.salt); + REALM_ASSERT_3(m_client_file_ident.ident, ==, client_file_ident.ident); + REALM_ASSERT_3(m_client_file_ident.salt, ==, client_file_ident.salt); REALM_ASSERT_EX(m_progress.download.last_integrated_client_version == 0, m_progress.download.last_integrated_client_version); REALM_ASSERT_EX(m_progress.upload.client_version == 0, m_progress.upload.client_version); @@ -2453,8 +2462,8 @@ void Session::receive_download_message(const SyncProgress& progress, std::uint_f const ReceivedChangesets& received_changesets) { // Ignore the message if the deactivation process has been initiated, - // because in that case, the associated Realm must not be accessed any - // longer. + // because in that case, the associated Realm and SessionWrapper must + // not be accessed any longer. if (m_state != Active) return; @@ -2535,7 +2544,7 @@ void Session::receive_download_message(const SyncProgress& progress, std::uint_f if (hook_action == SyncClientHookAction::EarlyReturn) { return; } - REALM_ASSERT(hook_action == SyncClientHookAction::NoAction); + REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action); if (process_flx_bootstrap_message(progress, batch_state, query_version, received_changesets)) { clear_resumption_delay_state(); @@ -2549,7 +2558,7 @@ void Session::receive_download_message(const SyncProgress& progress, std::uint_f if (hook_action == SyncClientHookAction::EarlyReturn) { return; } - REALM_ASSERT(hook_action == SyncClientHookAction::NoAction); + REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action); // When we receive a DOWNLOAD message successfully, we can clear the backoff timer value used to reconnect // after a retryable session error. @@ -2561,8 +2570,8 @@ std::error_code Session::receive_mark_message(request_ident_type request_ident) logger.debug("Received: MARK(request_ident=%1)", request_ident); // Throws // Ignore the message if the deactivation process has been initiated, - // because in that case, the associated Realm must not be accessed any - // longer. + // because in that case, the associated Realm and SessionWrapper must + // not be accessed any longer. if (m_state != Active) return std::error_code{}; // Success @@ -2602,7 +2611,7 @@ std::error_code Session::receive_unbound_message() // not been received, implies that the deactivation process must have been // initiated, so this session must be in the Deactivating state or the session // has been suspended because of a client side error. - REALM_ASSERT(m_state == Deactivating || m_suspended); + REALM_ASSERT_EX(m_state == Deactivating || m_suspended, m_state); m_unbound_message_received = true; @@ -2621,7 +2630,12 @@ std::error_code Session::receive_unbound_message() std::error_code Session::receive_query_error_message(int error_code, std::string_view message, int64_t query_version) { logger.info("Received QUERY_ERROR \"%1\" (error_code=%2, query_version=%3)", message, error_code, query_version); - on_flx_sync_error(query_version, std::string_view(message.data(), message.size())); // throws + // Ignore the message if the deactivation process has been initiated, + // because in that case, the associated Realm and SessionWrapper must + // not be accessed any longer. + if (m_state == Active) { + on_flx_sync_error(query_version, std::string_view(message.data(), message.size())); // throws + } return {}; } @@ -2649,14 +2663,23 @@ std::error_code Session::receive_error_message(const ProtocolErrorInfo& info) return ClientError::bad_error_code; } - auto debug_action = call_debug_hook(SyncClientHookEvent::ErrorMessageReceived, info); - if (debug_action == SyncClientHookAction::EarlyReturn) { - return {}; + // Can't process debug hook actions once the Session is undergoing deactivation, since + // the SessionWrapper may not be available + if (m_state == Active) { + auto debug_action = call_debug_hook(SyncClientHookEvent::ErrorMessageReceived, info); + if (debug_action == SyncClientHookAction::EarlyReturn) { + return {}; + } } + // For compensating write errors, we need to defer raising them to the SDK until after the server version // containing the compensating write has appeared in a download message. if (error_code == ProtocolError::compensating_write) { - m_pending_compensating_write_errors.push_back(info); + // If the client is not active, the compensating writes will not be processed now, but will be + // sent again the next time the client connects + if (m_state == Active) { + m_pending_compensating_write_errors.push_back(info); + } return {}; } @@ -2668,7 +2691,7 @@ std::error_code Session::receive_error_message(const ProtocolErrorInfo& info) void Session::suspend(const SessionErrorInfo& info) { REALM_ASSERT(!m_suspended); - REALM_ASSERT(m_state == Active || m_state == Deactivating); + REALM_ASSERT_EX(m_state == Active || m_state == Deactivating, m_state); logger.debug("Suspended"); // Throws m_suspended = true; @@ -2678,7 +2701,7 @@ void Session::suspend(const SessionErrorInfo& info) // The fact that the UNBIND message has been sent, but we are not being suspended because // we received an ERROR message implies that the deactivation process must // have been initiated, so this session must be in the Deactivating state. - REALM_ASSERT(m_state == Deactivating); + REALM_ASSERT_EX(m_state == Deactivating, m_state); // The deactivation process completes when the unbinding process // completes. @@ -2804,7 +2827,7 @@ bool ClientImpl::Session::check_received_sync_progress(const SyncProgress& progr void Session::check_for_upload_completion() { - REALM_ASSERT(m_state == Active); + REALM_ASSERT_EX(m_state == Active, m_state); if (!m_upload_completion_notification_requested) { return; } @@ -2814,13 +2837,13 @@ void Session::check_for_upload_completion() return; // Upload process must have reached end of history - REALM_ASSERT(m_upload_progress.client_version <= m_last_version_available); + REALM_ASSERT_3(m_upload_progress.client_version, <=, m_last_version_available); bool scan_complete = (m_upload_progress.client_version == m_last_version_available); if (!scan_complete) return; // All uploaded changesets must have been acknowledged by the server - REALM_ASSERT(m_progress.upload.client_version <= m_last_version_selected_for_upload); + REALM_ASSERT_3(m_progress.upload.client_version, <=, m_last_version_selected_for_upload); bool all_uploads_accepted = (m_progress.upload.client_version == m_last_version_selected_for_upload); if (!all_uploads_accepted) return; @@ -2832,8 +2855,8 @@ void Session::check_for_upload_completion() void Session::check_for_download_completion() { - REALM_ASSERT(m_target_download_mark >= m_last_download_mark_received); - REALM_ASSERT(m_last_download_mark_received >= m_last_triggering_download_mark); + REALM_ASSERT_3(m_target_download_mark, >=, m_last_download_mark_received); + REALM_ASSERT_3(m_last_download_mark_received, >=, m_last_triggering_download_mark); if (m_last_download_mark_received == m_last_triggering_download_mark) return; if (m_last_download_mark_received < m_target_download_mark) diff --git a/src/realm/sync/noinst/client_impl_base.hpp b/src/realm/sync/noinst/client_impl_base.hpp index 37df91a5d5f..26423c01fe1 100644 --- a/src/realm/sync/noinst/client_impl_base.hpp +++ b/src/realm/sync/noinst/client_impl_base.hpp @@ -535,6 +535,7 @@ class ClientImpl::Connection { OutputBuffer& get_output_buffer() noexcept; Session* get_session(session_ident_type) const noexcept; + Session* find_and_validate_session(session_ident_type session_ident, std::string_view message) noexcept; static bool was_voluntary(ConnectionTerminationReason) noexcept; static std::string make_logger_prefix(connection_ident_type); @@ -628,6 +629,9 @@ class ClientImpl::Connection { // The set of sessions associated with this connection. A session becomes // associated with a connection when it is activated. std::map> m_sessions; + // Keep track of previously used sessions idents to see if a stale message was + // received for a closed session + std::unordered_set m_session_history; // A queue of sessions that have enlisted for an opportunity to send a // message to the server. Sessions will be served in the order that they diff --git a/src/realm/sync/noinst/protocol_codec.hpp b/src/realm/sync/noinst/protocol_codec.hpp index 25d3bb6a25e..35d68d5c1b9 100644 --- a/src/realm/sync/noinst/protocol_codec.hpp +++ b/src/realm/sync/noinst/protocol_codec.hpp @@ -431,9 +431,9 @@ class ClientProtocol { msg = HeaderLineParser(std::string_view(uncompressed_body_buffer.get(), uncompressed_body_size)); } - logger.trace("Download message compression: is_body_compressed = %1, " - "compressed_body_size=%2, uncompressed_body_size=%3", - is_body_compressed, compressed_body_size, uncompressed_body_size); + logger.debug("Download message compression: session_ident=%1, is_body_compressed=%2, " + "compressed_body_size=%3, uncompressed_body_size=%4", + session_ident, is_body_compressed, compressed_body_size, uncompressed_body_size); ReceivedChangesets received_changesets; @@ -456,10 +456,10 @@ class ClientProtocol { "Server version in downloaded changeset cannot be zero"); } auto changeset_data = msg.read_sized_data(changeset_size); - logger.debug("Received: DOWNLOAD CHANGESET(server_version=%1, " - "client_version=%2, origin_timestamp=%3, origin_file_ident=%4, " - "original_changeset_size=%5, changeset_size=%6)", - cur_changeset.remote_version, cur_changeset.last_integrated_local_version, + logger.debug("Received: DOWNLOAD CHANGESET(session_ident=%1, server_version=%2, " + "client_version=%3, origin_timestamp=%4, origin_file_ident=%5, " + "original_changeset_size=%6, changeset_size=%7)", + session_ident, cur_changeset.remote_version, cur_changeset.last_integrated_local_version, cur_changeset.origin_timestamp, cur_changeset.origin_file_ident, cur_changeset.original_changeset_size, changeset_size); // Throws if (logger.would_log(util::Logger::Level::trace)) { diff --git a/test/sync_fixtures.hpp b/test/sync_fixtures.hpp index 593c1d785c8..31a2b800e0e 100644 --- a/test/sync_fixtures.hpp +++ b/test/sync_fixtures.hpp @@ -440,7 +440,11 @@ class MultiClientServerFixture { size_t max_download_size = 0x1000000; // 16 MB as in Server::Config +#if REALM_DISABLE_SYNC_MULTIPLEXING + bool one_connection_per_session = true; +#else bool one_connection_per_session = false; +#endif bool disable_upload_activation_delay = false; diff --git a/test/test_sync.cpp b/test/test_sync.cpp index 074eb8f4b92..465cb0b566c 100644 --- a/test/test_sync.cpp +++ b/test/test_sync.cpp @@ -1468,7 +1468,8 @@ TEST(Sync_Randomized) for (size_t i = 1; i < num_clients; ++i) { ReadTransaction rt(client_shared_groups[i]); rt.get_group().verify(); - CHECK(compare_groups(rt_0, rt)); + // Logger is guaranteed to be defined + CHECK(compare_groups(rt_0, rt, *test_context.logger)); } } @@ -3696,6 +3697,7 @@ TEST(Sync_UploadDownloadProgress_6) session_config.signed_user_token = g_signed_test_user_token; std::mutex mutex; + std::condition_variable session_cv; auto session = std::make_unique(client, db, nullptr, nullptr, std::move(session_config)); auto progress_handler = [&](uint_fast64_t downloaded_bytes, uint_fast64_t downloadable_bytes, @@ -3709,20 +3711,78 @@ TEST(Sync_UploadDownloadProgress_6) CHECK_EQUAL(snapshot_version, 1); std::lock_guard lock{mutex}; session.reset(); + session_cv.notify_one(); }; session->set_progress_handler(progress_handler); { - std::lock_guard lock{mutex}; + std::unique_lock lock{mutex}; session->bind(); + // Wait until the progress handler is called on the session before tearing down the client + session_cv.wait_for(lock, std::chrono::seconds(30), [&session]() { + return !bool(session); + }); } client.shutdown_and_wait(); server.stop(); server_thread.join(); - // The check is that we reach this point without deadlocking. + // The check is that we reach this point without deadlocking or throwing an assert while tearing + // down the active session +} + +// This test has a single client starting to connect to the server with one session. +// The client is torn down immediately after bind is called on the session. +// The session will still be active and has an unactualized session wrapper when the +// client is torn down, which leads to both calls to finalize_before_actualization() and +// and finalize(). +TEST(Sync_UploadDownloadProgress_7) +{ + TEST_DIR(server_dir); + TEST_CLIENT_DB(db); + + Server::Config server_config; + server_config.logger = std::make_shared("Server: ", test_context.logger); + server_config.listen_address = "localhost"; + server_config.listen_port = ""; + server_config.tcp_no_delay = true; + + util::Optional public_key = PKey::load_public(test_server_key_path()); + Server server(server_dir, std::move(public_key), server_config); + server.start(); + + auto server_port = server.listen_endpoint().port(); + + ThreadWrapper server_thread; + server_thread.start([&] { + server.run(); + }); + + Client::Config client_config; + client_config.logger = std::make_shared("Client: ", test_context.logger); + auto socket_provider = std::make_shared(client_config.logger, ""); + client_config.socket_provider = socket_provider; + client_config.reconnect_mode = ReconnectMode::testing; + client_config.one_connection_per_session = false; + Client client(client_config); + + Session::Config session_config; + session_config.server_address = "localhost"; + session_config.server_port = server_port; + session_config.realm_identifier = "/test"; + session_config.signed_user_token = g_signed_test_user_token; + + auto session = std::make_unique(client, db, nullptr, nullptr, std::move(session_config)); + session->bind(); + + client.shutdown_and_wait(); + server.stop(); + server_thread.join(); + + // The check is that we reach this point without deadlocking or throwing an assert while tearing + // down the session that is in the process of being created. } // Commenting out test for now and will be fixed in a later PR