Skip to content

Commit

Permalink
Update Envoy to 95038fe (Aug 12th 2021). (#734)
Browse files Browse the repository at this point in the history
- refactoring `ProcessImpl`, creating a named constructor that allows us to
  create bootstrap initially and pass it to `Envoy::Api::Impl()` as required
  since envoyproxy/envoy#17562. Specifically:
  - Moving method `ProcessImpl::determineConcurrency()` out of the ProcessImpl class so that it can be used during its construction.
  - Moving code that extracts URIs from `process_impl.cc` into `process_bootstrap.cc`.
  - Adding a previously missing test case `CreatesBootstrapForH1RespectingPortInUri` into `process_bootstrap_test.cc`.
  - Removing a TODO that incorrectly indicated URI DNS resolution is optional. Envoy [requires](https://github.com/envoyproxy/envoy/blob/716ee8abc526d51f07ed6d3c2a5aa8a3b2481d9d/api/envoy/config/core/v3/address.proto#L64-L67) resolved IPs in the Bootstrap for cluster of type STATIC.
  - Creating a named constructor for `ProcessImpl` that creates the `Envoy::Api::Api` with an empty Bootstrap that is then replaced with the one generated. See an inline comment for explanation.
  - Moving callers onto the named constructor and making the original constructor of `ProcessImpl` private.
- no changes to `.bazelrc`, `.bazelversion`, `run_envoy_docker.sh`.

Signed-off-by: Jakub Sobon <[email protected]>
  • Loading branch information
mum4k authored Aug 18, 2021
1 parent 0c878c1 commit 9a6d718
Show file tree
Hide file tree
Showing 10 changed files with 333 additions and 204 deletions.
4 changes: 2 additions & 2 deletions bazel/repositories.bzl
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive")

ENVOY_COMMIT = "e85a7f408c7baee8e1ed4af39a647c98ee5f2215" # Aug 10, 2021
ENVOY_SHA = "1745bf00a464327a0993b5229cae5e30423854a77b9b1d0c4ee96306e9aedc64"
ENVOY_COMMIT = "95038feabf260c3937465951d5da603d31ea3bd4" # Aug 12, 2021
ENVOY_SHA = "4a584b02c24ac24362eff2550977616f86a194e61106e15f723e1cf961ca145d"

HDR_HISTOGRAM_C_VERSION = "0.11.2" # October 12th, 2020
HDR_HISTOGRAM_C_SHA = "637f28b5f64de2e268131e4e34e6eef0b91cf5ff99167db447d9b2825eae6bad"
Expand Down
10 changes: 8 additions & 2 deletions source/client/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,13 @@ bool Main::run() {
stub = std::make_unique<nighthawk::client::NighthawkService::Stub>(channel);
process = std::make_unique<RemoteProcessImpl>(*options_, *stub);
} else {
process = std::make_unique<ProcessImpl>(*options_, time_system);
absl::StatusOr<ProcessPtr> process_or_status =
ProcessImpl::CreateProcessImpl(*options_, time_system);
if (!process_or_status.ok()) {
ENVOY_LOG(error, "Unable to create ProcessImpl: {}", process_or_status.status().ToString());
return false;
}
process = std::move(*process_or_status);
}
OutputFormatterFactoryImpl output_formatter_factory;
OutputCollectorImpl output_collector(time_system, *options_);
Expand Down Expand Up @@ -100,4 +106,4 @@ bool Main::run() {
}

} // namespace Client
} // namespace Nighthawk
} // namespace Nighthawk
51 changes: 45 additions & 6 deletions source/client/process_bootstrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
#include "external/envoy_api/envoy/extensions/upstreams/http/v3/http_protocol_options.pb.h"

#include "source/client/sni_utility.h"
#include "source/common/uri_impl.h"
#include "source/common/utility.h"

namespace Nighthawk {
namespace {
Expand Down Expand Up @@ -175,15 +177,52 @@ Cluster createNighthawkClusterForWorker(const Client::Options& options,
return cluster;
}

// Extracts URIs of the targets and the request source (if specified) from the
// Nighthawk options.
// Resolves all the extracted URIs.
absl::Status extractAndResolveUrisFromOptions(Envoy::Event::Dispatcher& dispatcher,
const Client::Options& options,
std::vector<UriPtr>* uris,
UriPtr* request_source_uri) {
try {
if (options.uri().has_value()) {
uris->push_back(std::make_unique<UriImpl>(options.uri().value()));
} else {
for (const nighthawk::client::MultiTarget::Endpoint& endpoint :
options.multiTargetEndpoints()) {
uris->push_back(std::make_unique<UriImpl>(fmt::format(
"{}://{}:{}{}", options.multiTargetUseHttps() ? "https" : "http",
endpoint.address().value(), endpoint.port().value(), options.multiTargetPath())));
}
}
for (const UriPtr& uri : *uris) {
uri->resolve(dispatcher, Utility::translateFamilyOptionString(options.addressFamily()));
}
if (options.requestSource() != "") {
*request_source_uri = std::make_unique<UriImpl>(options.requestSource());
(*request_source_uri)
->resolve(dispatcher, Utility::translateFamilyOptionString(options.addressFamily()));
}
} catch (const UriException& ex) {
return absl::InvalidArgumentError(
fmt::format("URI exception (for example, malformed URI syntax, bad MultiTarget path, "
"unresolvable host DNS): {}",
ex.what()));
}
return absl::OkStatus();
}

} // namespace

absl::StatusOr<Bootstrap> createBootstrapConfiguration(const Client::Options& options,
const std::vector<UriPtr>& uris,
const UriPtr& request_source_uri,
absl::StatusOr<Bootstrap> createBootstrapConfiguration(Envoy::Event::Dispatcher& dispatcher,
const Client::Options& options,
int number_of_workers) {
if (uris.empty()) {
return absl::InvalidArgumentError(
"illegal configuration with zero endpoints, at least one uri must be specified");
std::vector<UriPtr> uris;
UriPtr request_source_uri;
absl::Status uri_status =
extractAndResolveUrisFromOptions(dispatcher, options, &uris, &request_source_uri);
if (!uri_status.ok()) {
return uri_status;
}

Bootstrap bootstrap;
Expand Down
14 changes: 5 additions & 9 deletions source/client/process_bootstrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "nighthawk/common/uri.h"

#include "external/envoy/source/common/common/statusor.h"
#include "external/envoy/source/common/event/dispatcher_impl.h"
#include "external/envoy_api/envoy/config/bootstrap/v3/bootstrap.pb.h"

namespace Nighthawk {
Expand All @@ -14,21 +15,16 @@ namespace Nighthawk {
* The created bootstrap configuration can be used to upstream requests to the
* specified uris.
*
* @param dispatcher is used when resolving hostnames to IP addresses in the
bootstrap.
* @param options are the options this Nighthawk execution was triggered with.
* @param uris are the endpoints to which the requests will be upstreamed. At
* least one uri must be specified. It is assumed that all the uris have
* the same scheme (e.g. https). All the uri objects must already be
* resolved.
* @param request_source_uri is the address of the request source service to
* use, can be NULL if request source isn't used. If not NULL, the uri
* object must already be resolved.
* @param number_of_workers indicates how many Nighthawk workers will be
* upstreaming requests. A separate cluster is generated for each worker.
*
* @return the created bootstrap configuration.
*/
absl::StatusOr<envoy::config::bootstrap::v3::Bootstrap>
createBootstrapConfiguration(const Client::Options& options, const std::vector<UriPtr>& uris,
const UriPtr& request_source_uri, int number_of_workers);
createBootstrapConfiguration(Envoy::Event::Dispatcher& dispatcher, const Client::Options& options,
int number_of_workers);

} // namespace Nighthawk
158 changes: 83 additions & 75 deletions source/client/process_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,44 @@ using namespace std::chrono_literals;

namespace Nighthawk {
namespace Client {
namespace {

using ::envoy::config::bootstrap::v3::Bootstrap;

// Helps in generating a bootstrap for the process.
// This is a class only to allow the use of the ENVOY_LOG macros.
class BootstrapFactory : public Envoy::Logger::Loggable<Envoy::Logger::Id::main> {
public:
// Determines the concurrency Nighthawk should use based on configuration
// (options) and the available machine resources.
static uint32_t determineConcurrency(const Options& options) {
uint32_t cpu_cores_with_affinity = Envoy::OptionsImplPlatform::getCpuCount();
bool autoscale = options.concurrency() == "auto";
// TODO(oschaaf): Maybe, in the case where the concurrency flag is left out, but
// affinity is set / we don't have affinity with all cores, we should default to autoscale.
// (e.g. we are called via taskset).
uint32_t concurrency = autoscale ? cpu_cores_with_affinity : std::stoi(options.concurrency());

if (autoscale) {
ENVOY_LOG(info, "Detected {} (v)CPUs with affinity..", cpu_cores_with_affinity);
}
std::string duration_as_string =
options.noDuration() ? "No time limit"
: fmt::format("Time limit: {} seconds", options.duration().count());
ENVOY_LOG(info, "Starting {} threads / event loops. {}.", concurrency, duration_as_string);
ENVOY_LOG(info, "Global targets: {} connections and {} calls per second.",
options.connections() * concurrency, options.requestsPerSecond() * concurrency);

if (concurrency > 1) {
ENVOY_LOG(info, " (Per-worker targets: {} connections and {} calls per second)",
options.connections(), options.requestsPerSecond());
}

return concurrency;
}
};

} // namespace

// We customize ProdClusterManagerFactory for the sole purpose of returning our specialized
// http1 pool to the benchmark client, which allows us to offer connection prefetching.
Expand Down Expand Up @@ -123,17 +161,17 @@ class ClusterManagerFactory : public Envoy::Upstream::ProdClusterManagerFactory

ProcessImpl::ProcessImpl(const Options& options, Envoy::Event::TimeSystem& time_system,
const std::shared_ptr<Envoy::ProcessWide>& process_wide)
: process_wide_(process_wide == nullptr ? std::make_shared<Envoy::ProcessWide>()
: options_(options), number_of_workers_(BootstrapFactory::determineConcurrency(options_)),
process_wide_(process_wide == nullptr ? std::make_shared<Envoy::ProcessWide>()
: process_wide),
time_system_(time_system), stats_allocator_(symbol_table_), store_root_(stats_allocator_),
quic_stat_names_(store_root_.symbolTable()),
api_(std::make_unique<Envoy::Api::Impl>(platform_impl_.threadFactory(), store_root_,
time_system_, platform_impl_.fileSystem(),
generator_)),
time_system_, platform_impl_.fileSystem(), generator_,
bootstrap_)),
dispatcher_(api_->allocateDispatcher("main_thread")), benchmark_client_factory_(options),
termination_predicate_factory_(options), sequencer_factory_(options),
request_generator_factory_(options, *api_), options_(options),
init_manager_("nh_init_manager"),
request_generator_factory_(options, *api_), init_manager_("nh_init_manager"),
local_info_(new Envoy::LocalInfo::LocalInfoImpl(
store_root_.symbolTable(), node_, node_context_params_,
Envoy::Network::Utility::getLocalAddress(Envoy::Network::Address::IpVersion::v4),
Expand All @@ -153,6 +191,35 @@ ProcessImpl::ProcessImpl(const Options& options, Envoy::Event::TimeSystem& time_
configureComponentLogLevels(spdlog::level::from_str(lower));
}

absl::StatusOr<ProcessPtr>
ProcessImpl::CreateProcessImpl(const Options& options, Envoy::Event::TimeSystem& time_system,
const std::shared_ptr<Envoy::ProcessWide>& process_wide) {
std::unique_ptr<ProcessImpl> process(new ProcessImpl(options, time_system, process_wide));

absl::StatusOr<Bootstrap> bootstrap = createBootstrapConfiguration(
*process->dispatcher_, process->options_, process->number_of_workers_);
if (!bootstrap.ok()) {
ENVOY_LOG(error, "Failed to create bootstrap configuration: {}", bootstrap.status().message());
process->shutdown();
return bootstrap.status();
}

// Ideally we would create the bootstrap first and then pass it to the
// constructor of Envoy::Api::Api. That cannot be done because of a circular
// dependency:
// 1) The constructor of Envoy::Api::Api requires an instance of Bootstrap.
// 2) The bootstrap generator requires an Envoy::Event::Dispatcher to resolve
// URIs to IPs required in the Bootstrap.
// 3) The constructor of Envoy::Event::Dispatcher requires Envoy::Api::Api.
//
// Replacing the bootstrap_ after the Envoy::Api::Api has been created is
// assumed to be safe, because we still do it while constructing the
// ProcessImpl, i.e. before we start running the process.
process->bootstrap_ = *bootstrap;

return process;
}

ProcessImpl::~ProcessImpl() {
RELEASE_ASSERT(shutdown_, "shutdown not called before destruction.");
}
Expand Down Expand Up @@ -241,32 +308,6 @@ void ProcessImpl::configureComponentLogLevels(spdlog::level::level_enum level) {
logger_to_change->setLevel(level);
}

uint32_t ProcessImpl::determineConcurrency() const {
uint32_t cpu_cores_with_affinity = Envoy::OptionsImplPlatform::getCpuCount();
bool autoscale = options_.concurrency() == "auto";
// TODO(oschaaf): Maybe, in the case where the concurrency flag is left out, but
// affinity is set / we don't have affinity with all cores, we should default to autoscale.
// (e.g. we are called via taskset).
uint32_t concurrency = autoscale ? cpu_cores_with_affinity : std::stoi(options_.concurrency());

if (autoscale) {
ENVOY_LOG(info, "Detected {} (v)CPUs with affinity..", cpu_cores_with_affinity);
}
std::string duration_as_string =
options_.noDuration() ? "No time limit"
: fmt::format("Time limit: {} seconds", options_.duration().count());
ENVOY_LOG(info, "Starting {} threads / event loops. {}.", concurrency, duration_as_string);
ENVOY_LOG(info, "Global targets: {} connections and {} calls per second.",
options_.connections() * concurrency, options_.requestsPerSecond() * concurrency);

if (concurrency > 1) {
ENVOY_LOG(info, " (Per-worker targets: {} connections and {} calls per second)",
options_.connections(), options_.requestsPerSecond());
}

return concurrency;
}

std::vector<StatisticPtr>
ProcessImpl::vectorizeStatisticPtrMap(const StatisticPtrMap& statistics) const {
std::vector<StatisticPtr> v;
Expand Down Expand Up @@ -386,8 +427,7 @@ void ProcessImpl::setupStatsSinks(const envoy::config::bootstrap::v3::Bootstrap&
}
}

bool ProcessImpl::runInternal(OutputCollector& collector, const std::vector<UriPtr>& uris,
const UriPtr& request_source_uri, const UriPtr& tracing_uri,
bool ProcessImpl::runInternal(OutputCollector& collector, const UriPtr& tracing_uri,
const absl::optional<Envoy::SystemTime>& scheduled_start) {
const Envoy::SystemTime now = time_system_.systemTime();
if (scheduled_start.value_or(now) < now) {
Expand All @@ -399,24 +439,15 @@ bool ProcessImpl::runInternal(OutputCollector& collector, const std::vector<UriP
if (cancelled_) {
return true;
}
int number_of_workers = determineConcurrency();
shutdown_ = false;

absl::StatusOr<envoy::config::bootstrap::v3::Bootstrap> bootstrap =
createBootstrapConfiguration(options_, uris, request_source_uri, number_of_workers);
if (!bootstrap.ok()) {
ENVOY_LOG(error, "Failed to create bootstrap configuration: {}",
bootstrap.status().message());
return false;
}

// Needs to happen as early as possible (before createWorkers()) in the instantiation to preempt
// the objects that require stats.
if (!options_.statsSinks().empty()) {
store_root_.setTagProducer(Envoy::Config::Utility::createTagProducer(*bootstrap));
store_root_.setTagProducer(Envoy::Config::Utility::createTagProducer(bootstrap_));
}

createWorkers(number_of_workers, scheduled_start);
createWorkers(number_of_workers_, scheduled_start);
tls_.registerThread(*dispatcher_, true);
store_root_.initializeThreading(*dispatcher_, tls_);
runtime_singleton_ = std::make_unique<Envoy::Runtime::ScopedLoaderSingleton>(
Expand Down Expand Up @@ -446,21 +477,21 @@ bool ProcessImpl::runInternal(OutputCollector& collector, const std::vector<UriP
: Http1PoolImpl::ConnectionReuseStrategy::MRU);
cluster_manager_factory_->setPrefetchConnections(options_.prefetchConnections());
if (tracing_uri != nullptr) {
setupTracingImplementation(*bootstrap, *tracing_uri);
addTracingCluster(*bootstrap, *tracing_uri);
setupTracingImplementation(bootstrap_, *tracing_uri);
addTracingCluster(bootstrap_, *tracing_uri);
}
ENVOY_LOG(debug, "Computed configuration: {}", bootstrap->DebugString());
cluster_manager_ = cluster_manager_factory_->clusterManagerFromProto(*bootstrap);
maybeCreateTracingDriver(bootstrap->tracing());
ENVOY_LOG(debug, "Computed configuration: {}", bootstrap_.DebugString());
cluster_manager_ = cluster_manager_factory_->clusterManagerFromProto(bootstrap_);
maybeCreateTracingDriver(bootstrap_.tracing());
cluster_manager_->setInitializedCb(
[this]() -> void { init_manager_.initialize(init_watcher_); });

Envoy::Runtime::LoaderSingleton::get().initialize(*cluster_manager_);

std::list<std::unique_ptr<Envoy::Stats::Sink>> stats_sinks;
setupStatsSinks(*bootstrap, stats_sinks);
setupStatsSinks(bootstrap_, stats_sinks);
std::chrono::milliseconds stats_flush_interval = std::chrono::milliseconds(
Envoy::DurationUtil::durationToMilliseconds(bootstrap->stats_flush_interval()));
Envoy::DurationUtil::durationToMilliseconds(bootstrap_.stats_flush_interval()));

if (!options_.statsSinks().empty()) {
// There should be only a single live flush worker instance at any time.
Expand Down Expand Up @@ -541,31 +572,9 @@ bool ProcessImpl::runInternal(OutputCollector& collector, const std::vector<UriP
}

bool ProcessImpl::run(OutputCollector& collector) {
std::vector<UriPtr> uris;
UriPtr request_source_uri;
UriPtr tracing_uri;

try {
// TODO(oschaaf): See if we can rid of resolving here.
// We now only do it to validate.
if (options_.uri().has_value()) {
uris.push_back(std::make_unique<UriImpl>(options_.uri().value()));
} else {
for (const nighthawk::client::MultiTarget::Endpoint& endpoint :
options_.multiTargetEndpoints()) {
uris.push_back(std::make_unique<UriImpl>(fmt::format(
"{}://{}:{}{}", options_.multiTargetUseHttps() ? "https" : "http",
endpoint.address().value(), endpoint.port().value(), options_.multiTargetPath())));
}
}
for (const UriPtr& uri : uris) {
uri->resolve(*dispatcher_, Utility::translateFamilyOptionString(options_.addressFamily()));
}
if (options_.requestSource() != "") {
request_source_uri = std::make_unique<UriImpl>(options_.requestSource());
request_source_uri->resolve(*dispatcher_,
Utility::translateFamilyOptionString(options_.addressFamily()));
}
if (options_.trace() != "") {
tracing_uri = std::make_unique<UriImpl>(options_.trace());
tracing_uri->resolve(*dispatcher_,
Expand All @@ -580,8 +589,7 @@ bool ProcessImpl::run(OutputCollector& collector) {
}

try {
return runInternal(collector, uris, request_source_uri, tracing_uri,
options_.scheduled_start());
return runInternal(collector, tracing_uri, options_.scheduled_start());
} catch (Envoy::EnvoyException& ex) {
ENVOY_LOG(error, "Fatal exception: {}", ex.what());
throw;
Expand Down
Loading

0 comments on commit 9a6d718

Please sign in to comment.