Skip to content

Commit

Permalink
Rename push_on_response to push_on_service (#1235)
Browse files Browse the repository at this point in the history
The "on response" terminology is confusing: in all cases where we use
push_on_response, we're actually pushing a layer that operates on a
`NewService`'s returned `Service`. This rename helps to make this usage
clearer.
  • Loading branch information
olix0r authored Aug 31, 2021
1 parent d0878d5 commit 2e948ae
Show file tree
Hide file tree
Showing 21 changed files with 83 additions and 83 deletions.
2 changes: 1 addition & 1 deletion linkerd/app/admin/src/stack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl Config {
let admin = crate::server::Admin::new(report, ready, shutdown, trace);
let admin = svc::stack(move |_| admin.clone())
.push(metrics.http_endpoint.to_layer::<classify::Response, _, Http>())
.push_on_response(
.push_on_service(
svc::layers()
.push(metrics.http_errors.clone())
.push(errors::layer())
Expand Down
8 changes: 4 additions & 4 deletions linkerd/app/core/src/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,18 +81,18 @@ impl Config {
.push(tls::Client::layer(identity))
.push_timeout(self.connect.timeout)
.push(self::client::layer())
.push_on_response(svc::MapErrLayer::new(Into::into))
.push_on_service(svc::MapErrLayer::new(Into::into))
.into_new_service()
.push_new_reconnect(self.connect.backoff)
// Ensure individual endpoints are driven to readiness so that the balancer need not
// drive them all directly.
.push_on_response(svc::layer::mk(svc::SpawnReady::new))
.push_on_service(svc::layer::mk(svc::SpawnReady::new))
.push(self::resolve::layer(dns, resolve_backoff))
.push_on_response(self::control::balance::layer())
.push_on_service(self::control::balance::layer())
.into_new_service()
.push(metrics.to_layer::<classify::Response, _, _>())
.push(self::add_origin::layer())
.push_on_response(svc::layers().push_spawn_buffer(self.buffer_capacity))
.push_on_service(svc::layers().push_spawn_buffer(self.buffer_capacity))
.push_map_target(move |()| addr.clone())
.push(svc::BoxNewService::layer())
.into_inner()
Expand Down
8 changes: 4 additions & 4 deletions linkerd/app/core/src/svc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ impl<L> Layers<L> {
.push(BufferLayer::new(capacity))
}

pub fn push_on_response<U>(self, layer: U) -> Layers<Pair<L, stack::OnResponseLayer<U>>> {
self.push(stack::OnResponseLayer::new(layer))
pub fn push_on_service<U>(self, layer: U) -> Layers<Pair<L, stack::OnServiceLayer<U>>> {
self.push(stack::OnServiceLayer::new(layer))
}

pub fn push_instrument<G: Clone>(self, get_span: G) -> Layers<Pair<L, NewInstrumentLayer<G>>> {
Expand Down Expand Up @@ -171,8 +171,8 @@ impl<S> Stack<S> {

/// Assuming `S` implements `NewService` or `MakeService`, applies the given
/// `L`-typed layer on each service produced by `S`.
pub fn push_on_response<L: Clone>(self, layer: L) -> Stack<stack::OnResponse<L, S>> {
self.push(stack::OnResponseLayer::new(layer))
pub fn push_on_service<L: Clone>(self, layer: L) -> Stack<stack::OnService<L, S>> {
self.push(stack::OnServiceLayer::new(layer))
}

pub fn push_timeout(self, timeout: Duration) -> Stack<tower::timeout::Timeout<S>> {
Expand Down
12 changes: 6 additions & 6 deletions linkerd/app/gateway/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ where
}
}
}))
.push_on_response(
.push_on_service(
svc::layers()
.push(
inbound
Expand Down Expand Up @@ -195,7 +195,7 @@ where
}
}))
.instrument(|h: &HttpTarget| debug_span!("gateway", target = %h.target, v = %h.version))
.push_on_response(
.push_on_service(
svc::layers()
.push(
inbound
Expand All @@ -209,7 +209,7 @@ where
.push_spawn_buffer(buffer_capacity),
)
.push_cache(cache_max_idle_age)
.push_on_response(
.push_on_service(
svc::layers()
.push(http::Retain::layer())
.push(http::BoxResponse::layer()),
Expand Down Expand Up @@ -247,7 +247,7 @@ where
)
.push_http_server()
.into_stack()
.push_on_response(svc::BoxService::layer())
.push_on_service(svc::BoxService::layer())
.push(svc::BoxNewService::layer())
.push_switch(
|GatewayTransportHeader {
Expand All @@ -266,7 +266,7 @@ where
})),
None => Ok::<_, Infallible>(svc::Either::B(target)),
},
tcp.push_on_response(svc::BoxService::layer())
tcp.push_on_service(svc::BoxService::layer())
.push(svc::BoxNewService::layer())
.into_inner(),
)
Expand All @@ -277,7 +277,7 @@ where
},
legacy_http.into_inner(),
)
.push_on_response(svc::BoxService::layer())
.push_on_service(svc::BoxService::layer())
.push(svc::BoxNewService::layer())
.into_inner()
}
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/inbound/src/accept.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl<N> Inbound<N> {
let OrigDstAddr(addr) = t.param();
info_span!("server", port = addr.port())
})
.push_on_response(svc::BoxService::layer())
.push_on_service(svc::BoxService::layer())
.push(svc::BoxNewService::layer())
})
}
Expand Down
14 changes: 7 additions & 7 deletions linkerd/app/inbound/src/detect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ impl<N> Inbound<N> {
}))
},
svc::stack(forward.clone())
.push_on_response(svc::MapTargetLayer::new(io::BoxedIo::new))
.push_on_service(svc::MapTargetLayer::new(io::BoxedIo::new))
.into_inner(),
)
.push(tls::NewDetectTls::layer(TlsParams {
Expand All @@ -150,10 +150,10 @@ impl<N> Inbound<N> {
Ok(svc::Either::A(t))
},
svc::stack(forward)
.push_on_response(svc::MapTargetLayer::new(io::BoxedIo::new))
.push_on_service(svc::MapTargetLayer::new(io::BoxedIo::new))
.into_inner(),
)
.push_on_response(svc::BoxService::layer())
.push_on_service(svc::BoxService::layer())
.push(svc::BoxNewService::layer())
})
}
Expand All @@ -180,7 +180,7 @@ impl<N> Inbound<N> {

let detect = http
.clone()
.push_on_response(svc::MapTargetLayer::new(io::BoxedIo::new))
.push_on_service(svc::MapTargetLayer::new(io::BoxedIo::new))
.push(transport::metrics::NewServer::layer(
rt.metrics.transport.clone(),
))
Expand Down Expand Up @@ -209,7 +209,7 @@ impl<N> Inbound<N> {
}
},
svc::stack(forward)
.push_on_response(svc::MapTargetLayer::new(io::BoxedIo::new))
.push_on_service(svc::MapTargetLayer::new(io::BoxedIo::new))
.push(transport::metrics::NewServer::layer(
rt.metrics.transport.clone(),
))
Expand All @@ -220,7 +220,7 @@ impl<N> Inbound<N> {
.push(detect::NewDetectService::layer(ConfigureHttpDetect))
.check_new_service::<Detect, I>();

http.push_on_response(svc::MapTargetLayer::new(io::BoxedIo::new))
http.push_on_service(svc::MapTargetLayer::new(io::BoxedIo::new))
.push(transport::metrics::NewServer::layer(
rt.metrics.transport.clone(),
))
Expand Down Expand Up @@ -248,7 +248,7 @@ impl<N> Inbound<N> {
},
detect.into_inner(),
)
.push_on_response(svc::BoxService::layer())
.push_on_service(svc::BoxService::layer())
.push(svc::BoxNewService::layer())
.check_new_service::<Tls, I>()
})
Expand Down
6 changes: 3 additions & 3 deletions linkerd/app/inbound/src/direct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ impl<N> Inbound<N> {
// HTTP detection is not necessary in this case, since the transport
// header indicates the connection's HTTP version.
svc::stack(gateway.clone())
.push_on_response(svc::MapTargetLayer::new(io::EitherIo::Left))
.push_on_service(svc::MapTargetLayer::new(io::EitherIo::Left))
.push_map_target(GatewayConnection::TransportHeader)
.push(transport::metrics::NewServer::layer(
rt.metrics.transport.clone(),
Expand Down Expand Up @@ -221,7 +221,7 @@ impl<N> Inbound<N> {
// with transport header support.
svc::stack(gateway)
.push_map_target(GatewayConnection::Legacy)
.push_on_response(svc::MapTargetLayer::new(io::EitherIo::Right))
.push_on_service(svc::MapTargetLayer::new(io::EitherIo::Right))
.push(transport::metrics::NewServer::layer(
rt.metrics.transport.clone(),
))
Expand All @@ -239,7 +239,7 @@ impl<N> Inbound<N> {
identity: rt.identity.clone().map(WithTransportHeaderAlpn),
}))
.check_new_service::<T, I>()
.push_on_response(svc::BoxService::layer())
.push_on_service(svc::BoxService::layer())
.push(svc::BoxNewService::layer())
})
}
Expand Down
22 changes: 11 additions & 11 deletions linkerd/app/inbound/src/http/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl<C> Inbound<C> {
config.proxy.connect.h1_settings,
config.proxy.connect.h2_settings,
))
.push_on_response(svc::MapErrLayer::new(Into::into))
.push_on_service(svc::MapErrLayer::new(Into::into))
.into_new_service()
.push_new_reconnect(config.proxy.connect.backoff)
.check_new_service::<Http, http::Request<_>>()
Expand All @@ -103,11 +103,11 @@ impl<C> Inbound<C> {
.http_endpoint
.to_layer::<classify::Response, _, _>(),
)
.push_on_response(http_tracing::client(
.push_on_service(http_tracing::client(
rt.span_sink.clone(),
super::trace_labels(),
))
.push_on_response(http::BoxResponse::layer())
.push_on_service(http::BoxResponse::layer())
.check_new_service::<Logical, http::Request<_>>();

// Attempts to discover a service profile for each logical target (as
Expand All @@ -117,10 +117,10 @@ impl<C> Inbound<C> {
.check_new_service::<Logical, http::Request<http::BoxBody>>()
// The HTTP stack doesn't use the profile resolution, so drop it.
.push_map_target(Logical::from)
.push_on_response(http::BoxResponse::layer())
.push_on_service(http::BoxResponse::layer())
.push(profiles::http::route_request::layer(
svc::proxies()
.push_on_response(http::BoxRequest::layer())
.push_on_service(http::BoxRequest::layer())
// Records per-route metrics.
.push(
rt.metrics
Expand All @@ -140,7 +140,7 @@ impl<C> Inbound<C> {
direction: metrics::Direction::In,
}
})
.push_on_response(http::BoxResponse::layer())
.push_on_service(http::BoxResponse::layer())
.into_inner(),
))
.push_switch(
Expand All @@ -160,7 +160,7 @@ impl<C> Inbound<C> {
Ok(svc::Either::B(logical))
},
http.clone()
.push_on_response(http::BoxResponse::layer())
.push_on_service(http::BoxResponse::layer())
.check_new_service::<Logical, http::Request<_>>()
.into_inner(),
)
Expand All @@ -182,7 +182,7 @@ impl<C> Inbound<C> {
Ok(profiles::LookupAddr(addr.into()))
}))
.instrument(|_: &Logical| debug_span!("profile"))
.push_on_response(
.push_on_service(
svc::layers()
.push(http::BoxResponse::layer())
.push(svc::layer::mk(svc::SpawnReady::new)),
Expand All @@ -191,11 +191,11 @@ impl<C> Inbound<C> {
.push_when_unready(
config.profile_idle_timeout,
http.clone()
.push_on_response(svc::layer::mk(svc::SpawnReady::new))
.push_on_service(svc::layer::mk(svc::SpawnReady::new))
.into_inner(),
)
.check_new_service::<Logical, http::Request<http::BoxBody>>()
.push_on_response(
.push_on_service(
svc::layers()
.push(rt.metrics.stack.layer(stack_labels("http", "logical")))
.push(svc::FailFast::layer(
Expand All @@ -205,7 +205,7 @@ impl<C> Inbound<C> {
.push_spawn_buffer(config.proxy.buffer_capacity),
)
.push_cache(config.proxy.cache_max_idle_age)
.push_on_response(
.push_on_service(
svc::layers()
.push(http::Retain::layer())
.push(http::BoxResponse::layer()),
Expand Down
4 changes: 2 additions & 2 deletions linkerd/app/inbound/src/http/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl<H> Inbound<H> {
// the request may have been downgraded from a HTTP/2 orig-proto request.
.push(http::NewNormalizeUri::layer())
.push(NewSetIdentityHeader::layer())
.push_on_response(
.push_on_service(
svc::layers()
// Downgrades the protocol if upgraded by an outbound proxy.
.push(http::orig_proto::Downgrade::layer())
Expand All @@ -71,7 +71,7 @@ impl<H> Inbound<H> {
.check_new_service::<T, http::Request<_>>()
.instrument(|t: &T| debug_span!("http", v = %Param::<Version>::param(t)))
.push(http::NewServeHttp::layer(h2_settings, rt.drain.clone()))
.push_on_response(svc::BoxService::layer())
.push_on_service(svc::BoxService::layer())
.push(svc::BoxNewService::layer())
})
}
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/inbound/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ impl<S> Inbound<S> {
rt.metrics.transport.clone(),
))
.push_make_thunk()
.push_on_response(
.push_on_service(
svc::layers()
.push(tcp::Forward::layer())
.push(drain::Retain::layer(rt.drain.clone())),
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/outbound/src/discover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl<N> Outbound<N> {
},
))
.instrument(|_: &_| debug_span!("profile"))
.push_on_response(
.push_on_service(
svc::layers()
// If the traffic split is empty/unavailable, eagerly fail
// requests. When the split is in failfast, spawn the
Expand Down
10 changes: 5 additions & 5 deletions linkerd/app/outbound/src/http/detect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ impl<N> Outbound<N> {

let skipped = tcp
.clone()
.push_on_response(svc::MapTargetLayer::new(io::EitherIo::Left))
.push_on_service(svc::MapTargetLayer::new(io::EitherIo::Left))
.into_inner();

svc::stack(http)
.push_on_response(
.push_on_service(
svc::layers()
.push(http::BoxRequest::layer())
.push(svc::MapErrLayer::new(Into::into)),
Expand All @@ -47,10 +47,10 @@ impl<N> Outbound<N> {
.push_map_target(U::from)
.instrument(|(v, _): &(http::Version, _)| debug_span!("http", %v))
.push(svc::UnwrapOr::layer(
tcp.push_on_response(svc::MapTargetLayer::new(io::EitherIo::Right))
tcp.push_on_service(svc::MapTargetLayer::new(io::EitherIo::Right))
.into_inner(),
))
.push_on_response(svc::BoxService::layer())
.push_on_service(svc::BoxService::layer())
.check_new_service::<(Option<http::Version>, T), _>()
.push_map_target(detect::allow_timeout)
.push(svc::BoxNewService::layer())
Expand All @@ -67,7 +67,7 @@ impl<N> Outbound<N> {
skipped,
)
.check_new_service::<T, _>()
.push_on_response(svc::BoxService::layer())
.push_on_service(svc::BoxService::layer())
.push(svc::BoxNewService::layer())
})
}
Expand Down
6 changes: 3 additions & 3 deletions linkerd/app/outbound/src/http/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl<C> Outbound<C> {
// HTTP/1.x fallback is supported as needed.
connect
.push(http::client::layer(h1_settings, h2_settings))
.push_on_response(svc::MapErrLayer::new(Into::<Error>::into))
.push_on_service(svc::MapErrLayer::new(Into::<Error>::into))
.check_service::<T>()
.into_new_service()
.push_new_reconnect(backoff)
Expand All @@ -46,7 +46,7 @@ impl<C> Outbound<C> {
.http_endpoint
.to_layer::<classify::Response, _, _>(),
)
.push_on_response(http_tracing::client(
.push_on_service(http_tracing::client(
rt.span_sink.clone(),
crate::trace_labels(),
))
Expand All @@ -55,7 +55,7 @@ impl<C> Outbound<C> {
"host",
CANONICAL_DST_HEADER,
]))
.push_on_response(
.push_on_service(
svc::layers()
.push(http::BoxResponse::layer())
.push(svc::BoxService::layer()),
Expand Down
Loading

0 comments on commit 2e948ae

Please sign in to comment.