-
Notifications
You must be signed in to change notification settings - Fork 168
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix SessionWrapper use-after-free crash when tearing down sessions #6676
Changes from 8 commits
c7f7899
07cf26c
e70045b
e265d80
31317d5
8c0a1c1
5fe095a
253f951
68a1498
f2fee00
356b169
3781625
117e31a
5724ea4
5075ead
6352e17
979fb56
1e3c591
aca0feb
011a17d
3f6cca0
0ebefaf
f2caa2a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -292,6 +292,9 @@ class SessionWrapper final : public util::AtomicRefCountBase, public SyncTransac | |
|
||
bool m_suspended = false; | ||
|
||
// Has the SessionWrapper been finalized? | ||
bool m_finalized = false; | ||
|
||
// Set to true when the first DOWNLOAD message is received to indicate that | ||
// the byte-level download progress parameters can be considered reasonable | ||
// reliable. Before that, a lot of time may have passed, so our record of | ||
|
@@ -746,7 +749,9 @@ void SessionImpl::force_close() | |
void SessionImpl::on_connection_state_changed(ConnectionState state, | ||
const util::Optional<SessionErrorInfo>& error_info) | ||
{ | ||
m_wrapper.on_connection_state_changed(state, error_info); // Throws | ||
if (m_state == SessionImpl::Active) { | ||
m_wrapper.on_connection_state_changed(state, error_info); // Throws | ||
} | ||
} | ||
|
||
|
||
|
@@ -783,6 +788,7 @@ util::Optional<ClientReset>& SessionImpl::get_client_reset_config() noexcept | |
void SessionImpl::initiate_integrate_changesets(std::uint_fast64_t downloadable_bytes, DownloadBatchState batch_state, | ||
const SyncProgress& progress, const ReceivedChangesets& changesets) | ||
{ | ||
REALM_ASSERT(m_state == SessionImpl::Active); // Should never be called if session is not active | ||
try { | ||
bool simulate_integration_error = (m_wrapper.m_simulate_integration_error && !changesets.empty()); | ||
if (simulate_integration_error) { | ||
|
@@ -811,13 +817,15 @@ void SessionImpl::initiate_integrate_changesets(std::uint_fast64_t downloadable_ | |
|
||
void SessionImpl::on_upload_completion() | ||
{ | ||
m_wrapper.on_upload_completion(); // Throws | ||
REALM_ASSERT(m_state == SessionImpl::Active); // Should never be called if session is not active | ||
m_wrapper.on_upload_completion(); // Throws | ||
} | ||
|
||
|
||
void SessionImpl::on_download_completion() | ||
{ | ||
m_wrapper.on_download_completion(); // Throws | ||
REALM_ASSERT(m_state == SessionImpl::Active); // Should never be called if session is not active | ||
m_wrapper.on_download_completion(); // Throws | ||
} | ||
|
||
|
||
|
@@ -834,13 +842,16 @@ void SessionImpl::on_resumed() | |
|
||
void SessionImpl::handle_pending_client_reset_acknowledgement() | ||
{ | ||
REALM_ASSERT(m_state == SessionImpl::Active); // Should never be called if session is not active | ||
m_wrapper.handle_pending_client_reset_acknowledgement(); | ||
} | ||
|
||
|
||
bool SessionImpl::process_flx_bootstrap_message(const SyncProgress& progress, DownloadBatchState batch_state, | ||
int64_t query_version, const ReceivedChangesets& received_changesets) | ||
{ | ||
REALM_ASSERT(m_state == SessionImpl::Active); // Should never be called if session is not active | ||
|
||
if (is_steady_state_download_message(batch_state, query_version)) { | ||
return false; | ||
} | ||
|
@@ -898,6 +909,7 @@ void SessionImpl::process_pending_flx_bootstrap() | |
if (!m_is_flx_sync_session) { | ||
return; | ||
} | ||
REALM_ASSERT(m_state == SessionImpl::Active); // Should never be called if session is not active | ||
auto bootstrap_store = m_wrapper.get_flx_pending_bootstrap_store(); | ||
if (!bootstrap_store->has_pending()) { | ||
return; | ||
|
@@ -985,31 +997,37 @@ void SessionImpl::on_new_flx_subscription_set(int64_t new_version) | |
|
||
void SessionImpl::on_flx_sync_error(int64_t version, std::string_view err_msg) | ||
{ | ||
REALM_ASSERT(m_state == SessionImpl::Active); // Should never be called if session is not active | ||
m_wrapper.on_flx_sync_error(version, err_msg); | ||
} | ||
|
||
void SessionImpl::on_flx_sync_progress(int64_t version, DownloadBatchState batch_state) | ||
{ | ||
REALM_ASSERT(m_state == SessionImpl::Active); // Should never be called if session is not active | ||
m_wrapper.on_flx_sync_progress(version, batch_state); | ||
} | ||
|
||
SubscriptionStore* SessionImpl::get_flx_subscription_store() | ||
{ | ||
REALM_ASSERT(m_state == SessionImpl::Active); // Should never be called if session is not active | ||
return m_wrapper.get_flx_subscription_store(); | ||
} | ||
|
||
MigrationStore* SessionImpl::get_migration_store() | ||
{ | ||
REALM_ASSERT(m_state == SessionImpl::Active); // Should never be called if session is not active | ||
return m_wrapper.get_migration_store(); | ||
} | ||
|
||
void SessionImpl::on_flx_sync_version_complete(int64_t version) | ||
{ | ||
REALM_ASSERT(m_state == SessionImpl::Active); // Should never be called if session is not active | ||
m_wrapper.on_flx_sync_version_complete(version); | ||
} | ||
|
||
SyncClientHookAction SessionImpl::call_debug_hook(const SyncClientHookData& data) | ||
{ | ||
REALM_ASSERT(m_state == SessionImpl::Active); // Should never be called if session is not active | ||
// Make sure we don't call the debug hook recursively. | ||
if (m_wrapper.m_in_debug_hook) { | ||
return SyncClientHookAction::NoAction; | ||
|
@@ -1043,6 +1061,7 @@ SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, con | |
int64_t query_version, DownloadBatchState batch_state, | ||
size_t num_changesets) | ||
{ | ||
REALM_ASSERT(m_state == SessionImpl::Active); // Should never be called if session is not active | ||
if (REALM_LIKELY(!m_wrapper.m_debug_hook)) { | ||
return SyncClientHookAction::NoAction; | ||
} | ||
|
@@ -1059,6 +1078,7 @@ SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, con | |
|
||
SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, const ProtocolErrorInfo& error_info) | ||
{ | ||
REALM_ASSERT(m_state == SessionImpl::Active); // Should never be called if session is not active | ||
if (REALM_LIKELY(!m_wrapper.m_debug_hook)) { | ||
return SyncClientHookAction::NoAction; | ||
} | ||
|
@@ -1076,6 +1096,7 @@ SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, con | |
|
||
bool SessionImpl::is_steady_state_download_message(DownloadBatchState batch_state, int64_t query_version) | ||
{ | ||
REALM_ASSERT(m_state == SessionImpl::Active); // Should never be called if session is not active | ||
if (batch_state == DownloadBatchState::SteadyState) { | ||
return true; | ||
} | ||
|
@@ -1108,13 +1129,16 @@ util::Future<std::string> SessionImpl::send_test_command(std::string body) | |
return Status{ErrorCodes::LogicError, util::format("Invalid json input to send_test_command: %1", e.what())}; | ||
} | ||
|
||
if (m_state != Active) { | ||
return Status{ErrorCodes::RuntimeError, "Cannot send a test command for a session that is not active"}; | ||
} | ||
|
||
auto pf = util::make_promise_future<std::string>(); | ||
|
||
get_client().post([this, promise = std::move(pf.promise), body = std::move(body)](Status status) mutable { | ||
if (status == ErrorCodes::OperationAborted) | ||
return; | ||
else if (!status.is_ok()) | ||
throw Exception(status); | ||
// Includes operation_aborted | ||
if (!status.is_ok()) | ||
promise.set_error(status); | ||
|
||
auto id = ++m_last_pending_test_command_ident; | ||
m_pending_test_commands.push_back(PendingTestCommand{id, std::move(body), std::move(promise)}); | ||
|
@@ -1173,6 +1197,7 @@ SessionWrapper::~SessionWrapper() noexcept | |
|
||
inline ClientReplication& SessionWrapper::get_replication() noexcept | ||
{ | ||
REALM_ASSERT(m_db); | ||
return static_cast<ClientReplication&>(*m_replication); | ||
} | ||
|
||
|
@@ -1192,6 +1217,7 @@ void SessionWrapper::on_new_flx_subscription_set(int64_t new_version) | |
if (!m_initiated) { | ||
return; | ||
} | ||
REALM_ASSERT(!m_finalized); | ||
|
||
auto self = util::bind_ptr<SessionWrapper>(this); | ||
m_client.post([new_version, self = std::move(self)](Status status) { | ||
|
@@ -1212,6 +1238,7 @@ void SessionWrapper::on_new_flx_subscription_set(int64_t new_version) | |
|
||
void SessionWrapper::on_flx_sync_error(int64_t version, std::string_view err_msg) | ||
{ | ||
REALM_ASSERT(!m_finalized); | ||
REALM_ASSERT(m_flx_latest_version != 0); | ||
REALM_ASSERT(m_flx_latest_version >= version); | ||
|
||
|
@@ -1222,6 +1249,7 @@ void SessionWrapper::on_flx_sync_error(int64_t version, std::string_view err_msg | |
|
||
void SessionWrapper::on_flx_sync_version_complete(int64_t version) | ||
{ | ||
REALM_ASSERT(!m_finalized); | ||
m_flx_last_seen_version = version; | ||
m_flx_active_version = version; | ||
} | ||
|
@@ -1231,6 +1259,7 @@ void SessionWrapper::on_flx_sync_progress(int64_t new_version, DownloadBatchStat | |
if (!has_flx_subscription_store()) { | ||
return; | ||
} | ||
REALM_ASSERT(!m_finalized); | ||
REALM_ASSERT(new_version >= m_flx_last_seen_version); | ||
REALM_ASSERT(new_version >= m_flx_active_version); | ||
REALM_ASSERT(batch_state != DownloadBatchState::SteadyState); | ||
|
@@ -1271,16 +1300,19 @@ void SessionWrapper::on_flx_sync_progress(int64_t new_version, DownloadBatchStat | |
|
||
SubscriptionStore* SessionWrapper::get_flx_subscription_store() | ||
{ | ||
REALM_ASSERT(!m_finalized); | ||
return m_flx_subscription_store.get(); | ||
} | ||
|
||
PendingBootstrapStore* SessionWrapper::get_flx_pending_bootstrap_store() | ||
{ | ||
REALM_ASSERT(!m_finalized); | ||
return m_flx_pending_bootstrap_store.get(); | ||
} | ||
|
||
MigrationStore* SessionWrapper::get_migration_store() | ||
{ | ||
REALM_ASSERT(!m_finalized); | ||
return m_migration_store.get(); | ||
} | ||
|
||
|
@@ -1319,6 +1351,7 @@ void SessionWrapper::nonsync_transact_notify(version_type new_version) | |
{ | ||
// Thread safety required | ||
REALM_ASSERT(m_initiated); | ||
REALM_ASSERT(!m_finalized); | ||
|
||
util::bind_ptr<SessionWrapper> self{this}; | ||
m_client.post([self = std::move(self), new_version](Status status) { | ||
|
@@ -1341,6 +1374,7 @@ void SessionWrapper::cancel_reconnect_delay() | |
{ | ||
// Thread safety required | ||
REALM_ASSERT(m_initiated); | ||
REALM_ASSERT(!m_finalized); | ||
|
||
util::bind_ptr<SessionWrapper> self{this}; | ||
m_client.post([self = std::move(self)](Status status) { | ||
|
@@ -1364,6 +1398,7 @@ void SessionWrapper::async_wait_for(bool upload_completion, bool download_comple | |
{ | ||
REALM_ASSERT(upload_completion || download_completion); | ||
REALM_ASSERT(m_initiated); | ||
REALM_ASSERT(!m_finalized); | ||
|
||
util::bind_ptr<SessionWrapper> self{this}; | ||
m_client.post([self = std::move(self), handler = std::move(handler), upload_completion, | ||
|
@@ -1406,6 +1441,7 @@ bool SessionWrapper::wait_for_upload_complete_or_client_stopped() | |
{ | ||
// Thread safety required | ||
REALM_ASSERT(m_initiated); | ||
REALM_ASSERT(!m_finalized); | ||
|
||
std::int_fast64_t target_mark; | ||
{ | ||
|
@@ -1449,6 +1485,7 @@ bool SessionWrapper::wait_for_download_complete_or_client_stopped() | |
{ | ||
// Thread safety required | ||
REALM_ASSERT(m_initiated); | ||
REALM_ASSERT(!m_finalized); | ||
|
||
std::int_fast64_t target_mark; | ||
{ | ||
|
@@ -1492,6 +1529,7 @@ void SessionWrapper::refresh(std::string signed_access_token) | |
{ | ||
// Thread safety required | ||
REALM_ASSERT(m_initiated); | ||
REALM_ASSERT(!m_finalized); | ||
|
||
m_client.post([self = util::bind_ptr(this), token = std::move(signed_access_token)](Status status) { | ||
if (status == ErrorCodes::OperationAborted) | ||
|
@@ -1515,7 +1553,9 @@ void SessionWrapper::refresh(std::string signed_access_token) | |
|
||
inline void SessionWrapper::abandon(util::bind_ptr<SessionWrapper> wrapper) noexcept | ||
{ | ||
if (wrapper->m_initiated) { | ||
// If the wrapper was already finalized by finalize_before_actualization() then | ||
// nothing to do. | ||
if (wrapper->m_initiated && !wrapper->m_finalized) { | ||
ClientImpl& client = wrapper->m_client; | ||
client.register_abandoned_session_wrapper(std::move(wrapper)); | ||
} | ||
|
@@ -1526,6 +1566,7 @@ inline void SessionWrapper::abandon(util::bind_ptr<SessionWrapper> wrapper) noex | |
void SessionWrapper::actualize(ServerEndpoint endpoint) | ||
{ | ||
REALM_ASSERT(!m_actualized); | ||
REALM_ASSERT(!m_finalized); | ||
REALM_ASSERT(!m_sess); | ||
m_db->claim_sync_agent(); | ||
auto sync_mode = endpoint.server_mode; | ||
|
@@ -1575,6 +1616,7 @@ void SessionWrapper::force_close() | |
{ | ||
REALM_ASSERT(m_actualized); | ||
REALM_ASSERT(m_sess); | ||
REALM_ASSERT(!m_finalized); | ||
m_force_closed = true; | ||
|
||
ClientImpl::Connection& conn = m_sess->get_connection(); | ||
|
@@ -1590,6 +1632,9 @@ void SessionWrapper::force_close() | |
void SessionWrapper::finalize() | ||
{ | ||
REALM_ASSERT(m_actualized); | ||
REALM_ASSERT(!m_finalized); | ||
|
||
m_finalized = true; | ||
|
||
if (!m_force_closed) { | ||
REALM_ASSERT(m_sess); | ||
|
@@ -1636,25 +1681,29 @@ inline void SessionWrapper::finalize_before_actualization() noexcept | |
{ | ||
m_actualized = true; | ||
m_force_closed = true; | ||
m_finalized = true; | ||
} | ||
|
||
|
||
inline void SessionWrapper::report_sync_transact(VersionID old_version, VersionID new_version) | ||
{ | ||
REALM_ASSERT(!m_finalized); | ||
if (m_sync_transact_handler) | ||
m_sync_transact_handler(old_version, new_version); // Throws | ||
} | ||
|
||
|
||
inline void SessionWrapper::on_sync_progress() | ||
{ | ||
REALM_ASSERT(!m_finalized); | ||
m_reliable_download_progress = true; | ||
report_progress(); // Throws | ||
} | ||
|
||
|
||
void SessionWrapper::on_upload_completion() | ||
{ | ||
REALM_ASSERT(!m_finalized); | ||
while (!m_upload_completion_handlers.empty()) { | ||
auto handler = std::move(m_upload_completion_handlers.back()); | ||
m_upload_completion_handlers.pop_back(); | ||
|
@@ -1707,6 +1756,7 @@ void SessionWrapper::on_download_completion() | |
|
||
void SessionWrapper::on_suspended(const SessionErrorInfo& error_info) | ||
{ | ||
REALM_ASSERT(!m_finalized); | ||
m_suspended = true; | ||
if (m_connection_state_change_listener) { | ||
m_connection_state_change_listener(ConnectionState::disconnected, error_info); // Throws | ||
|
@@ -1716,6 +1766,7 @@ void SessionWrapper::on_suspended(const SessionErrorInfo& error_info) | |
|
||
void SessionWrapper::on_resumed() | ||
{ | ||
REALM_ASSERT(!m_finalized); | ||
m_suspended = false; | ||
if (m_connection_state_change_listener) { | ||
ClientImpl::Connection& conn = m_sess->get_connection(); | ||
|
@@ -1740,6 +1791,7 @@ void SessionWrapper::on_connection_state_changed(ConnectionState state, | |
|
||
void SessionWrapper::report_progress() | ||
{ | ||
REALM_ASSERT(!m_finalized); | ||
REALM_ASSERT(m_sess); | ||
|
||
if (!m_progress_handler) | ||
|
@@ -1775,15 +1827,16 @@ void SessionWrapper::report_progress() | |
util::Future<std::string> SessionWrapper::send_test_command(std::string body) | ||
{ | ||
if (!m_sess) { | ||
return util::Future<std::string>::make_ready( | ||
Status{ErrorCodes::RuntimeError, "session must be activated to send a test command"}); | ||
return Status{ErrorCodes::RuntimeError, "session must be activated to send a test command"}; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don't we need a Future here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A future will be "copy constructed" in the return using the Status value. The There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. got it. just noticed the implicit constructor |
||
} | ||
|
||
return m_sess->send_test_command(std::move(body)); | ||
} | ||
|
||
void SessionWrapper::handle_pending_client_reset_acknowledgement() | ||
{ | ||
REALM_ASSERT(!m_finalized); | ||
|
||
auto pending_reset = [&] { | ||
auto ft = m_db->start_frozen(); | ||
return _impl::client_reset::has_pending_reset(ft); | ||
|
@@ -1865,6 +1918,7 @@ ClientImpl::Connection::Connection(ClientImpl& client, connection_ident_type ide | |
, m_ssl_verify_callback{std::move(ssl_verify_callback)} // DEPRECATED | ||
, m_proxy_config{std::move(proxy_config)} // DEPRECATED | ||
, m_reconnect_info{reconnect_info} | ||
, m_session_history{20} | ||
michael-wb marked this conversation as resolved.
Show resolved
Hide resolved
|
||
, m_ident{ident} | ||
, m_server_endpoint{std::move(endpoint)} | ||
, m_authorization_header_name{authorization_header_name} // DEPRECATED | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice!