Skip to content

Commit

Permalink
Replace runtime::controller::Context with Arc
Browse files Browse the repository at this point in the history
  • Loading branch information
nightkr committed May 13, 2022
1 parent 05b48cf commit deddf97
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 59 deletions.
10 changes: 5 additions & 5 deletions examples/configmapgen_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use futures::StreamExt;
use k8s_openapi::api::core::v1::ConfigMap;
use kube::{
api::{Api, ListParams, ObjectMeta, Patch, PatchParams, Resource},
runtime::controller::{Action, Context, Controller},
runtime::controller::{Action, Controller},
Client, CustomResource,
};
use schemars::JsonSchema;
Expand All @@ -29,8 +29,8 @@ struct ConfigMapGeneratorSpec {
}

/// Controller triggers this whenever our main object or our children changed
async fn reconcile(generator: Arc<ConfigMapGenerator>, ctx: Context<Data>) -> Result<Action, Error> {
let client = ctx.get_ref().client.clone();
async fn reconcile(generator: Arc<ConfigMapGenerator>, ctx: Arc<Data>) -> Result<Action, Error> {
let client = &ctx.client;

let mut contents = BTreeMap::new();
contents.insert("content".to_string(), generator.spec.content.clone());
Expand Down Expand Up @@ -67,7 +67,7 @@ async fn reconcile(generator: Arc<ConfigMapGenerator>, ctx: Context<Data>) -> Re
}

/// The controller triggers this on reconcile errors
fn error_policy(_error: &Error, _ctx: Context<Data>) -> Action {
fn error_policy(_error: &Error, _ctx: Arc<Data>) -> Action {
Action::requeue(Duration::from_secs(1))
}

Expand Down Expand Up @@ -100,7 +100,7 @@ async fn main() -> Result<()> {
.owns(cms, ListParams::default())
.reconcile_all_on(reload_rx.map(|_| ()))
.shutdown_on_signal()
.run(reconcile, error_policy, Context::new(Data { client }))
.run(reconcile, error_policy, Arc::new(Data { client }))
.for_each(|res| async move {
match res {
Ok(o) => info!("reconciled {:?}", o),
Expand Down
4 changes: 2 additions & 2 deletions examples/kubectl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ impl App {
info!("updating changed object {}", orig.name());
let data: DynamicObject = serde_yaml::from_str(&edited)?;
// NB: simplified kubectl constructs a merge-patch of differences
api.replace(&n, &Default::default(), &data).await?;
api.replace(n, &Default::default(), &data).await?;
}
} else {
warn!("need a name to edit");
Expand Down Expand Up @@ -185,7 +185,7 @@ async fn main() -> Result<()> {
// Defer to methods for verbs
if let Some(resource) = &app.resource {
// Common discovery, parameters, and api configuration for a single resource
let (ar, caps) = resolve_api_resource(&discovery, &resource)
let (ar, caps) = resolve_api_resource(&discovery, resource)
.with_context(|| format!("resource {:?} not found in cluster", resource))?;
let mut lp = ListParams::default();
if let Some(label) = &app.selector {
Expand Down
4 changes: 2 additions & 2 deletions examples/secret_syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use kube::{
api::{Api, DeleteParams, ListParams, ObjectMeta, Patch, PatchParams, Resource},
error::ErrorResponse,
runtime::{
controller::{Action, Context, Controller},
controller::{Action, Controller},
finalizer::{finalizer, Event},
},
};
Expand Down Expand Up @@ -101,7 +101,7 @@ async fn main() -> anyhow::Result<()> {
}
},
|_err, _| Action::requeue(Duration::from_secs(2)),
Context::new(()),
Arc::new(()),
)
.for_each(|msg| async move { info!("Reconciled: {:?}", msg) })
.await;
Expand Down
73 changes: 23 additions & 50 deletions kube-runtime/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,35 +147,6 @@ where
})
}

/// A context data type that's passed through to the controllers callbacks
///
/// `Context` gets passed to both the `reconciler` and the `error_policy` callbacks,
/// allowing a read-only view of the world without creating a big nested lambda.
/// More or less the same as Actix's [`Data`](https://docs.rs/actix-web/3.x/actix_web/web/struct.Data.html).
#[derive(Debug, Derivative)]
#[derivative(Clone(bound = ""))]
pub struct Context<T>(Arc<T>);

impl<T> Context<T> {
/// Create new `Context` instance.
#[must_use]
pub fn new(state: T) -> Context<T> {
Context(Arc::new(state))
}

/// Get reference to inner controller data.
#[must_use]
pub fn get_ref(&self) -> &T {
self.0.as_ref()
}

/// Convert to the internal `Arc<T>`.
#[must_use]
pub fn into_inner(self) -> Arc<T> {
self.0
}
}

/// A request to reconcile an object, annotated with why that request was made.
///
/// NOTE: The reason is ignored for comparison purposes. This means that, for example,
Expand Down Expand Up @@ -242,10 +213,10 @@ impl Display for ReconcileReason {
///
/// This is the "hard-mode" version of [`Controller`], which allows you some more customization
/// (such as triggering from arbitrary [`Stream`]s), at the cost of being a bit more verbose.
pub fn applier<K, QueueStream, ReconcilerFut, T>(
mut reconciler: impl FnMut(Arc<K>, Context<T>) -> ReconcilerFut,
mut error_policy: impl FnMut(&ReconcilerFut::Error, Context<T>) -> Action,
context: Context<T>,
pub fn applier<K, QueueStream, ReconcilerFut, Ctx>(
mut reconciler: impl FnMut(Arc<K>, Arc<Ctx>) -> ReconcilerFut,
mut error_policy: impl FnMut(&ReconcilerFut::Error, Arc<Ctx>) -> Action,
context: Arc<Ctx>,
store: Store<K>,
queue: QueueStream,
) -> impl Stream<Item = Result<(ObjectRef<K>, Action), Error<ReconcilerFut::Error, QueueStream::Error>>>
Expand Down Expand Up @@ -352,7 +323,7 @@ where
/// use kube::{
/// Client, CustomResource,
/// api::{Api, ListParams},
/// runtime::controller::{Context, Controller, Action}
/// runtime::controller::{Controller, Action}
/// };
/// use serde::{Deserialize, Serialize};
/// use tokio::time::Duration;
Expand All @@ -373,21 +344,21 @@ where
/// }
///
/// /// The reconciler that will be called when either object change
/// async fn reconcile(g: Arc<ConfigMapGenerator>, _ctx: Context<()>) -> Result<Action, Error> {
/// async fn reconcile(g: Arc<ConfigMapGenerator>, _ctx: Arc<()>) -> Result<Action, Error> {
/// // .. use api here to reconcile a child ConfigMap with ownerreferences
/// // see configmapgen_controller example for full info
/// Ok(Action::requeue(Duration::from_secs(300)))
/// }
/// /// an error handler that will be called when the reconciler fails
/// fn error_policy(_error: &Error, _ctx: Context<()>) -> Action {
/// fn error_policy(_error: &Error, _ctx: Arc<()>) -> Action {
/// Action::requeue(Duration::from_secs(60))
/// }
///
/// /// something to drive the controller
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let client = Client::try_default().await?;
/// let context = Context::new(()); // bad empty context - put client in here
/// let context = Arc::new(()); // bad empty context - put client in here
/// let cmgs = Api::<ConfigMapGenerator>::all(client.clone());
/// let cms = Api::<ConfigMap>::all(client.clone());
/// Controller::new(cmgs, ListParams::default())
Expand Down Expand Up @@ -611,9 +582,9 @@ where
/// use kube::{
/// Client,
/// api::{ListParams, Api, ResourceExt},
/// runtime::{controller::{Context, Controller, Action}},
/// runtime::{controller::{Controller, Action}},
/// };
/// use std::{convert::Infallible, io::BufRead};
/// use std::{convert::Infallible, io::BufRead, sync::Arc};
/// let (mut reload_tx, reload_rx) = futures::channel::mpsc::channel(0);
/// // Using a regular background thread since tokio::io::stdin() doesn't allow aborting reads,
/// // and its worker prevents the Tokio runtime from shutting down.
Expand All @@ -633,7 +604,7 @@ where
/// Ok(Action::await_change())
/// },
/// |err: &Infallible, _| Err(err).unwrap(),
/// Context::new(()),
/// Arc::new(()),
/// );
/// # };
/// ```
Expand Down Expand Up @@ -675,8 +646,8 @@ where
/// use futures::future::FutureExt;
/// use k8s_openapi::api::core::v1::ConfigMap;
/// use kube::{api::ListParams, Api, Client, ResourceExt};
/// use kube_runtime::controller::{Context, Controller, Action};
/// use std::convert::Infallible;
/// use kube_runtime::controller::{Controller, Action};
/// use std::{convert::Infallible, sync::Arc};
/// Controller::new(
/// Api::<ConfigMap>::all(Client::try_default().await.unwrap()),
/// ListParams::default(),
Expand All @@ -688,7 +659,7 @@ where
/// Ok(Action::await_change())
/// },
/// |err: &Infallible, _| Err(err).unwrap(),
/// Context::new(()),
/// Arc::new(()),
/// );
/// # };
/// ```
Expand Down Expand Up @@ -761,12 +732,12 @@ where
///
/// This creates a stream from all builder calls and starts an applier with
/// a specified `reconciler` and `error_policy` callbacks. Each of these will be called
/// with a configurable [`Context`].
pub fn run<ReconcilerFut, T>(
/// with a configurable `context`.
pub fn run<ReconcilerFut, Ctx>(
self,
mut reconciler: impl FnMut(Arc<K>, Context<T>) -> ReconcilerFut,
error_policy: impl FnMut(&ReconcilerFut::Error, Context<T>) -> Action,
context: Context<T>,
mut reconciler: impl FnMut(Arc<K>, Arc<Ctx>) -> ReconcilerFut,
error_policy: impl FnMut(&ReconcilerFut::Error, Arc<Ctx>) -> Action,
context: Arc<Ctx>,
) -> impl Stream<Item = Result<(ObjectRef<K>, Action), Error<ReconcilerFut::Error, watcher::Error>>>
where
K::DynamicType: Debug + Unpin,
Expand All @@ -792,7 +763,9 @@ where

#[cfg(test)]
mod tests {
use super::{Action, Context};
use std::sync::Arc;

use super::Action;
use crate::Controller;
use k8s_openapi::api::core::v1::ConfigMap;
use kube_client::Api;
Expand All @@ -814,7 +787,7 @@ mod tests {
Controller::new(mock_type::<Api<ConfigMap>>(), Default::default()).run(
|_, _| async { Ok(mock_type::<Action>()) },
|_: &std::io::Error, _| mock_type::<Action>(),
Context::new(()),
Arc::new(()),
),
);
}
Expand Down

0 comments on commit deddf97

Please sign in to comment.