Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support interop between v8 and v9+ #1016

Merged
merged 24 commits into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 50 additions & 4 deletions crates/xds/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,18 +409,17 @@ impl Drop for DeltaSubscription {
impl AdsClient {
/// Attempts to start a new delta stream to the xDS management server, if the
/// management server does not support delta xDS we return the client as an error
#[allow(clippy::type_complexity)]
pub async fn delta_subscribe<C: crate::config::Configuration>(
self,
config: Arc<C>,
is_healthy: Arc<AtomicBool>,
notifier: Option<tokio::sync::mpsc::UnboundedSender<String>>,
resources: impl IntoIterator<Item = (&'static str, Vec<String>)>,
resources: &'static [(&'static str, &'static [(&'static str, Vec<String>)])],
) -> Result<DeltaSubscription, Self> {
let resource_subscriptions: Vec<_> = resources.into_iter().collect();

let identifier = String::from(&*self.identifier);

let (mut ds, stream) = match DeltaClientStream::connect(
let (mut ds, mut stream) = match DeltaClientStream::connect(
self.client.clone(),
identifier.clone(),
)
Expand All @@ -433,6 +432,49 @@ impl AdsClient {
}
};

async fn handle_first_response(
stream: &mut tonic::Streaming<DeltaDiscoveryResponse>,
resources: &'static [(&'static str, &'static [(&'static str, Vec<String>)])],
) -> eyre::Result<&'static [(&'static str, Vec<String>)]> {
let resource_subscriptions = if let Some(first) = stream.message().await? {
let mut rsubs = None;
if first.type_url == "ignore-me" {
if !first.system_version_info.is_empty() {
rsubs = resources.iter().find_map(|(vers, subs)| {
(*vers == first.system_version_info).then_some(subs)
});
}
} else {
tracing::warn!("expected `ignore-me` response from management server");
}

if let Some(subs) = rsubs {
subs
} else {
let Some(subs) = resources
.iter()
.find_map(|(vers, subs)| vers.is_empty().then_some(subs))
else {
eyre::bail!("failed to find fallback resource subscription set");
};

subs
}
} else {
eyre::bail!("expected at least one response from the management server");
};

Ok(dbg!(resource_subscriptions))
}

let resource_subscriptions = match handle_first_response(&mut stream, resources).await {
Ok(rs) => rs,
Err(error) => {
tracing::error!(%error, "failed to acquire matching resource subscriptions based on response from management sever");
return Err(self);
}
};

// Send requests for our resource subscriptions, in this first request we
// won't have any resources, but if we reconnect to management servers in
// the future we'll send the resources we already have locally to hopefully
Expand All @@ -454,6 +496,7 @@ impl AdsClient {
async move {
tracing::trace!("starting xDS delta stream task");
let mut stream = stream;
let mut resource_subscriptions = resource_subscriptions;

loop {
tracing::trace!("creating discovery response handler");
Expand Down Expand Up @@ -504,6 +547,9 @@ impl AdsClient {

(ds, stream) =
DeltaClientStream::connect(new_client, identifier.clone()).await?;

resource_subscriptions = handle_first_response(&mut stream, resources).await?;

ds.refresh(&identifier, resource_subscriptions.to_vec(), &local)
.await?;
}
Expand Down
5 changes: 4 additions & 1 deletion crates/xds/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,10 @@ pub trait Configuration: Send + Sync + Sized + 'static {
subscribed: crate::server::ControlPlane<Self>,
) -> impl std::future::Future<Output = ()> + Send + 'static;

fn interested_resources(&self) -> impl Iterator<Item = (&'static str, Vec<String>)>;
fn interested_resources(
&self,
server_version: &str,
) -> impl Iterator<Item = (&'static str, Vec<String>)>;
}

pub struct DeltaDiscoveryRes {
Expand Down
56 changes: 36 additions & 20 deletions crates/xds/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ use crate::{
net::TcpListener,
};

const VERSION_INFO: &str = "9";

pub struct ControlPlane<C> {
pub config: Arc<C>,
pub idle_request_interval: Duration,
Expand Down Expand Up @@ -218,8 +220,7 @@ impl<C: crate::config::Configuration> ControlPlane<C> {
control_plane: Some(control_plane_id.clone()),
type_url: type_url.into(),
removed_resources,
// Only used for debugging, not really useful
system_version_info: String::new(),
system_version_info: VERSION_INFO.into(),
};

tracing::trace!(
Expand Down Expand Up @@ -248,8 +249,7 @@ impl<C: crate::config::Configuration> ControlPlane<C> {
control_plane: None,
type_url: message.type_url,
removed_resources: Vec::new(),
// Only used for debugging, not really useful
system_version_info: String::new(),
system_version_info: VERSION_INFO.into(),
}
} else {
tracing::debug!(client = %node_id, resource_type = %message.type_url, "initial delta response");
Expand Down Expand Up @@ -406,17 +406,25 @@ impl<C: crate::config::Configuration> AggregatedControlPlaneDiscoveryService for

tracing::info!("control plane discovery delta stream attempt");
let mut responses = responses.into_inner();
let Some(identifier) = responses

fn handle_first_response(
res: DeltaDiscoveryResponse,
) -> Result<(String, String), tonic::Status> {
let Some(identifier) = res.control_plane.map(|cp| cp.identifier) else {
return Err(tonic::Status::invalid_argument(
"DeltaDiscoveryResponse.control_plane.identifier is required in the first message",
));
};

Ok((identifier, res.system_version_info))
}

let first_response = responses
.next()
.await
.ok_or_else(|| tonic::Status::cancelled("received empty first response"))??
.control_plane
.map(|cp| cp.identifier)
else {
return Err(tonic::Status::invalid_argument(
"DeltaDiscoveryResponse.control_plane.identifier is required in the first message",
));
};
.ok_or_else(|| tonic::Status::cancelled("received empty first response"))??;

let (identifier, server_version) = handle_first_response(first_response)?;

tracing::info!(identifier, "new control plane delta discovery stream");
let config = self.config.clone();
Expand All @@ -429,12 +437,16 @@ impl<C: crate::config::Configuration> AggregatedControlPlaneDiscoveryService for
tracing::info!(identifier, "sending initial delta discovery request");

let local = Arc::new(crate::config::LocalVersions::new(
config.interested_resources().map(|(n, _)| n),
config.interested_resources(&server_version).map(|(n, _)| n),
));

ds.refresh(&identifier, config.interested_resources().collect(), &local)
.await
.map_err(|error| tonic::Status::internal(error.to_string()))?;
ds.refresh(
&identifier,
config.interested_resources(&server_version).collect(),
&local,
)
.await
.map_err(|error| tonic::Status::internal(error.to_string()))?;

let mut response_stream = crate::config::handle_delta_discovery_responses(
identifier.clone(),
Expand All @@ -456,9 +468,13 @@ impl<C: crate::config::Configuration> AggregatedControlPlaneDiscoveryService for
.map_err(|_| tonic::Status::internal("this should not be reachable"))?;
} else {
tracing::trace!("exceeded idle interval, sending request");
ds.refresh(&identifier, config.interested_resources().collect(), &local)
.await
.map_err(|error| tonic::Status::internal(error.to_string()))?;
ds.refresh(
&identifier,
config.interested_resources(&server_version).collect(),
&local,
)
.await
.map_err(|error| tonic::Status::internal(error.to_string()))?;
}
}
}
Expand Down
49 changes: 28 additions & 21 deletions examples/xds/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ impl xds::config::Configuration for ClientConfig {
unreachable!();
}

fn interested_resources(&self) -> impl Iterator<Item = (&'static str, Vec<String>)> {
fn interested_resources(
&self,
_server_version: &str,
) -> impl Iterator<Item = (&'static str, Vec<String>)> {
[].into_iter()
}

Expand Down Expand Up @@ -145,13 +148,13 @@ impl xds::config::Configuration for ServerConfig {
}
}

Ok(DeltaDiscoveryRes {
resources,
removed,
})
Ok(DeltaDiscoveryRes { resources, removed })
}

fn interested_resources(&self) -> impl Iterator<Item = (&'static str, Vec<String>)> {
fn interested_resources(
&self,
_server_version: &str,
) -> impl Iterator<Item = (&'static str, Vec<String>)> {
[(TYPE, Vec::new())].into_iter()
}

Expand All @@ -164,7 +167,9 @@ impl xds::config::Configuration for ServerConfig {

async move {
loop {
if item_watcher.recv().await.is_err() { break; };
if item_watcher.recv().await.is_err() {
break;
};
control_plane.push_update(TYPE);
}
}
Expand All @@ -189,33 +194,35 @@ async fn main() {
let relay_listener = xds::net::TcpListener::bind(None).unwrap();
let addr = relay_listener.local_addr();

let server = xds::server::ControlPlane::from_arc(sc.clone(), std::time::Duration::from_secs(60)).management_server(relay_listener).unwrap();
let server =
xds::server::ControlPlane::from_arc(sc.clone(), std::time::Duration::from_secs(60))
.management_server(relay_listener)
.unwrap();

tokio::task::spawn(async move {
server.await
});
tokio::task::spawn(async move { server.await });

let client = xds::client::AdsClient::connect(
"client".into(),
vec![format!("http://{addr}").try_into().unwrap()],
)
.await.unwrap();
.await
.unwrap();

let (stx, srx) = tokio::sync::oneshot::channel();

tokio::task::spawn({
let cc = cc.clone();
async move {
let _stream = client
.delta_subscribe(
cc,
Arc::new(std::sync::atomic::AtomicBool::new(true)),
None,
[
(TYPE, Vec::new()),
],
)
.await.map_err(|_| "failed to subscribe").unwrap();
.delta_subscribe(
cc,
Arc::new(std::sync::atomic::AtomicBool::new(true)),
None,
&[("", &[(TYPE, Vec::new())])],
)
.await
.map_err(|_| "failed to subscribe")
.unwrap();

srx.await.unwrap();
}
Expand Down
31 changes: 21 additions & 10 deletions src/components/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,26 @@ impl Proxy {
shutdown_rx.clone(),
);

#[allow(clippy::type_complexity)]
const SUBS: &[(&str, &[(&str, Vec<String>)])] = &[
(
"9",
&[
(crate::xds::CLUSTER_TYPE, Vec::new()),
(crate::xds::DATACENTER_TYPE, Vec::new()),
(crate::xds::FILTER_CHAIN_TYPE, Vec::new()),
],
),
(
"",
&[
(crate::xds::CLUSTER_TYPE, Vec::new()),
(crate::xds::DATACENTER_TYPE, Vec::new()),
(crate::xds::LISTENER_TYPE, Vec::new()),
],
),
];

if !self.management_servers.is_empty() {
{
let mut lock = ready.xds_is_healthy.write();
Expand Down Expand Up @@ -259,16 +279,7 @@ impl Proxy {
ready.xds_is_healthy.read().as_ref().unwrap().clone();

let _stream = client
.delta_subscribe(
config.clone(),
xds_is_healthy.clone(),
tx,
[
(crate::xds::CLUSTER_TYPE, Vec::new()),
(crate::xds::DATACENTER_TYPE, Vec::new()),
(crate::xds::FILTER_CHAIN_TYPE, Vec::new()),
],
)
.delta_subscribe(config.clone(), xds_is_healthy.clone(), tx, SUBS)
.await
.map_err(|_| eyre::eyre!("failed to acquire delta stream"))?;

Expand Down
Loading
Loading