Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename push_on_response to push_on_service #1235

Merged
merged 1 commit into from
Aug 31, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion linkerd/app/admin/src/stack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl Config {
let admin = crate::server::Admin::new(report, ready, shutdown, trace);
let admin = svc::stack(move |_| admin.clone())
.push(metrics.http_endpoint.to_layer::<classify::Response, _, Http>())
.push_on_response(
.push_on_service(
svc::layers()
.push(metrics.http_errors.clone())
.push(errors::layer())
Expand Down
8 changes: 4 additions & 4 deletions linkerd/app/core/src/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,18 +81,18 @@ impl Config {
.push(tls::Client::layer(identity))
.push_timeout(self.connect.timeout)
.push(self::client::layer())
.push_on_response(svc::MapErrLayer::new(Into::into))
.push_on_service(svc::MapErrLayer::new(Into::into))
.into_new_service()
.push_new_reconnect(self.connect.backoff)
// Ensure individual endpoints are driven to readiness so that the balancer need not
// drive them all directly.
.push_on_response(svc::layer::mk(svc::SpawnReady::new))
.push_on_service(svc::layer::mk(svc::SpawnReady::new))
.push(self::resolve::layer(dns, resolve_backoff))
.push_on_response(self::control::balance::layer())
.push_on_service(self::control::balance::layer())
.into_new_service()
.push(metrics.to_layer::<classify::Response, _, _>())
.push(self::add_origin::layer())
.push_on_response(svc::layers().push_spawn_buffer(self.buffer_capacity))
.push_on_service(svc::layers().push_spawn_buffer(self.buffer_capacity))
.push_map_target(move |()| addr.clone())
.push(svc::BoxNewService::layer())
.into_inner()
Expand Down
8 changes: 4 additions & 4 deletions linkerd/app/core/src/svc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ impl<L> Layers<L> {
.push(BufferLayer::new(capacity))
}

pub fn push_on_response<U>(self, layer: U) -> Layers<Pair<L, stack::OnResponseLayer<U>>> {
self.push(stack::OnResponseLayer::new(layer))
pub fn push_on_service<U>(self, layer: U) -> Layers<Pair<L, stack::OnServiceLayer<U>>> {
self.push(stack::OnServiceLayer::new(layer))
}

pub fn push_instrument<G: Clone>(self, get_span: G) -> Layers<Pair<L, NewInstrumentLayer<G>>> {
Expand Down Expand Up @@ -171,8 +171,8 @@ impl<S> Stack<S> {

/// Assuming `S` implements `NewService` or `MakeService`, applies the given
/// `L`-typed layer on each service produced by `S`.
pub fn push_on_response<L: Clone>(self, layer: L) -> Stack<stack::OnResponse<L, S>> {
self.push(stack::OnResponseLayer::new(layer))
pub fn push_on_service<L: Clone>(self, layer: L) -> Stack<stack::OnService<L, S>> {
self.push(stack::OnServiceLayer::new(layer))
}

pub fn push_timeout(self, timeout: Duration) -> Stack<tower::timeout::Timeout<S>> {
Expand Down
12 changes: 6 additions & 6 deletions linkerd/app/gateway/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ where
}
}
}))
.push_on_response(
.push_on_service(
svc::layers()
.push(
inbound
Expand Down Expand Up @@ -195,7 +195,7 @@ where
}
}))
.instrument(|h: &HttpTarget| debug_span!("gateway", target = %h.target, v = %h.version))
.push_on_response(
.push_on_service(
svc::layers()
.push(
inbound
Expand All @@ -209,7 +209,7 @@ where
.push_spawn_buffer(buffer_capacity),
)
.push_cache(cache_max_idle_age)
.push_on_response(
.push_on_service(
svc::layers()
.push(http::Retain::layer())
.push(http::BoxResponse::layer()),
Expand Down Expand Up @@ -247,7 +247,7 @@ where
)
.push_http_server()
.into_stack()
.push_on_response(svc::BoxService::layer())
.push_on_service(svc::BoxService::layer())
.push(svc::BoxNewService::layer())
.push_switch(
|GatewayTransportHeader {
Expand All @@ -266,7 +266,7 @@ where
})),
None => Ok::<_, Infallible>(svc::Either::B(target)),
},
tcp.push_on_response(svc::BoxService::layer())
tcp.push_on_service(svc::BoxService::layer())
.push(svc::BoxNewService::layer())
.into_inner(),
)
Expand All @@ -277,7 +277,7 @@ where
},
legacy_http.into_inner(),
)
.push_on_response(svc::BoxService::layer())
.push_on_service(svc::BoxService::layer())
.push(svc::BoxNewService::layer())
.into_inner()
}
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/inbound/src/accept.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl<N> Inbound<N> {
let OrigDstAddr(addr) = t.param();
info_span!("server", port = addr.port())
})
.push_on_response(svc::BoxService::layer())
.push_on_service(svc::BoxService::layer())
.push(svc::BoxNewService::layer())
})
}
Expand Down
14 changes: 7 additions & 7 deletions linkerd/app/inbound/src/detect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ impl<N> Inbound<N> {
}))
},
svc::stack(forward.clone())
.push_on_response(svc::MapTargetLayer::new(io::BoxedIo::new))
.push_on_service(svc::MapTargetLayer::new(io::BoxedIo::new))
.into_inner(),
)
.push(tls::NewDetectTls::layer(TlsParams {
Expand All @@ -150,10 +150,10 @@ impl<N> Inbound<N> {
Ok(svc::Either::A(t))
},
svc::stack(forward)
.push_on_response(svc::MapTargetLayer::new(io::BoxedIo::new))
.push_on_service(svc::MapTargetLayer::new(io::BoxedIo::new))
.into_inner(),
)
.push_on_response(svc::BoxService::layer())
.push_on_service(svc::BoxService::layer())
.push(svc::BoxNewService::layer())
})
}
Expand All @@ -180,7 +180,7 @@ impl<N> Inbound<N> {

let detect = http
.clone()
.push_on_response(svc::MapTargetLayer::new(io::BoxedIo::new))
.push_on_service(svc::MapTargetLayer::new(io::BoxedIo::new))
.push(transport::metrics::NewServer::layer(
rt.metrics.transport.clone(),
))
Expand Down Expand Up @@ -209,7 +209,7 @@ impl<N> Inbound<N> {
}
},
svc::stack(forward)
.push_on_response(svc::MapTargetLayer::new(io::BoxedIo::new))
.push_on_service(svc::MapTargetLayer::new(io::BoxedIo::new))
.push(transport::metrics::NewServer::layer(
rt.metrics.transport.clone(),
))
Expand All @@ -220,7 +220,7 @@ impl<N> Inbound<N> {
.push(detect::NewDetectService::layer(ConfigureHttpDetect))
.check_new_service::<Detect, I>();

http.push_on_response(svc::MapTargetLayer::new(io::BoxedIo::new))
http.push_on_service(svc::MapTargetLayer::new(io::BoxedIo::new))
.push(transport::metrics::NewServer::layer(
rt.metrics.transport.clone(),
))
Expand Down Expand Up @@ -248,7 +248,7 @@ impl<N> Inbound<N> {
},
detect.into_inner(),
)
.push_on_response(svc::BoxService::layer())
.push_on_service(svc::BoxService::layer())
.push(svc::BoxNewService::layer())
.check_new_service::<Tls, I>()
})
Expand Down
6 changes: 3 additions & 3 deletions linkerd/app/inbound/src/direct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ impl<N> Inbound<N> {
// HTTP detection is not necessary in this case, since the transport
// header indicates the connection's HTTP version.
svc::stack(gateway.clone())
.push_on_response(svc::MapTargetLayer::new(io::EitherIo::Left))
.push_on_service(svc::MapTargetLayer::new(io::EitherIo::Left))
.push_map_target(GatewayConnection::TransportHeader)
.push(transport::metrics::NewServer::layer(
rt.metrics.transport.clone(),
Expand Down Expand Up @@ -221,7 +221,7 @@ impl<N> Inbound<N> {
// with transport header support.
svc::stack(gateway)
.push_map_target(GatewayConnection::Legacy)
.push_on_response(svc::MapTargetLayer::new(io::EitherIo::Right))
.push_on_service(svc::MapTargetLayer::new(io::EitherIo::Right))
.push(transport::metrics::NewServer::layer(
rt.metrics.transport.clone(),
))
Expand All @@ -239,7 +239,7 @@ impl<N> Inbound<N> {
identity: rt.identity.clone().map(WithTransportHeaderAlpn),
}))
.check_new_service::<T, I>()
.push_on_response(svc::BoxService::layer())
.push_on_service(svc::BoxService::layer())
.push(svc::BoxNewService::layer())
})
}
Expand Down
22 changes: 11 additions & 11 deletions linkerd/app/inbound/src/http/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl<C> Inbound<C> {
config.proxy.connect.h1_settings,
config.proxy.connect.h2_settings,
))
.push_on_response(svc::MapErrLayer::new(Into::into))
.push_on_service(svc::MapErrLayer::new(Into::into))
.into_new_service()
.push_new_reconnect(config.proxy.connect.backoff)
.check_new_service::<Http, http::Request<_>>()
Expand All @@ -103,11 +103,11 @@ impl<C> Inbound<C> {
.http_endpoint
.to_layer::<classify::Response, _, _>(),
)
.push_on_response(http_tracing::client(
.push_on_service(http_tracing::client(
rt.span_sink.clone(),
super::trace_labels(),
))
.push_on_response(http::BoxResponse::layer())
.push_on_service(http::BoxResponse::layer())
.check_new_service::<Logical, http::Request<_>>();

// Attempts to discover a service profile for each logical target (as
Expand All @@ -117,10 +117,10 @@ impl<C> Inbound<C> {
.check_new_service::<Logical, http::Request<http::BoxBody>>()
// The HTTP stack doesn't use the profile resolution, so drop it.
.push_map_target(Logical::from)
.push_on_response(http::BoxResponse::layer())
.push_on_service(http::BoxResponse::layer())
.push(profiles::http::route_request::layer(
svc::proxies()
.push_on_response(http::BoxRequest::layer())
.push_on_service(http::BoxRequest::layer())
// Records per-route metrics.
.push(
rt.metrics
Expand All @@ -140,7 +140,7 @@ impl<C> Inbound<C> {
direction: metrics::Direction::In,
}
})
.push_on_response(http::BoxResponse::layer())
.push_on_service(http::BoxResponse::layer())
.into_inner(),
))
.push_switch(
Expand All @@ -160,7 +160,7 @@ impl<C> Inbound<C> {
Ok(svc::Either::B(logical))
},
http.clone()
.push_on_response(http::BoxResponse::layer())
.push_on_service(http::BoxResponse::layer())
.check_new_service::<Logical, http::Request<_>>()
.into_inner(),
)
Expand All @@ -182,7 +182,7 @@ impl<C> Inbound<C> {
Ok(profiles::LookupAddr(addr.into()))
}))
.instrument(|_: &Logical| debug_span!("profile"))
.push_on_response(
.push_on_service(
svc::layers()
.push(http::BoxResponse::layer())
.push(svc::layer::mk(svc::SpawnReady::new)),
Expand All @@ -191,11 +191,11 @@ impl<C> Inbound<C> {
.push_when_unready(
config.profile_idle_timeout,
http.clone()
.push_on_response(svc::layer::mk(svc::SpawnReady::new))
.push_on_service(svc::layer::mk(svc::SpawnReady::new))
.into_inner(),
)
.check_new_service::<Logical, http::Request<http::BoxBody>>()
.push_on_response(
.push_on_service(
svc::layers()
.push(rt.metrics.stack.layer(stack_labels("http", "logical")))
.push(svc::FailFast::layer(
Expand All @@ -205,7 +205,7 @@ impl<C> Inbound<C> {
.push_spawn_buffer(config.proxy.buffer_capacity),
)
.push_cache(config.proxy.cache_max_idle_age)
.push_on_response(
.push_on_service(
svc::layers()
.push(http::Retain::layer())
.push(http::BoxResponse::layer()),
Expand Down
4 changes: 2 additions & 2 deletions linkerd/app/inbound/src/http/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl<H> Inbound<H> {
// the request may have been downgraded from a HTTP/2 orig-proto request.
.push(http::NewNormalizeUri::layer())
.push(NewSetIdentityHeader::layer())
.push_on_response(
.push_on_service(
svc::layers()
// Downgrades the protocol if upgraded by an outbound proxy.
.push(http::orig_proto::Downgrade::layer())
Expand All @@ -71,7 +71,7 @@ impl<H> Inbound<H> {
.check_new_service::<T, http::Request<_>>()
.instrument(|t: &T| debug_span!("http", v = %Param::<Version>::param(t)))
.push(http::NewServeHttp::layer(h2_settings, rt.drain.clone()))
.push_on_response(svc::BoxService::layer())
.push_on_service(svc::BoxService::layer())
.push(svc::BoxNewService::layer())
})
}
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/inbound/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ impl<S> Inbound<S> {
rt.metrics.transport.clone(),
))
.push_make_thunk()
.push_on_response(
.push_on_service(
svc::layers()
.push(tcp::Forward::layer())
.push(drain::Retain::layer(rt.drain.clone())),
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/outbound/src/discover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl<N> Outbound<N> {
},
))
.instrument(|_: &_| debug_span!("profile"))
.push_on_response(
.push_on_service(
svc::layers()
// If the traffic split is empty/unavailable, eagerly fail
// requests. When the split is in failfast, spawn the
Expand Down
10 changes: 5 additions & 5 deletions linkerd/app/outbound/src/http/detect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ impl<N> Outbound<N> {

let skipped = tcp
.clone()
.push_on_response(svc::MapTargetLayer::new(io::EitherIo::Left))
.push_on_service(svc::MapTargetLayer::new(io::EitherIo::Left))
.into_inner();

svc::stack(http)
.push_on_response(
.push_on_service(
svc::layers()
.push(http::BoxRequest::layer())
.push(svc::MapErrLayer::new(Into::into)),
Expand All @@ -47,10 +47,10 @@ impl<N> Outbound<N> {
.push_map_target(U::from)
.instrument(|(v, _): &(http::Version, _)| debug_span!("http", %v))
.push(svc::UnwrapOr::layer(
tcp.push_on_response(svc::MapTargetLayer::new(io::EitherIo::Right))
tcp.push_on_service(svc::MapTargetLayer::new(io::EitherIo::Right))
.into_inner(),
))
.push_on_response(svc::BoxService::layer())
.push_on_service(svc::BoxService::layer())
.check_new_service::<(Option<http::Version>, T), _>()
.push_map_target(detect::allow_timeout)
.push(svc::BoxNewService::layer())
Expand All @@ -67,7 +67,7 @@ impl<N> Outbound<N> {
skipped,
)
.check_new_service::<T, _>()
.push_on_response(svc::BoxService::layer())
.push_on_service(svc::BoxService::layer())
.push(svc::BoxNewService::layer())
})
}
Expand Down
6 changes: 3 additions & 3 deletions linkerd/app/outbound/src/http/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl<C> Outbound<C> {
// HTTP/1.x fallback is supported as needed.
connect
.push(http::client::layer(h1_settings, h2_settings))
.push_on_response(svc::MapErrLayer::new(Into::<Error>::into))
.push_on_service(svc::MapErrLayer::new(Into::<Error>::into))
.check_service::<T>()
.into_new_service()
.push_new_reconnect(backoff)
Expand All @@ -46,7 +46,7 @@ impl<C> Outbound<C> {
.http_endpoint
.to_layer::<classify::Response, _, _>(),
)
.push_on_response(http_tracing::client(
.push_on_service(http_tracing::client(
rt.span_sink.clone(),
crate::trace_labels(),
))
Expand All @@ -55,7 +55,7 @@ impl<C> Outbound<C> {
"host",
CANONICAL_DST_HEADER,
]))
.push_on_response(
.push_on_service(
svc::layers()
.push(http::BoxResponse::layer())
.push(svc::BoxService::layer()),
Expand Down
Loading