Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Controller<T> #184

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions kube/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ openssl = { version = "0.10.28", optional = true }
rustls = { version = "0.16.0", optional = true }
bytes = "0.5.4"
Inflector = "0.11.4"
tokio = { version = "0.2.13", features = ["time"] }
tokio = { version = "0.2.13", features = ["full"] }

[dependencies.reqwest]
version = "0.10.2"
version = "0.10.4"
default-features = false
features = ["json", "gzip", "stream"]

Expand All @@ -54,7 +54,6 @@ rustls-tls = ["rustls", "reqwest/rustls-tls"]
kube-derive = { path = "../kube-derive" }
tempfile = "3.1.0"
env_logger = "0.7.1"
tokio = { version = "0.2.13", features = ["full"] }
anyhow = "1.0.26"

[dev-dependencies.k8s-openapi]
Expand Down
1 change: 1 addition & 0 deletions kube/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ impl Client {
async {
let mut buff = _buff; // can be avoided, see #145
loop {
// loop here in case the chunk is an incomplete WatchEvent
trace!("Await chunk");
match resp.chunk().await {
Ok(Some(chunk)) => {
Expand Down
176 changes: 176 additions & 0 deletions kube/src/runtime/controller.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
use crate::{
api::{
resource::{ListParams, Resource},
Meta, WatchEvent,
},
runtime::informer::Informer,
Client, Error, Result,
};
use futures::{lock::Mutex, stream, Stream, StreamExt};
use serde::de::DeserializeOwned;
use std::{convert::TryFrom, time::Duration};
use tokio::{sync::mpsc, time::delay_for};

/// An object to be reconciled
///
/// The type that is pulled out of Controller::poll
#[derive(Debug, Clone)]
pub struct ReconcileEvent {
pub name: String,
pub namespace: Option<String>,
}

impl<K> From<K> for ReconcileEvent
where
K: Meta,
{
fn from(k: K) -> ReconcileEvent {
ReconcileEvent {
name: Meta::name(&k),
namespace: Meta::namespace(&k),
}
}
}

impl<K> TryFrom<WatchEvent<K>> for ReconcileEvent
where
K: Meta + Clone,
{
type Error = crate::Error;

/// Helper to convert the openapi ReplicaSet to the useful info
fn try_from(w: WatchEvent<K>) -> Result<ReconcileEvent> {
match w {
WatchEvent::Added(o) => Ok(o.into()),
WatchEvent::Modified(o) => Ok(o.into()),
WatchEvent::Deleted(o) => Ok(o.into()),
WatchEvent::Error(e) => Err(Error::Api(e)),
}
}
}

/// An Ok return value from a reconcile fn
///
/// Designed so the Controller can decide whether to requeue the event
/// Error cases are not encapsulated in this struct (they are handled by Result)
#[derive(Debug)]
pub enum ReconcileStatus {
/// Successful reconcile
Complete,
/// Partial success, reque after some time
RequeAfter(Duration),
}

/// A controller for a kubernetes object K
#[derive(Clone)]
pub struct Controller<K, F>
where
K: Clone + DeserializeOwned + Meta,
F: Fn(ReconcileEvent) -> Result<ReconcileStatus>,
{
client: Client,
resource: Resource,
informers: Vec<Informer<K>>,
reconciler: Box<F>,
}

// TODO: is 'static limiting here?
impl<K: 'static, F: 'static> Controller<K, F>
where
K: Clone + DeserializeOwned + Meta + Send + Sync,
F: Fn(ReconcileEvent) -> Result<ReconcileStatus> + Send + Sync,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is pretty limiting that F can't return an impl Future.

{
/// Create a controller with a kube client on a kube resource
pub fn new(client: Client, r: Resource, recfn: F) -> Self {
Controller {
client: client,
resource: r,
informers: vec![],
reconciler: Box::new(recfn),
}
}

/// Create internal informers for an associated kube resource
///
/// TODO: this needs to only find resources with a property matching root resource
pub fn owns(mut self, r: Resource, lp: ListParams) -> Self {
self.informers.push(Informer::new(self.client.clone(), lp, r));
self
}

/// Initialize
pub fn init(self) {
info!("Starting Controller for {:?}", self.resource);
let (tx, mut rx) = mpsc::unbounded_channel();

// 1. poll informers in parallel and push results to queue
for inf in self.informers.clone() {
// TODO: ownership move?
//let queue = self.queue.clone();
let txi = tx.clone();
tokio::spawn(async move {
let mut poll_i = inf.poll().await.expect("could talk to api server").boxed();
while let Some(ev) = poll_i.next().await {
match ev {
Ok(wi) => {
let ri = ReconcileEvent::try_from(wi);
//(*queue.lock().await).push_back(ri);
txi.send(ri).expect("channel can receive");
}
_ => unimplemented!(),
//Err(e) => tx.unbounded_send(Err(e)),
}
}
});
}
// TODO: init main informer
// TODO: queue up events
// TODO: debounce events


// Prepare a sender so we can stack things back on the channel
let txa = tx.clone();
// Event loop that triggers the reconcile fn
tokio::spawn(async move {
Copy link
Member

@nightkr nightkr Mar 16, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense while prototyping.. but we should try to avoid tokio::spawn before releasing, because it means that kube-rs has to take complete control of the error handling. Should be possible to replace most uses with some variant of try_join_all/try_join!.

loop {
match rx.recv().await {
None => return, // tx dropped
Some(wi) => {
match wi {
Err(e) => {
// Got WatchEvent::Error
// TODO: handle here? handle above?
}
Ok(wo) => {
let ri = wo.clone();
let name = wo.name.clone();
match (self.reconciler)(wo) {
Ok(status) => {
// Reconcile cb completed with app decicion
match status {
ReconcileStatus::Complete => {
info!("Reconciled {}", name);
}
ReconcileStatus::RequeAfter(dur) => {
info!("Requeing {} in {:?}", name, dur);
delay_for(dur).await;
txa.send(Ok(ri)).expect("channel can receive");
}
}
}
Err(e) => {
// Reconcile cb failed (any unspecified error)
let dur = Duration::from_secs(10); // TODO: expo decay
warn!("Failed to reconcile {} - requining in {:?}", e, dur);
delay_for(dur).await;
txa.send(Ok(ri)).expect("channel can receive");
}
}
}
}
}
}
}
});
}
}
3 changes: 3 additions & 0 deletions kube/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,6 @@ pub use self::reflector::Reflector;

mod informer;
pub use self::informer::Informer;

mod controller;
pub use self::controller::Controller;