Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

control: Enforce timeouts on response stream #2587

Merged
merged 3 commits into from
Dec 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1077,6 +1077,7 @@ dependencies = [
"linkerd-app-outbound",
"linkerd-error",
"linkerd-opencensus",
"linkerd-tonic-stream",
"rangemap",
"regex",
"thiserror",
Expand Down Expand Up @@ -1215,6 +1216,7 @@ dependencies = [
"linkerd-meshtls-rustls",
"linkerd-proxy-client-policy",
"linkerd-proxy-server-policy",
"linkerd-tonic-stream",
"linkerd-tonic-watch",
"linkerd-tracing",
"linkerd2-proxy-api",
Expand Down Expand Up @@ -1286,6 +1288,7 @@ dependencies = [
"linkerd-proxy-client-policy",
"linkerd-retry",
"linkerd-stack",
"linkerd-tonic-stream",
"linkerd-tonic-watch",
"linkerd-tracing",
"linkerd2-proxy-api",
Expand Down Expand Up @@ -1671,7 +1674,6 @@ dependencies = [
name = "linkerd-proxy-api-resolve"
version = "0.1.0"
dependencies = [
"async-stream",
"futures",
"http",
"http-body",
Expand All @@ -1680,6 +1682,7 @@ dependencies = [
"linkerd-proxy-core",
"linkerd-stack",
"linkerd-tls",
"linkerd-tonic-stream",
"linkerd2-proxy-api",
"pin-project",
"prost",
Expand Down Expand Up @@ -1980,6 +1983,7 @@ dependencies = [
"linkerd-http-box",
"linkerd-proxy-api-resolve",
"linkerd-stack",
"linkerd-tonic-stream",
"linkerd-tonic-watch",
"linkerd2-proxy-api",
"once_cell",
Expand Down Expand Up @@ -2078,6 +2082,21 @@ dependencies = [
name = "linkerd-tls-test-util"
version = "0.1.0"

[[package]]
name = "linkerd-tonic-stream"
version = "0.1.0"
dependencies = [
"futures",
"linkerd-stack",
"linkerd-tracing",
"pin-project",
"tokio",
"tokio-stream",
"tokio-test",
"tonic",
"tracing",
]

[[package]]
name = "linkerd-tonic-watch"
version = "0.1.0"
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ members = [
"linkerd/stack/metrics",
"linkerd/stack/tracing",
"linkerd/system",
"linkerd/tonic-stream",
"linkerd/tonic-watch",
"linkerd/tls",
"linkerd/tls/test-util",
Expand Down
1 change: 1 addition & 0 deletions linkerd/app/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ linkerd-app-inbound = { path = "./inbound" }
linkerd-app-outbound = { path = "./outbound" }
linkerd-error = { path = "../error" }
linkerd-opencensus = { path = "../opencensus" }
linkerd-tonic-stream = { path = "../tonic-stream" }
rangemap = "1"
regex = "1"
thiserror = "1"
Expand Down
1 change: 1 addition & 0 deletions linkerd/app/inbound/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ linkerd-idle-cache = { path = "../../idle-cache" }
linkerd-meshtls = { path = "../../meshtls", optional = true }
linkerd-meshtls-rustls = { path = "../../meshtls/rustls", optional = true }
linkerd-proxy-client-policy = { path = "../../proxy/client-policy" }
linkerd-tonic-stream = { path = "../../tonic-stream" }
linkerd-tonic-watch = { path = "../../tonic-watch" }
linkerd2-proxy-api = { version = "0.12", features = ["inbound"] }
once_cell = "1"
Expand Down
55 changes: 33 additions & 22 deletions linkerd/app/inbound/src/policy/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@
Error, Recover, Result,
};
use linkerd_proxy_server_policy::ServerPolicy;
use linkerd_tonic_stream::{LimitReceiveFuture, ReceiveLimits};
use linkerd_tonic_watch::StreamWatch;
use std::{sync::Arc, time};
use std::sync::Arc;
use tokio::time;

#[derive(Clone, Debug)]
pub(super) struct Api<S> {
workload: Arc<str>,
detect_timeout: time::Duration,
limits: ReceiveLimits,
default_detect_timeout: time::Duration,
client: Client<S>,
}

Expand All @@ -34,10 +37,16 @@
S::ResponseBody:
http::HttpBody<Data = tonic::codegen::Bytes, Error = Error> + Default + Send + 'static,
{
pub(super) fn new(workload: Arc<str>, detect_timeout: time::Duration, client: S) -> Self {
pub(super) fn new(
workload: Arc<str>,
limits: ReceiveLimits,
default_detect_timeout: time::Duration,
client: S,
) -> Self {
Self {
workload,
detect_timeout,
limits,
default_detect_timeout,
client: Client::new(client),
}
}
Expand Down Expand Up @@ -72,26 +81,28 @@
port: port.into(),
workload: self.workload.as_ref().to_owned(),
};
let detect_timeout = self.detect_timeout;

let detect_timeout = self.default_detect_timeout;
let limits = self.limits;
let mut client = self.client.clone();
Box::pin(async move {
let rsp = client.watch_port(tonic::Request::new(req)).await?;
Ok(rsp.map(|updates| {
updates
.map_ok(move |up| {
// If the server returned an invalid server policy, we
// default to using an invalid policy that causes all
// requests to report an internal error.
let policy = ServerPolicy::try_from(up).unwrap_or_else(|error| {
tracing::warn!(%error, "Server misconfigured");
INVALID_POLICY
.get_or_init(|| ServerPolicy::invalid(detect_timeout))
.clone()
});
tracing::debug!(?policy);
policy
})
.boxed()
let rsp = LimitReceiveFuture::new(limits, client.watch_port(tonic::Request::new(req)))
.await?;
Ok(rsp.map(move |s| {
s.map_ok(move |up| {
// If the server returned an invalid server policy, we
// default to using an invalid policy that causes all
// requests to report an internal error.
let policy = ServerPolicy::try_from(up).unwrap_or_else(|error| {
tracing::warn!(%error, "Server misconfigured");
INVALID_POLICY
.get_or_init(|| ServerPolicy::invalid(detect_timeout))
.clone()

Check warning on line 100 in linkerd/app/inbound/src/policy/api.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/inbound/src/policy/api.rs#L97-L100

Added lines #L97 - L100 were not covered by tests
});
tracing::debug!(?policy);
policy
})
.boxed()

Check warning on line 105 in linkerd/app/inbound/src/policy/api.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/inbound/src/policy/api.rs#L105

Added line #L105 was not covered by tests
}))
})
}
Expand Down
4 changes: 3 additions & 1 deletion linkerd/app/inbound/src/policy/config.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::{api::Api, DefaultPolicy, GetPolicy, Protocol, ServerPolicy, Store};
use linkerd_app_core::{exp_backoff::ExponentialBackoff, proxy::http, Error};
use linkerd_tonic_stream::ReceiveLimits;
use rangemap::RangeInclusiveSet;
use std::{
collections::{HashMap, HashSet},
Expand Down Expand Up @@ -36,6 +37,7 @@ impl Config {
workload: Arc<str>,
client: C,
backoff: ExponentialBackoff,
limits: ReceiveLimits,
) -> impl GetPolicy + Clone + Send + Sync + 'static
where
C: tonic::client::GrpcService<tonic::body::BoxBody, Error = Error>,
Expand Down Expand Up @@ -66,7 +68,7 @@ impl Config {
}) => timeout,
_ => Duration::from_secs(10),
};
Api::new(workload, detect_timeout, client).into_watch(backoff)
Api::new(workload, limits, detect_timeout, client).into_watch(backoff)
};
Store::spawn_discover(default, cache_max_idle_age, watch, ports, opaque_ports)
}
Expand Down
7 changes: 6 additions & 1 deletion linkerd/app/inbound/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
transport::{self, addrs::*},
Error,
};
use linkerd_tonic_stream::ReceiveLimits;
use std::{fmt::Debug, sync::Arc};
use tracing::debug_span;

Expand All @@ -23,6 +24,7 @@
workload: Arc<str>,
client: C,
backoff: ExponentialBackoff,
limits: ReceiveLimits,
) -> impl policy::GetPolicy + Clone + Send + Sync + 'static
where
C: tonic::client::GrpcService<tonic::body::BoxBody, Error = Error>,
Expand All @@ -31,7 +33,10 @@
C::ResponseBody: Default + Send + 'static,
C::Future: Send,
{
self.config.policy.clone().build(workload, client, backoff)
self.config
.policy

Check warning on line 37 in linkerd/app/inbound/src/server.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/inbound/src/server.rs#L37

Added line #L37 was not covered by tests
.clone()
.build(workload, client, backoff, limits)
}

pub fn mk<A, I, P>(
Expand Down
1 change: 1 addition & 0 deletions linkerd/app/outbound/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ linkerd-proxy-client-policy = { path = "../../proxy/client-policy", features = [
"proto",
] }
linkerd-retry = { path = "../../retry" }
linkerd-tonic-stream = { path = "../../tonic-stream" }
linkerd-tonic-watch = { path = "../../tonic-watch" }
once_cell = "1"
parking_lot = "0.12"
Expand Down
4 changes: 3 additions & 1 deletion linkerd/app/outbound/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use linkerd_app_core::{
transport::addrs::*,
AddrMatch, Error, ProxyRuntime,
};
use linkerd_tonic_stream::ReceiveLimits;
use std::{
collections::{HashMap, HashSet},
fmt::Debug,
Expand Down Expand Up @@ -141,6 +142,7 @@ impl Outbound<()> {
workload: Arc<str>,
client: C,
backoff: ExponentialBackoff,
limits: ReceiveLimits,
) -> impl policy::GetPolicy
where
C: tonic::client::GrpcService<tonic::body::BoxBody, Error = Error>,
Expand All @@ -149,7 +151,7 @@ impl Outbound<()> {
C::ResponseBody: Default + Send + 'static,
C::Future: Send,
{
policy::Api::new(workload, Duration::from_secs(10), client)
policy::Api::new(workload, limits, Duration::from_secs(10), client)
.into_watch(backoff)
.map_result(|response| match response {
Err(e) => Err(e.into()),
Expand Down
55 changes: 33 additions & 22 deletions linkerd/app/outbound/src/policy/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@
Addr, Error, Recover, Result,
};
use linkerd_proxy_client_policy::ClientPolicy;
use linkerd_tonic_stream::{LimitReceiveFuture, ReceiveLimits};
use linkerd_tonic_watch::StreamWatch;
use std::{sync::Arc, time};
use std::sync::Arc;
use tokio::time;

#[derive(Clone, Debug)]
pub(crate) struct Api<S> {
workload: Arc<str>,
detect_timeout: time::Duration,
limits: ReceiveLimits,
default_detect_timeout: time::Duration,
client: Client<S>,
}

Expand All @@ -33,10 +36,16 @@
S::ResponseBody:
http::HttpBody<Data = tonic::codegen::Bytes, Error = Error> + Default + Send + 'static,
{
pub(crate) fn new(workload: Arc<str>, detect_timeout: time::Duration, client: S) -> Self {
pub(crate) fn new(
workload: Arc<str>,
limits: ReceiveLimits,
default_detect_timeout: time::Duration,
client: S,
) -> Self {
Self {
workload,
detect_timeout,
limits,
default_detect_timeout,
client: Client::new(client),
}
}
Expand Down Expand Up @@ -77,26 +86,28 @@
target: Some(target),
}
};
let detect_timeout = self.detect_timeout;

let detect_timeout = self.default_detect_timeout;
let limits = self.limits;
let mut client = self.client.clone();
Box::pin(async move {
let rsp = client.watch(tonic::Request::new(req)).await?;
Ok(rsp.map(|updates| {
updates
.map_ok(move |up| {
// If the server returned an invalid client policy, we
// default to using an invalid policy that causes all
// requests to report an internal error.
let policy = ClientPolicy::try_from(up).unwrap_or_else(|error| {
tracing::warn!(%error, "Client policy misconfigured");
INVALID_POLICY
.get_or_init(|| ClientPolicy::invalid(detect_timeout))
.clone()
});
tracing::debug!(?policy);
policy
})
.boxed()
let rsp =
LimitReceiveFuture::new(limits, client.watch(tonic::Request::new(req))).await?;
Ok(rsp.map(move |s| {
s.map_ok(move |up| {
// If the server returned an invalid client policy, we
// default to using an invalid policy that causes all
// requests to report an internal error.
let policy = ClientPolicy::try_from(up).unwrap_or_else(|error| {
tracing::warn!(%error, "Client policy misconfigured");
INVALID_POLICY
.get_or_init(|| ClientPolicy::invalid(detect_timeout))
.clone()

Check warning on line 105 in linkerd/app/outbound/src/policy/api.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/outbound/src/policy/api.rs#L102-L105

Added lines #L102 - L105 were not covered by tests
});
tracing::debug!(?policy);
policy
})
.boxed()

Check warning on line 110 in linkerd/app/outbound/src/policy/api.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/outbound/src/policy/api.rs#L110

Added line #L110 was not covered by tests
}))
})
}
Expand Down
15 changes: 12 additions & 3 deletions linkerd/app/src/dst.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ use linkerd_app_core::{
svc::{self, NewService, ServiceExt},
Error, Recover,
};
use linkerd_tonic_stream::ReceiveLimits;

#[derive(Clone, Debug)]
pub struct Config {
pub control: control::Config,
pub context: String,
pub limits: ReceiveLimits,
}

/// Handles to destination service clients.
Expand Down Expand Up @@ -58,13 +60,20 @@ impl Config {
.new_service(())
.map_err(Error::from);

let profiles =
profiles::Client::new_recover_default(backoff, svc.clone(), self.context.clone());
let profiles = profiles::Client::new_recover_default(
backoff,
svc.clone(),
self.context.clone(),
self.limits,
);

Ok(Dst {
addr,
profiles,
resolve: recover::Resolve::new(backoff, api::Resolve::new(svc, self.context)),
resolve: recover::Resolve::new(
backoff,
api::Resolve::new(svc, self.context, self.limits),
),
})
}
}
Expand Down
Loading
Loading