diff --git a/kube/Cargo.toml b/kube/Cargo.toml index c00617d11..faebe31f9 100644 --- a/kube/Cargo.toml +++ b/kube/Cargo.toml @@ -33,10 +33,10 @@ openssl = { version = "0.10.28", optional = true } rustls = { version = "0.16.0", optional = true } bytes = "0.5.4" Inflector = "0.11.4" -tokio = { version = "0.2.13", features = ["time"] } +tokio = { version = "0.2.13", features = ["full"] } [dependencies.reqwest] -version = "0.10.2" +version = "0.10.4" default-features = false features = ["json", "gzip", "stream"] @@ -54,7 +54,6 @@ rustls-tls = ["rustls", "reqwest/rustls-tls"] kube-derive = { path = "../kube-derive" } tempfile = "3.1.0" env_logger = "0.7.1" -tokio = { version = "0.2.13", features = ["full"] } anyhow = "1.0.26" [dev-dependencies.k8s-openapi] diff --git a/kube/src/client/mod.rs b/kube/src/client/mod.rs index 433e95758..592090947 100644 --- a/kube/src/client/mod.rs +++ b/kube/src/client/mod.rs @@ -183,6 +183,7 @@ impl Client { async { let mut buff = _buff; // can be avoided, see #145 loop { + // loop here in case the chunk is an incomplete WatchEvent trace!("Await chunk"); match resp.chunk().await { Ok(Some(chunk)) => { diff --git a/kube/src/runtime/controller.rs b/kube/src/runtime/controller.rs new file mode 100644 index 000000000..d0649c127 --- /dev/null +++ b/kube/src/runtime/controller.rs @@ -0,0 +1,176 @@ +use crate::{ + api::{ + resource::{ListParams, Resource}, + Meta, WatchEvent, + }, + runtime::informer::Informer, + Client, Error, Result, +}; +use futures::{lock::Mutex, stream, Stream, StreamExt}; +use serde::de::DeserializeOwned; +use std::{convert::TryFrom, time::Duration}; +use tokio::{sync::mpsc, time::delay_for}; + +/// An object to be reconciled +/// +/// The type that is pulled out of Controller::poll +#[derive(Debug, Clone)] +pub struct ReconcileEvent { + pub name: String, + pub namespace: Option, +} + +impl From for ReconcileEvent +where + K: Meta, +{ + fn from(k: K) -> ReconcileEvent { + ReconcileEvent { + name: Meta::name(&k), + namespace: Meta::namespace(&k), + } + } +} + +impl TryFrom> for ReconcileEvent +where + K: Meta + Clone, +{ + type Error = crate::Error; + + /// Helper to convert the openapi ReplicaSet to the useful info + fn try_from(w: WatchEvent) -> Result { + match w { + WatchEvent::Added(o) => Ok(o.into()), + WatchEvent::Modified(o) => Ok(o.into()), + WatchEvent::Deleted(o) => Ok(o.into()), + WatchEvent::Error(e) => Err(Error::Api(e)), + } + } +} + +/// An Ok return value from a reconcile fn +/// +/// Designed so the Controller can decide whether to requeue the event +/// Error cases are not encapsulated in this struct (they are handled by Result) +#[derive(Debug)] +pub enum ReconcileStatus { + /// Successful reconcile + Complete, + /// Partial success, reque after some time + RequeAfter(Duration), +} + +/// A controller for a kubernetes object K +#[derive(Clone)] +pub struct Controller +where + K: Clone + DeserializeOwned + Meta, + F: Fn(ReconcileEvent) -> Result, +{ + client: Client, + resource: Resource, + informers: Vec>, + reconciler: Box, +} + +// TODO: is 'static limiting here? +impl Controller +where + K: Clone + DeserializeOwned + Meta + Send + Sync, + F: Fn(ReconcileEvent) -> Result + Send + Sync, +{ + /// Create a controller with a kube client on a kube resource + pub fn new(client: Client, r: Resource, recfn: F) -> Self { + Controller { + client: client, + resource: r, + informers: vec![], + reconciler: Box::new(recfn), + } + } + + /// Create internal informers for an associated kube resource + /// + /// TODO: this needs to only find resources with a property matching root resource + pub fn owns(mut self, r: Resource, lp: ListParams) -> Self { + self.informers.push(Informer::new(self.client.clone(), lp, r)); + self + } + + /// Initialize + pub fn init(self) { + info!("Starting Controller for {:?}", self.resource); + let (tx, mut rx) = mpsc::unbounded_channel(); + + // 1. poll informers in parallel and push results to queue + for inf in self.informers.clone() { + // TODO: ownership move? + //let queue = self.queue.clone(); + let txi = tx.clone(); + tokio::spawn(async move { + let mut poll_i = inf.poll().await.expect("could talk to api server").boxed(); + while let Some(ev) = poll_i.next().await { + match ev { + Ok(wi) => { + let ri = ReconcileEvent::try_from(wi); + //(*queue.lock().await).push_back(ri); + txi.send(ri).expect("channel can receive"); + } + _ => unimplemented!(), + //Err(e) => tx.unbounded_send(Err(e)), + } + } + }); + } + // TODO: init main informer + // TODO: queue up events + // TODO: debounce events + + + // Prepare a sender so we can stack things back on the channel + let txa = tx.clone(); + // Event loop that triggers the reconcile fn + tokio::spawn(async move { + loop { + match rx.recv().await { + None => return, // tx dropped + Some(wi) => { + match wi { + Err(e) => { + // Got WatchEvent::Error + // TODO: handle here? handle above? + } + Ok(wo) => { + let ri = wo.clone(); + let name = wo.name.clone(); + match (self.reconciler)(wo) { + Ok(status) => { + // Reconcile cb completed with app decicion + match status { + ReconcileStatus::Complete => { + info!("Reconciled {}", name); + } + ReconcileStatus::RequeAfter(dur) => { + info!("Requeing {} in {:?}", name, dur); + delay_for(dur).await; + txa.send(Ok(ri)).expect("channel can receive"); + } + } + } + Err(e) => { + // Reconcile cb failed (any unspecified error) + let dur = Duration::from_secs(10); // TODO: expo decay + warn!("Failed to reconcile {} - requining in {:?}", e, dur); + delay_for(dur).await; + txa.send(Ok(ri)).expect("channel can receive"); + } + } + } + } + } + } + } + }); + } +} diff --git a/kube/src/runtime/mod.rs b/kube/src/runtime/mod.rs index 62cd23781..7f2d9ab0f 100644 --- a/kube/src/runtime/mod.rs +++ b/kube/src/runtime/mod.rs @@ -3,3 +3,6 @@ pub use self::reflector::Reflector; mod informer; pub use self::informer::Informer; + +mod controller; +pub use self::controller::Controller;