Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(app,outbound)!: Decouple metrics registry from stack building #2887

Merged
merged 1 commit into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2249,6 +2249,7 @@ dependencies = [
"jemallocator",
"linkerd-app",
"linkerd-meshtls",
"linkerd-metrics",
"linkerd-signal",
"num_cpus",
"tokio",
Expand Down
40 changes: 27 additions & 13 deletions linkerd/app/core/src/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,17 +72,30 @@ impl fmt::Display for ControlAddr {
pub type RspBody =
linkerd_http_metrics::requests::ResponseBody<http::balance::Body<hyper::Body>, classify::Eos>;

#[derive(Clone, Debug, Default)]
pub struct Metrics {
balance: balance::Metrics,
}

const EWMA_CONFIG: http::balance::EwmaConfig = http::balance::EwmaConfig {
default_rtt: time::Duration::from_millis(30),
decay: time::Duration::from_secs(10),
};

impl Metrics {
pub fn register(registry: &mut prom::Registry) -> Self {
Metrics {
balance: balance::Metrics::register(registry.sub_registry_with_prefix("balancer")),
}
}
}

impl Config {
pub fn build(
self,
dns: dns::Resolver,
metrics: metrics::ControlHttp,
registry: &mut prom::Registry,
legacy_metrics: metrics::ControlHttp,
metrics: Metrics,
identity: identity::NewClient,
) -> svc::ArcNewService<
(),
Expand Down Expand Up @@ -133,12 +146,8 @@ impl Config {

let balance = endpoint
.lift_new()
.push(self::balance::layer(
registry.sub_registry_with_prefix("balancer"),
dns,
resolve_backoff,
))
.push(metrics.to_layer::<classify::Response, _, _>())
.push(self::balance::layer(metrics.balance, dns, resolve_backoff))
.push(legacy_metrics.to_layer::<classify::Response, _, _>())
.push(classify::NewClassify::layer_default());

balance
Expand Down Expand Up @@ -233,25 +242,30 @@ mod balance {
use super::{client::Target, ControlAddr};
use crate::{
dns,
metrics::prom::{self, encoding::EncodeLabelSet},
metrics::prom::encoding::EncodeLabelSet,
proxy::{dns_resolve::DnsResolve, http, resolve::recover},
svc, tls,
};
use linkerd_stack::ExtractParam;
use std::net::SocketAddr;

pub(super) type Metrics = http::balance::MetricFamilies<Labels>;

pub fn layer<B, R: Clone, N>(
registry: &mut prom::Registry,
metrics: Metrics,
dns: dns::Resolver,
recover: R,
) -> impl svc::Layer<
N,
Service = http::NewBalance<B, Params, recover::Resolve<R, DnsResolve>, NewIntoTarget<N>>,
> {
let resolve = recover::Resolve::new(recover, DnsResolve::new(dns));
let metrics = Params(http::balance::MetricFamilies::register(registry));
svc::layer::mk(move |inner| {
http::NewBalance::new(NewIntoTarget { inner }, resolve.clone(), metrics.clone())
http::NewBalance::new(
NewIntoTarget { inner },
resolve.clone(),
Params(metrics.clone()),
)
})
}

Expand All @@ -270,7 +284,7 @@ mod balance {
}

#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
struct Labels {
pub(super) struct Labels {
addr: String,
}

Expand Down
5 changes: 2 additions & 3 deletions linkerd/app/gateway/src/http.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::Gateway;
use inbound::{GatewayAddr, GatewayDomainInvalid};
use linkerd_app_core::{
metrics::{prom, ServerLabel},
metrics::ServerLabel,
profiles,
proxy::{
api_resolve::{ConcreteAddr, Metadata},
Expand Down Expand Up @@ -47,7 +47,6 @@ impl Gateway {
/// outbound router.
pub fn http<T, R>(
&self,
registry: &mut prom::Registry,
inner: svc::ArcNewHttp<
outbound::http::concrete::Endpoint<
outbound::http::logical::Concrete<outbound::http::Http<Target>>,
Expand Down Expand Up @@ -86,7 +85,7 @@ impl Gateway {
.outbound
.clone()
.with_stack(inner)
.push_http_cached(outbound::http::HttpMetrics::register(registry), resolve)
.push_http_cached(resolve)
.into_stack()
// Discard `T` and its associated client-specific metadata.
.push_map_target(Target::discard_parent)
Expand Down
1 change: 0 additions & 1 deletion linkerd/app/gateway/src/http/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ async fn upgraded_request_remains_relative_form() {
);
gateway
.http(
&mut Default::default(),
svc::ArcNewHttp::new(move |_: _| svc::BoxHttp::new(inner.clone())),
resolve,
)
Expand Down
13 changes: 3 additions & 10 deletions linkerd/app/gateway/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@
#![forbid(unsafe_code)]

use linkerd_app_core::{
io,
metrics::prom,
profiles,
io, profiles,
proxy::{
api_resolve::{ConcreteAddr, Metadata},
core::Resolve,
Expand Down Expand Up @@ -50,7 +48,6 @@ impl Gateway {
/// stack.
pub fn stack<T, I, R>(
self,
registry: &mut prom::Registry,
resolve: R,
profiles: impl profiles::GetProfile<Error = Error>,
policies: impl outbound::policy::GetPolicy,
Expand All @@ -73,12 +70,8 @@ impl Gateway {
R::Resolution: Unpin,
{
let opaq = {
let registry = registry.sub_registry_with_prefix("tcp");
let resolve = resolve.clone();
let opaq = self
.outbound
.to_tcp_connect()
.push_opaq_cached(registry, resolve);
let opaq = self.outbound.to_tcp_connect().push_opaq_cached(resolve);
self.opaq(opaq.into_inner()).into_inner()
};

Expand All @@ -88,7 +81,7 @@ impl Gateway {
.to_tcp_connect()
.push_tcp_endpoint()
.push_http_tcp_client();
let http = self.http(registry, http.into_inner(), resolve);
let http = self.http(http.into_inner(), resolve);
self.inbound
.clone()
.with_stack(http.into_inner())
Expand Down
9 changes: 8 additions & 1 deletion linkerd/app/integration/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,14 @@ async fn run(proxy: Proxy, mut env: TestEnv, random_ports: bool) -> Listening {
let bind_adm = listen::BindTcp::default();
let (shutdown_tx, mut shutdown_rx) = tokio::sync::mpsc::unbounded_channel();
let main = config
.build(bind_in, bind_out, bind_adm, shutdown_tx, trace_handle)
.build(
bind_in,
bind_out,
bind_adm,
shutdown_tx,
trace_handle,
Default::default(),
)
.await
.expect("config");

Expand Down
4 changes: 2 additions & 2 deletions linkerd/app/outbound/src/discover/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ async fn errors_propagate() {

// Create a profile stack that uses the tracked inner stack.
let (rt, _shutdown) = runtime();
let stack = Outbound::new(default_config(), rt)
let stack = Outbound::new(default_config(), rt, &mut Default::default())
.with_stack(stack)
.push_discover(discover)
.into_inner();
Expand Down Expand Up @@ -107,7 +107,7 @@ async fn caches_profiles_until_idle() {
cfg
};
let (rt, _shutdown) = runtime();
let stack = Outbound::new(cfg, rt)
let stack = Outbound::new(cfg, rt, &mut Default::default())
.with_stack(stack)
.push_discover(discover)
.into_inner();
Expand Down
10 changes: 3 additions & 7 deletions linkerd/app/outbound/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,7 @@ impl<T> Outbound<svc::ArcNewHttp<concrete::Endpoint<logical::Concrete<Http<T>>>>
/// Builds a stack that routes HTTP requests to endpoint stacks.
///
/// Buffered concrete services are cached in and evicted when idle.
pub fn push_http_cached<R>(
self,
metrics: HttpMetrics,
resolve: R,
) -> Outbound<svc::ArcNewCloneHttp<T>>
pub fn push_http_cached<R>(self, resolve: R) -> Outbound<svc::ArcNewCloneHttp<T>>
where
// Logical HTTP target.
T: svc::Param<http::Version>,
Expand All @@ -101,8 +97,8 @@ impl<T> Outbound<svc::ArcNewHttp<concrete::Endpoint<logical::Concrete<Http<T>>>>
R::Resolution: Unpin,
{
self.push_http_endpoint()
.push_http_concrete(metrics.balancer, resolve)
.push_http_logical(metrics.http_route, metrics.grpc_route)
.push_http_concrete(resolve)
.push_http_logical()
.map_stack(move |config, _, stk| {
stk.push_new_idle_cached(config.discovery_idle_timeout)
.push_map_target(Http)
Expand Down
13 changes: 2 additions & 11 deletions linkerd/app/outbound/src/http/concrete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,7 @@ impl<N> Outbound<N> {
/// 'failfast'. While in failfast, buffered requests are failed and the
/// service becomes unavailable so callers may choose alternate concrete
/// services.
pub fn push_http_concrete<T, NSvc, R>(
self,
balancer_metrics: balance::BalancerMetrics,
resolve: R,
) -> Outbound<svc::ArcNewCloneHttp<T>>
pub fn push_http_concrete<T, NSvc, R>(self, resolve: R) -> Outbound<svc::ArcNewCloneHttp<T>>
where
// Concrete target type.
T: svc::Param<ParentRef>,
Expand Down Expand Up @@ -105,12 +101,7 @@ impl<N> Outbound<N> {
});

inner
.push(balance::Balance::layer(
config,
rt,
balancer_metrics,
resolve,
))
.push(balance::Balance::layer(config, rt, resolve))
.check_new_clone()
.push_switch(Ok::<_, Infallible>, forward.into_inner())
.push_switch(
Expand Down
7 changes: 5 additions & 2 deletions linkerd/app/outbound/src/http/concrete/balance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ where
pub(super) fn layer<N, NSvc, R>(
config: &crate::Config,
rt: &crate::Runtime,
metrics: BalancerMetrics,
resolve: R,
) -> impl svc::Layer<N, Service = svc::ArcNewCloneHttp<Self>> + Clone
where
Expand All @@ -101,6 +100,7 @@ where
let http_queue = config.http_request_queue;
let inbound_ips = config.inbound_ips.clone();
let stack_metrics = rt.metrics.proxy.stack.clone();
let balance_metrics = rt.metrics.prom.http.balancer.clone();

let resolve = svc::stack(resolve.into_service())
.push_map_target(|t: Self| ConcreteAddr(t.addr))
Expand Down Expand Up @@ -148,7 +148,10 @@ where
.push(svc::ArcNewService::layer());

endpoint
.push(http::NewBalance::layer(resolve.clone(), metrics.clone()))
.push(http::NewBalance::layer(
resolve.clone(),
balance_metrics.clone(),
))
.push_on_service(http::BoxResponse::layer())
.push_on_service(stack_metrics.layer(stack_labels("http", "balance")))
.push(svc::NewMapErr::layer_from_target::<BalanceError, _>())
Expand Down
10 changes: 5 additions & 5 deletions linkerd/app/outbound/src/http/endpoint/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ async fn http11_forward() {

// Build the outbound server
let (rt, _shutdown) = runtime();
let stack = Outbound::new(default_config(), rt)
let stack = Outbound::new(default_config(), rt, &mut Default::default())
.with_stack(connect)
.push_http_tcp_client()
.push_http_endpoint()
Expand Down Expand Up @@ -59,7 +59,7 @@ async fn http2_forward() {

// Build the outbound server
let (rt, _shutdown) = runtime();
let stack = Outbound::new(default_config(), rt)
let stack = Outbound::new(default_config(), rt, &mut Default::default())
.with_stack(connect)
.push_http_tcp_client()
.push_http_endpoint::<http::BoxBody>()
Expand Down Expand Up @@ -97,7 +97,7 @@ async fn orig_proto_upgrade() {

// Build the outbound server
let (rt, _shutdown) = runtime();
let stack = Outbound::new(default_config(), rt)
let stack = Outbound::new(default_config(), rt, &mut Default::default())
.with_stack(connect)
.push_http_tcp_client()
.push_http_endpoint::<http::BoxBody>()
Expand Down Expand Up @@ -147,7 +147,7 @@ async fn orig_proto_skipped_on_http_upgrade() {
// Build the outbound server
let (rt, _shutdown) = runtime();
let drain = rt.drain.clone();
let stack = Outbound::new(default_config(), rt)
let stack = Outbound::new(default_config(), rt, &mut Default::default())
.with_stack(connect)
.push_http_tcp_client()
.push_http_endpoint::<http::BoxBody>()
Expand Down Expand Up @@ -195,7 +195,7 @@ async fn orig_proto_http2_noop() {

// Build the outbound server
let (rt, _shutdown) = runtime();
let stack = Outbound::new(default_config(), rt)
let stack = Outbound::new(default_config(), rt, &mut Default::default())
.with_stack(connect)
.push_http_tcp_client()
.push_http_endpoint::<http::BoxBody>()
Expand Down
18 changes: 4 additions & 14 deletions linkerd/app/outbound/src/http/logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,7 @@ impl<N> Outbound<N> {
/// support per-request routing over a set of concrete inner services.
/// Only available inner services are used for routing. When there are no
/// available backends, requests are failed with a [`svc::stack::LoadShedError`].
pub fn push_http_logical<T, NSvc>(
self,
http_metrics: policy::RouteMetrics,
grpc_metrics: policy::RouteMetrics,
) -> Outbound<svc::ArcNewCloneHttp<T>>
pub fn push_http_logical<T, NSvc>(self) -> Outbound<svc::ArcNewCloneHttp<T>>
where
// Logical target.
T: svc::Param<watch::Receiver<Routes>>,
Expand All @@ -106,11 +102,7 @@ impl<N> Outbound<N> {
concrete
// Share the concrete stack with each router stack.
.lift_new()
.push_on_service(RouterParams::layer(
rt.metrics.clone(),
http_metrics,
grpc_metrics,
))
.push_on_service(RouterParams::layer(rt.metrics.clone()))
// Rebuild the inner router stack every time the watch changes.
.push(svc::NewSpawnWatch::<Routes, _>::layer_into::<RouterParams<T>>())
.arc_new_clone_http()
Expand All @@ -126,8 +118,6 @@ where
{
fn layer<N, S>(
metrics: OutboundMetrics,
http_metrics: policy::RouteMetrics,
grpc_metrics: policy::RouteMetrics,
) -> impl svc::Layer<N, Service = svc::ArcNewCloneHttp<RouterParams<T>>> + Clone
where
N: svc::NewService<Concrete<T>, Service = S>,
Expand All @@ -142,8 +132,8 @@ where
{
svc::layer::mk(move |concrete: N| {
let policy = svc::stack(concrete.clone()).push(policy::Policy::layer(
http_metrics.clone(),
grpc_metrics.clone(),
metrics.prom.http.http_route.clone(),
metrics.prom.http.grpc_route.clone(),
));
let profile =
svc::stack(concrete.clone()).push(profile::Params::layer(metrics.proxy.clone()));
Expand Down
Loading
Loading