Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace runtime::controller::Context with Arc #910

Merged
merged 1 commit into from
May 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions examples/configmapgen_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use futures::StreamExt;
use k8s_openapi::api::core::v1::ConfigMap;
use kube::{
api::{Api, ListParams, ObjectMeta, Patch, PatchParams, Resource},
runtime::controller::{Action, Context, Controller},
runtime::controller::{Action, Controller},
Client, CustomResource,
};
use schemars::JsonSchema;
Expand All @@ -29,8 +29,8 @@ struct ConfigMapGeneratorSpec {
}

/// Controller triggers this whenever our main object or our children changed
async fn reconcile(generator: Arc<ConfigMapGenerator>, ctx: Context<Data>) -> Result<Action, Error> {
let client = ctx.get_ref().client.clone();
async fn reconcile(generator: Arc<ConfigMapGenerator>, ctx: Arc<Data>) -> Result<Action, Error> {
let client = &ctx.client;

let mut contents = BTreeMap::new();
contents.insert("content".to_string(), generator.spec.content.clone());
Expand Down Expand Up @@ -67,7 +67,7 @@ async fn reconcile(generator: Arc<ConfigMapGenerator>, ctx: Context<Data>) -> Re
}

/// The controller triggers this on reconcile errors
fn error_policy(_error: &Error, _ctx: Context<Data>) -> Action {
fn error_policy(_error: &Error, _ctx: Arc<Data>) -> Action {
Action::requeue(Duration::from_secs(1))
}

Expand Down Expand Up @@ -100,7 +100,7 @@ async fn main() -> Result<()> {
.owns(cms, ListParams::default())
.reconcile_all_on(reload_rx.map(|_| ()))
.shutdown_on_signal()
.run(reconcile, error_policy, Context::new(Data { client }))
.run(reconcile, error_policy, Arc::new(Data { client }))
.for_each(|res| async move {
match res {
Ok(o) => info!("reconciled {:?}", o),
Expand Down
4 changes: 2 additions & 2 deletions examples/kubectl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ impl App {
info!("updating changed object {}", orig.name());
let data: DynamicObject = serde_yaml::from_str(&edited)?;
// NB: simplified kubectl constructs a merge-patch of differences
api.replace(&n, &Default::default(), &data).await?;
api.replace(n, &Default::default(), &data).await?;
}
} else {
warn!("need a name to edit");
Expand Down Expand Up @@ -185,7 +185,7 @@ async fn main() -> Result<()> {
// Defer to methods for verbs
if let Some(resource) = &app.resource {
// Common discovery, parameters, and api configuration for a single resource
let (ar, caps) = resolve_api_resource(&discovery, &resource)
let (ar, caps) = resolve_api_resource(&discovery, resource)
.with_context(|| format!("resource {:?} not found in cluster", resource))?;
let mut lp = ListParams::default();
if let Some(label) = &app.selector {
Expand Down
4 changes: 2 additions & 2 deletions examples/secret_syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use kube::{
api::{Api, DeleteParams, ListParams, ObjectMeta, Patch, PatchParams, Resource},
error::ErrorResponse,
runtime::{
controller::{Action, Context, Controller},
controller::{Action, Controller},
finalizer::{finalizer, Event},
},
};
Expand Down Expand Up @@ -101,7 +101,7 @@ async fn main() -> anyhow::Result<()> {
}
},
|_err, _| Action::requeue(Duration::from_secs(2)),
Context::new(()),
Arc::new(()),
)
.for_each(|msg| async move { info!("Reconciled: {:?}", msg) })
.await;
Expand Down
73 changes: 23 additions & 50 deletions kube-runtime/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,35 +147,6 @@ where
})
}

/// A context data type that's passed through to the controllers callbacks
///
/// `Context` gets passed to both the `reconciler` and the `error_policy` callbacks,
/// allowing a read-only view of the world without creating a big nested lambda.
/// More or less the same as Actix's [`Data`](https://docs.rs/actix-web/3.x/actix_web/web/struct.Data.html).
#[derive(Debug, Derivative)]
#[derivative(Clone(bound = ""))]
pub struct Context<T>(Arc<T>);

impl<T> Context<T> {
/// Create new `Context` instance.
#[must_use]
pub fn new(state: T) -> Context<T> {
Context(Arc::new(state))
}

/// Get reference to inner controller data.
#[must_use]
pub fn get_ref(&self) -> &T {
self.0.as_ref()
}

/// Convert to the internal `Arc<T>`.
#[must_use]
pub fn into_inner(self) -> Arc<T> {
self.0
}
}

/// A request to reconcile an object, annotated with why that request was made.
///
/// NOTE: The reason is ignored for comparison purposes. This means that, for example,
Expand Down Expand Up @@ -242,10 +213,10 @@ impl Display for ReconcileReason {
///
/// This is the "hard-mode" version of [`Controller`], which allows you some more customization
/// (such as triggering from arbitrary [`Stream`]s), at the cost of being a bit more verbose.
pub fn applier<K, QueueStream, ReconcilerFut, T>(
mut reconciler: impl FnMut(Arc<K>, Context<T>) -> ReconcilerFut,
mut error_policy: impl FnMut(&ReconcilerFut::Error, Context<T>) -> Action,
context: Context<T>,
pub fn applier<K, QueueStream, ReconcilerFut, Ctx>(
mut reconciler: impl FnMut(Arc<K>, Arc<Ctx>) -> ReconcilerFut,
mut error_policy: impl FnMut(&ReconcilerFut::Error, Arc<Ctx>) -> Action,
context: Arc<Ctx>,
store: Store<K>,
queue: QueueStream,
) -> impl Stream<Item = Result<(ObjectRef<K>, Action), Error<ReconcilerFut::Error, QueueStream::Error>>>
Expand Down Expand Up @@ -352,7 +323,7 @@ where
/// use kube::{
/// Client, CustomResource,
/// api::{Api, ListParams},
/// runtime::controller::{Context, Controller, Action}
/// runtime::controller::{Controller, Action}
/// };
/// use serde::{Deserialize, Serialize};
/// use tokio::time::Duration;
Expand All @@ -373,21 +344,21 @@ where
/// }
///
/// /// The reconciler that will be called when either object change
/// async fn reconcile(g: Arc<ConfigMapGenerator>, _ctx: Context<()>) -> Result<Action, Error> {
/// async fn reconcile(g: Arc<ConfigMapGenerator>, _ctx: Arc<()>) -> Result<Action, Error> {
/// // .. use api here to reconcile a child ConfigMap with ownerreferences
/// // see configmapgen_controller example for full info
/// Ok(Action::requeue(Duration::from_secs(300)))
/// }
/// /// an error handler that will be called when the reconciler fails
/// fn error_policy(_error: &Error, _ctx: Context<()>) -> Action {
/// fn error_policy(_error: &Error, _ctx: Arc<()>) -> Action {
/// Action::requeue(Duration::from_secs(60))
/// }
///
/// /// something to drive the controller
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let client = Client::try_default().await?;
/// let context = Context::new(()); // bad empty context - put client in here
/// let context = Arc::new(()); // bad empty context - put client in here
/// let cmgs = Api::<ConfigMapGenerator>::all(client.clone());
/// let cms = Api::<ConfigMap>::all(client.clone());
/// Controller::new(cmgs, ListParams::default())
Expand Down Expand Up @@ -611,9 +582,9 @@ where
/// use kube::{
/// Client,
/// api::{ListParams, Api, ResourceExt},
/// runtime::{controller::{Context, Controller, Action}},
/// runtime::{controller::{Controller, Action}},
/// };
/// use std::{convert::Infallible, io::BufRead};
/// use std::{convert::Infallible, io::BufRead, sync::Arc};
/// let (mut reload_tx, reload_rx) = futures::channel::mpsc::channel(0);
/// // Using a regular background thread since tokio::io::stdin() doesn't allow aborting reads,
/// // and its worker prevents the Tokio runtime from shutting down.
Expand All @@ -633,7 +604,7 @@ where
/// Ok(Action::await_change())
/// },
/// |err: &Infallible, _| Err(err).unwrap(),
/// Context::new(()),
/// Arc::new(()),
/// );
/// # };
/// ```
Expand Down Expand Up @@ -675,8 +646,8 @@ where
/// use futures::future::FutureExt;
/// use k8s_openapi::api::core::v1::ConfigMap;
/// use kube::{api::ListParams, Api, Client, ResourceExt};
/// use kube_runtime::controller::{Context, Controller, Action};
/// use std::convert::Infallible;
/// use kube_runtime::controller::{Controller, Action};
/// use std::{convert::Infallible, sync::Arc};
/// Controller::new(
/// Api::<ConfigMap>::all(Client::try_default().await.unwrap()),
/// ListParams::default(),
Expand All @@ -688,7 +659,7 @@ where
/// Ok(Action::await_change())
/// },
/// |err: &Infallible, _| Err(err).unwrap(),
/// Context::new(()),
/// Arc::new(()),
/// );
/// # };
/// ```
Expand Down Expand Up @@ -761,12 +732,12 @@ where
///
/// This creates a stream from all builder calls and starts an applier with
/// a specified `reconciler` and `error_policy` callbacks. Each of these will be called
/// with a configurable [`Context`].
pub fn run<ReconcilerFut, T>(
/// with a configurable `context`.
pub fn run<ReconcilerFut, Ctx>(
self,
mut reconciler: impl FnMut(Arc<K>, Context<T>) -> ReconcilerFut,
error_policy: impl FnMut(&ReconcilerFut::Error, Context<T>) -> Action,
context: Context<T>,
mut reconciler: impl FnMut(Arc<K>, Arc<Ctx>) -> ReconcilerFut,
error_policy: impl FnMut(&ReconcilerFut::Error, Arc<Ctx>) -> Action,
context: Arc<Ctx>,
) -> impl Stream<Item = Result<(ObjectRef<K>, Action), Error<ReconcilerFut::Error, watcher::Error>>>
where
K::DynamicType: Debug + Unpin,
Expand All @@ -792,7 +763,9 @@ where

#[cfg(test)]
mod tests {
use super::{Action, Context};
use std::sync::Arc;

use super::Action;
use crate::Controller;
use k8s_openapi::api::core::v1::ConfigMap;
use kube_client::Api;
Expand All @@ -814,7 +787,7 @@ mod tests {
Controller::new(mock_type::<Api<ConfigMap>>(), Default::default()).run(
|_, _| async { Ok(mock_type::<Action>()) },
|_: &std::io::Error, _| mock_type::<Action>(),
Context::new(()),
Arc::new(()),
),
);
}
Expand Down