Skip to content

Commit

Permalink
mobile: Replace std::thread with Envoy::Thread::PosixThread (#32610)
Browse files Browse the repository at this point in the history
This PR replaces the use of std::thread with Envoy::Thread::PosixThread (backed by pthread). The reason for this change is because std::thread may throw an exception and when exceptions are disabled, it will crash the program. For some certain code, such as the cert validation, it should not crash the program when a thread cannot be created, but it should return an error instead.

This PR also refactors the POSIX thread wrapper implementation and exposes the APIs via Envoy::Thread::PosixThreadFactory and Envoy::Thread::PosixThread so that they can be used directly by Envoy Mobile since Envoy Mobile only supports Android and iOS and those OSes support POSIX. The Envoy::Thread::PosixThread has additional functions not available in Envoy::Thread::Thread, such as pthreadId() and joinable() to make the migration from std::thread to Envoy::Thread::PosixThread easier. The Envoy::PosixThread::pthreadId() is especially useful for doing a comparison with Envoy::PosixThreadFactory::currentPthreadId().

Signed-off-by: Fredy Wijaya <[email protected]>
  • Loading branch information
fredyw authored Mar 6, 2024
1 parent 4fa98b2 commit 4af5231
Show file tree
Hide file tree
Showing 25 changed files with 450 additions and 145 deletions.
4 changes: 2 additions & 2 deletions mobile/library/cc/engine_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -929,14 +929,14 @@ EngineSharedPtr EngineBuilder::build() {

Engine* engine = new Engine(envoy_engine);

auto options = std::make_unique<Envoy::OptionsImplBase>();
auto options = std::make_shared<Envoy::OptionsImplBase>();
std::unique_ptr<envoy::config::bootstrap::v3::Bootstrap> bootstrap = generateBootstrap();
if (bootstrap) {
options->setConfigProto(std::move(bootstrap));
}
ENVOY_BUG(options->setLogLevel(logLevelToString(log_level_)).ok(), "invalid log level");
options->setConcurrency(1);
envoy_engine->run(std::move(options));
envoy_engine->run(options);

// we can't construct via std::make_shared
// because Engine is only constructible as a friend
Expand Down
1 change: 1 addition & 0 deletions mobile/library/common/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ envoy_cc_library(
"//library/common/types:c_types_lib",
"@envoy//envoy/server:lifecycle_notifier_interface",
"@envoy//envoy/stats:stats_interface",
"@envoy//source/common/common:thread_impl_lib_posix",
"@envoy//source/common/runtime:runtime_lib",
"@envoy_build_config//:extension_registry",
],
Expand Down
3 changes: 1 addition & 2 deletions mobile/library/common/engine_common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ class ServerLite : public Server::InstanceBase {
}
};

EngineCommon::EngineCommon(std::unique_ptr<Envoy::OptionsImplBase>&& options)
: options_(std::move(options)) {
EngineCommon::EngineCommon(std::shared_ptr<Envoy::OptionsImplBase> options) : options_(options) {

#if !defined(ENVOY_ENABLE_FULL_PROTOS)
registerMobileProtoDescriptors();
Expand Down
6 changes: 3 additions & 3 deletions mobile/library/common/engine_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ namespace Envoy {
*/
class EngineCommon {
public:
EngineCommon(std::unique_ptr<Envoy::OptionsImplBase>&& options);
EngineCommon(std::shared_ptr<Envoy::OptionsImplBase> options);
bool run() {
base_->runServer();
return true;
Expand All @@ -41,11 +41,11 @@ class EngineCommon {
Envoy::SignalAction handle_sigs_;
Envoy::TerminateHandler log_on_terminate_;
#endif
std::unique_ptr<Envoy::OptionsImplBase> options_;
std::shared_ptr<Envoy::OptionsImplBase> options_;
Event::RealTimeSystem real_time_system_; // NO_CHECK_FORMAT(real_time)
DefaultListenerHooks default_listener_hooks_;
ProdComponentFactory prod_component_factory_;
std::unique_ptr<StrippedMainBase> base_;
std::shared_ptr<StrippedMainBase> base_;
};

} // namespace Envoy
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ envoy_cc_library(
":c_types_lib",
":platform_bridge_cc_proto",
"//library/common/system:system_helper_lib",
"@envoy//envoy/thread:thread_interface",
"@envoy//source/common/common:macros",
"@envoy//source/common/common:thread_impl_lib_posix",
"@envoy//source/common/tls/cert_validator:cert_validator_lib",
],
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

#include <list>
#include <memory>
#include <type_traits>

#include "library/common/data/utility.h"
#include "library/common/system/system_helper.h"
Expand All @@ -13,22 +12,27 @@ namespace TransportSockets {
namespace Tls {

PlatformBridgeCertValidator::PlatformBridgeCertValidator(
const Envoy::Ssl::CertificateValidationContextConfig* config, SslStats& stats)
const Envoy::Ssl::CertificateValidationContextConfig* config, SslStats& stats,
Thread::PosixThreadFactoryPtr thread_factory)
: allow_untrusted_certificate_(config != nullptr &&
config->trustChainVerification() ==
envoy::extensions::transport_sockets::tls::v3::
CertificateValidationContext::ACCEPT_UNTRUSTED),
stats_(stats) {
stats_(stats), thread_factory_(std::move(thread_factory)) {
ENVOY_BUG(config != nullptr && config->caCert().empty() &&
config->certificateRevocationList().empty(),
"Invalid certificate validation context config.");
}

PlatformBridgeCertValidator::PlatformBridgeCertValidator(
const Envoy::Ssl::CertificateValidationContextConfig* config, SslStats& stats)
: PlatformBridgeCertValidator(config, stats, Thread::PosixThreadFactory::create()) {}

PlatformBridgeCertValidator::~PlatformBridgeCertValidator() {
// Wait for validation threads to finish.
for (auto& [id, job] : validation_jobs_) {
if (job.validation_thread_.joinable()) {
job.validation_thread_.join();
if (job.validation_thread_->joinable()) {
job.validation_thread_->join();
}
}
}
Expand Down Expand Up @@ -84,10 +88,19 @@ ValidationResults PlatformBridgeCertValidator::doVerifyCertChain(

ValidationJob job;
job.result_callback_ = std::move(callback);
job.validation_thread_ =
std::thread(&verifyCertChainByPlatform, &(job.result_callback_->dispatcher()),
std::move(certs), std::string(host), std::move(subject_alt_names), this);
std::thread::id thread_id = job.validation_thread_.get_id();
Event::Dispatcher& dispatcher = job.result_callback_->dispatcher();
job.validation_thread_ = thread_factory_->createThread(
[this, &dispatcher, certs = std::move(certs), host = std::string(host),
subject_alt_names = std::move(subject_alt_names)]() -> void {
verifyCertChainByPlatform(&dispatcher, certs, host, subject_alt_names, this);
},
/* options= */ absl::nullopt, /* crash_on_failure=*/false);
if (job.validation_thread_ == nullptr) {
return {ValidationResults::ValidationStatus::Failed,
Envoy::Ssl::ClientValidationStatus::NotValidated, absl::nullopt,
"Failed creating a thread for cert chain validation."};
}
Thread::ThreadId thread_id = job.validation_thread_->pthreadId();
validation_jobs_[thread_id] = std::move(job);
return {ValidationResults::ValidationStatus::Pending,
Envoy::Ssl::ClientValidationStatus::NotValidated, absl::nullopt, absl::nullopt};
Expand Down Expand Up @@ -146,17 +159,18 @@ void PlatformBridgeCertValidator::postVerifyResultAndCleanUp(bool success, std::

dispatcher->post([weak_alive_indicator, success, hostname = std::move(hostname),
error = std::string(error_details), tls_alert, failure_type,
thread_id = std::this_thread::get_id(), parent]() {
thread_id = parent->thread_factory_->currentPthreadId(), parent]() {
if (weak_alive_indicator.expired()) {
return;
}
parent->onVerificationComplete(thread_id, hostname, success, error, tls_alert, failure_type);
});
}

void PlatformBridgeCertValidator::onVerificationComplete(std::thread::id thread_id,
std::string hostname, bool success,
std::string error, uint8_t tls_alert,
void PlatformBridgeCertValidator::onVerificationComplete(const Thread::ThreadId& thread_id,
const std::string& hostname, bool success,
const std::string& error,
uint8_t tls_alert,
ValidationFailureType failure_type) {
ENVOY_LOG(trace, "Got validation result for {} from platform", hostname);

Expand All @@ -166,7 +180,7 @@ void PlatformBridgeCertValidator::onVerificationComplete(std::thread::id thread_
return;
}
ValidationJob& job = job_handle.mapped();
job.validation_thread_.join();
job.validation_thread_->join();

Ssl::ClientValidationStatus detailed_status = Envoy::Ssl::ClientValidationStatus::NotValidated;
switch (failure_type) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
#pragma once

#include <thread>

#include "source/common/common/macros.h"
#include "source/common/common/posix/thread_impl.h"
#include "source/common/common/thread.h"
#include "source/common/tls/cert_validator/default_validator.h"

#include "absl/container/flat_hash_map.h"
Expand Down Expand Up @@ -55,6 +56,11 @@ class PlatformBridgeCertValidator : public CertValidator, Logger::Loggable<Logge
}

private:
GTEST_FRIEND_CLASS(PlatformBridgeCertValidatorTest, ThreadCreationFailed);

PlatformBridgeCertValidator(const Envoy::Ssl::CertificateValidationContextConfig* config,
SslStats& stats, Thread::PosixThreadFactoryPtr thread_factory);

enum class ValidationFailureType {
SUCCESS,
FAIL_VERIFY_ERROR,
Expand All @@ -78,19 +84,20 @@ class PlatformBridgeCertValidator : public CertValidator, Logger::Loggable<Logge
PlatformBridgeCertValidator* parent);

// Called when a pending verification completes. Must be invoked on the main thread.
void onVerificationComplete(std::thread::id thread_id, std::string hostname, bool success,
std::string error_details, uint8_t tls_alert,
void onVerificationComplete(const Thread::ThreadId& thread_id, const std::string& hostname,
bool success, const std::string& error_details, uint8_t tls_alert,
ValidationFailureType failure_type);

struct ValidationJob {
Ssl::ValidateResultCallbackPtr result_callback_;
std::thread validation_thread_;
Thread::PosixThreadPtr validation_thread_;
};

const bool allow_untrusted_certificate_;
SslStats& stats_;
absl::flat_hash_map<std::thread::id, ValidationJob> validation_jobs_;
absl::flat_hash_map<Thread::ThreadId, ValidationJob> validation_jobs_;
std::shared_ptr<size_t> alive_indicator_{new size_t(1)};
Thread::PosixThreadFactoryPtr thread_factory_;
};

} // namespace Tls
Expand Down
47 changes: 29 additions & 18 deletions mobile/library/common/internal_engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ static std::atomic<envoy_stream_t> current_stream_handle_{0};
envoy_stream_t InternalEngine::initStream() { return current_stream_handle_++; }

InternalEngine::InternalEngine(envoy_engine_callbacks callbacks, envoy_logger logger,
envoy_event_tracker event_tracker)
: callbacks_(callbacks), logger_(logger), event_tracker_(event_tracker),
dispatcher_(std::make_unique<Event::ProvisionalDispatcher>()) {
envoy_event_tracker event_tracker,
Thread::PosixThreadFactoryPtr thread_factory)
: thread_factory_(std::move(thread_factory)), callbacks_(callbacks), logger_(logger),
event_tracker_(event_tracker), dispatcher_(std::make_unique<Event::ProvisionalDispatcher>()) {
ExtensionRegistry::registerFactories();

// TODO(Augustyniak): Capturing an address of event_tracker_ and registering it in the API
Expand All @@ -32,12 +33,13 @@ InternalEngine::InternalEngine(envoy_engine_callbacks callbacks, envoy_logger lo
Runtime::maybeSetRuntimeGuard("envoy.reloadable_features.dfp_mixed_scheme", true);
}

InternalEngine::InternalEngine(envoy_engine_callbacks callbacks, envoy_logger logger,
envoy_event_tracker event_tracker)
: InternalEngine(callbacks, logger, event_tracker, Thread::PosixThreadFactory::create()) {}

envoy_status_t InternalEngine::run(const std::string& config, const std::string& log_level) {
// Start the Envoy on the dedicated thread. Note: due to how the assignment operator works with
// std::thread, main_thread_ is the same object after this call, but its state is replaced with
// that of the temporary. The temporary object's state becomes the default state, which does
// nothing.
auto options = std::make_unique<Envoy::OptionsImplBase>();
// Start the Envoy on the dedicated thread.
auto options = std::make_shared<Envoy::OptionsImplBase>();
options->setConfigYaml(config);
if (!log_level.empty()) {
ENVOY_BUG(options->setLogLevel(log_level).ok(), "invalid log level");
Expand All @@ -46,12 +48,17 @@ envoy_status_t InternalEngine::run(const std::string& config, const std::string&
return run(std::move(options));
}

envoy_status_t InternalEngine::run(std::unique_ptr<Envoy::OptionsImplBase>&& options) {
main_thread_ = std::thread(&InternalEngine::main, this, std::move(options));
return ENVOY_SUCCESS;
// This function takes a `std::shared_ptr` instead of `std::unique_ptr` because `std::function` is a
// copy-constructible type, so it's not possible to move capture `std::unique_ptr` with
// `std::function`.
envoy_status_t InternalEngine::run(std::shared_ptr<Envoy::OptionsImplBase> options) {
main_thread_ =
thread_factory_->createThread([this, options]() mutable -> void { main(options); },
/* options= */ absl::nullopt, /* crash_on_failure= */ false);
return (main_thread_ != nullptr) ? ENVOY_SUCCESS : ENVOY_FAILURE;
}

envoy_status_t InternalEngine::main(std::unique_ptr<Envoy::OptionsImplBase>&& options) {
envoy_status_t InternalEngine::main(std::shared_ptr<Envoy::OptionsImplBase> options) {
// Using unique_ptr ensures main_common's lifespan is strictly scoped to this function.
std::unique_ptr<EngineCommon> main_common;
{
Expand Down Expand Up @@ -81,7 +88,7 @@ envoy_status_t InternalEngine::main(std::unique_ptr<Envoy::OptionsImplBase>&& op
std::make_unique<Logger::DefaultDelegate>(log_mutex_, Logger::Registry::getSink());
}

main_common = std::make_unique<EngineCommon>(std::move(options));
main_common = std::make_unique<EngineCommon>(options);
server_ = main_common->server();
event_dispatcher_ = &server_->dispatcher();

Expand Down Expand Up @@ -150,8 +157,12 @@ envoy_status_t InternalEngine::terminate() {
IS_ENVOY_BUG("attempted to double terminate engine");
return ENVOY_FAILURE;
}
// The Engine could not be created.
if (main_thread_ == nullptr) {
return ENVOY_FAILURE;
}
// If main_thread_ has finished (or hasn't started), there's nothing more to do.
if (!main_thread_.joinable()) {
if (!main_thread_->joinable()) {
return ENVOY_FAILURE;
}

Expand All @@ -170,16 +181,16 @@ envoy_status_t InternalEngine::terminate() {
dispatcher_->post([this]() { http_client_->shutdownApiListener(); });

// Exit the event loop and finish up in Engine::run(...)
if (std::this_thread::get_id() == main_thread_.get_id()) {
if (thread_factory_->currentPthreadId() == main_thread_->pthreadId()) {
// TODO(goaway): figure out some way to support this.
PANIC("Terminating the engine from its own main thread is currently unsupported.");
} else {
dispatcher_->terminate();
}
} // lock(_mutex)

if (std::this_thread::get_id() != main_thread_.get_id()) {
main_thread_.join();
if (thread_factory_->currentPthreadId() != main_thread_->pthreadId()) {
main_thread_->join();
}
terminated_ = true;
return ENVOY_SUCCESS;
Expand Down Expand Up @@ -265,7 +276,7 @@ void handlerStats(Stats::Store& stats, Buffer::Instance& response) {
}

std::string InternalEngine::dumpStats() {
if (!main_thread_.joinable()) {
if (!main_thread_->joinable()) {
return "";
}

Expand Down
15 changes: 12 additions & 3 deletions mobile/library/common/internal_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
#include "envoy/stats/store.h"

#include "source/common/common/logger.h"
#include "source/common/common/macros.h"
#include "source/common/common/posix/thread_impl.h"
#include "source/common/common/thread.h"

#include "absl/base/call_once.h"
#include "extension_registry.h"
Expand Down Expand Up @@ -37,7 +40,7 @@ class InternalEngine : public Logger::Loggable<Logger::Id::main> {
* @param log_level, the log level.
*/
envoy_status_t run(const std::string& config, const std::string& log_level);
envoy_status_t run(std::unique_ptr<Envoy::OptionsImplBase>&& options);
envoy_status_t run(std::shared_ptr<Envoy::OptionsImplBase> options);

/**
* Immediately terminate the engine, if running. Calling this function when
Expand Down Expand Up @@ -118,10 +121,16 @@ class InternalEngine : public Logger::Loggable<Logger::Id::main> {
Stats::Store& getStatsStore();

private:
envoy_status_t main(std::unique_ptr<Envoy::OptionsImplBase>&& options);
GTEST_FRIEND_CLASS(InternalEngineTest, ThreadCreationFailed);

InternalEngine(envoy_engine_callbacks callbacks, envoy_logger logger,
envoy_event_tracker event_tracker, Thread::PosixThreadFactoryPtr thread_factory);

envoy_status_t main(std::shared_ptr<Envoy::OptionsImplBase> options);
static void logInterfaces(absl::string_view event,
std::vector<Network::InterfacePair>& interfaces);

Thread::PosixThreadFactoryPtr thread_factory_;
Event::Dispatcher* event_dispatcher_{};
Stats::ScopeSharedPtr client_scope_;
Stats::StatNameSetPtr stat_name_set_;
Expand All @@ -142,7 +151,7 @@ class InternalEngine : public Logger::Loggable<Logger::Id::main> {
Server::ServerLifecycleNotifier::HandlePtr postinit_callback_handler_;
// main_thread_ should be destroyed first, hence it is the last member variable. Objects with
// instructions scheduled on the main_thread_ need to have a longer lifetime.
std::thread main_thread_{}; // Empty placeholder to be populated later.
Thread::PosixThreadPtr main_thread_{nullptr}; // Empty placeholder to be populated later.
bool terminated_{false};
};

Expand Down
1 change: 1 addition & 0 deletions mobile/test/common/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ envoy_cc_test(
"//test/common/mocks/common:common_mocks",
"//test/common/mocks/event:event_mocks",
"@envoy//test/common/http:common_lib",
"@envoy//test/mocks/thread:thread_mocks",
"@envoy_build_config//:test_extensions",
],
)
2 changes: 1 addition & 1 deletion mobile/test/common/engine_common_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace Envoy {

TEST(EngineCommonTest, SignalHandlingFalse) {
ExtensionRegistry::registerFactories();
auto options = std::make_unique<Envoy::OptionsImplBase>();
auto options = std::make_shared<Envoy::OptionsImplBase>();

Platform::EngineBuilder builder;
options->setConfigProto(builder.generateBootstrap());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ envoy_extension_cc_test(
"@envoy//test/common/tls/test_data:cert_infos",
"@envoy//test/mocks/event:event_mocks",
"@envoy//test/mocks/ssl:ssl_mocks",
"@envoy//test/mocks/thread:thread_mocks",
"@envoy//test/test_common:environment_lib",
"@envoy//test/test_common:file_system_for_test_lib",
"@envoy//test/test_common:test_runtime_lib",
Expand Down
Loading

0 comments on commit 4af5231

Please sign in to comment.