From deddf975a0cd4246980479f561f2b07513a652ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Teo=20Klestrup=20R=C3=B6ijezon?= Date: Fri, 13 May 2022 15:38:54 +0200 Subject: [PATCH] Replace runtime::controller::Context with Arc --- examples/configmapgen_controller.rs | 10 ++-- examples/kubectl.rs | 4 +- examples/secret_syncer.rs | 4 +- kube-runtime/src/controller/mod.rs | 73 +++++++++-------------------- 4 files changed, 32 insertions(+), 59 deletions(-) diff --git a/examples/configmapgen_controller.rs b/examples/configmapgen_controller.rs index baa580dfa..0597a54a8 100644 --- a/examples/configmapgen_controller.rs +++ b/examples/configmapgen_controller.rs @@ -3,7 +3,7 @@ use futures::StreamExt; use k8s_openapi::api::core::v1::ConfigMap; use kube::{ api::{Api, ListParams, ObjectMeta, Patch, PatchParams, Resource}, - runtime::controller::{Action, Context, Controller}, + runtime::controller::{Action, Controller}, Client, CustomResource, }; use schemars::JsonSchema; @@ -29,8 +29,8 @@ struct ConfigMapGeneratorSpec { } /// Controller triggers this whenever our main object or our children changed -async fn reconcile(generator: Arc, ctx: Context) -> Result { - let client = ctx.get_ref().client.clone(); +async fn reconcile(generator: Arc, ctx: Arc) -> Result { + let client = &ctx.client; let mut contents = BTreeMap::new(); contents.insert("content".to_string(), generator.spec.content.clone()); @@ -67,7 +67,7 @@ async fn reconcile(generator: Arc, ctx: Context) -> Re } /// The controller triggers this on reconcile errors -fn error_policy(_error: &Error, _ctx: Context) -> Action { +fn error_policy(_error: &Error, _ctx: Arc) -> Action { Action::requeue(Duration::from_secs(1)) } @@ -100,7 +100,7 @@ async fn main() -> Result<()> { .owns(cms, ListParams::default()) .reconcile_all_on(reload_rx.map(|_| ())) .shutdown_on_signal() - .run(reconcile, error_policy, Context::new(Data { client })) + .run(reconcile, error_policy, Arc::new(Data { client })) .for_each(|res| async move { match res { Ok(o) => info!("reconciled {:?}", o), diff --git a/examples/kubectl.rs b/examples/kubectl.rs index 2af7a6ac1..dde6936de 100644 --- a/examples/kubectl.rs +++ b/examples/kubectl.rs @@ -138,7 +138,7 @@ impl App { info!("updating changed object {}", orig.name()); let data: DynamicObject = serde_yaml::from_str(&edited)?; // NB: simplified kubectl constructs a merge-patch of differences - api.replace(&n, &Default::default(), &data).await?; + api.replace(n, &Default::default(), &data).await?; } } else { warn!("need a name to edit"); @@ -185,7 +185,7 @@ async fn main() -> Result<()> { // Defer to methods for verbs if let Some(resource) = &app.resource { // Common discovery, parameters, and api configuration for a single resource - let (ar, caps) = resolve_api_resource(&discovery, &resource) + let (ar, caps) = resolve_api_resource(&discovery, resource) .with_context(|| format!("resource {:?} not found in cluster", resource))?; let mut lp = ListParams::default(); if let Some(label) = &app.selector { diff --git a/examples/secret_syncer.rs b/examples/secret_syncer.rs index 78255b676..b77850619 100644 --- a/examples/secret_syncer.rs +++ b/examples/secret_syncer.rs @@ -9,7 +9,7 @@ use kube::{ api::{Api, DeleteParams, ListParams, ObjectMeta, Patch, PatchParams, Resource}, error::ErrorResponse, runtime::{ - controller::{Action, Context, Controller}, + controller::{Action, Controller}, finalizer::{finalizer, Event}, }, }; @@ -101,7 +101,7 @@ async fn main() -> anyhow::Result<()> { } }, |_err, _| Action::requeue(Duration::from_secs(2)), - Context::new(()), + Arc::new(()), ) .for_each(|msg| async move { info!("Reconciled: {:?}", msg) }) .await; diff --git a/kube-runtime/src/controller/mod.rs b/kube-runtime/src/controller/mod.rs index 176c68d4f..3238830b2 100644 --- a/kube-runtime/src/controller/mod.rs +++ b/kube-runtime/src/controller/mod.rs @@ -147,35 +147,6 @@ where }) } -/// A context data type that's passed through to the controllers callbacks -/// -/// `Context` gets passed to both the `reconciler` and the `error_policy` callbacks, -/// allowing a read-only view of the world without creating a big nested lambda. -/// More or less the same as Actix's [`Data`](https://docs.rs/actix-web/3.x/actix_web/web/struct.Data.html). -#[derive(Debug, Derivative)] -#[derivative(Clone(bound = ""))] -pub struct Context(Arc); - -impl Context { - /// Create new `Context` instance. - #[must_use] - pub fn new(state: T) -> Context { - Context(Arc::new(state)) - } - - /// Get reference to inner controller data. - #[must_use] - pub fn get_ref(&self) -> &T { - self.0.as_ref() - } - - /// Convert to the internal `Arc`. - #[must_use] - pub fn into_inner(self) -> Arc { - self.0 - } -} - /// A request to reconcile an object, annotated with why that request was made. /// /// NOTE: The reason is ignored for comparison purposes. This means that, for example, @@ -242,10 +213,10 @@ impl Display for ReconcileReason { /// /// This is the "hard-mode" version of [`Controller`], which allows you some more customization /// (such as triggering from arbitrary [`Stream`]s), at the cost of being a bit more verbose. -pub fn applier( - mut reconciler: impl FnMut(Arc, Context) -> ReconcilerFut, - mut error_policy: impl FnMut(&ReconcilerFut::Error, Context) -> Action, - context: Context, +pub fn applier( + mut reconciler: impl FnMut(Arc, Arc) -> ReconcilerFut, + mut error_policy: impl FnMut(&ReconcilerFut::Error, Arc) -> Action, + context: Arc, store: Store, queue: QueueStream, ) -> impl Stream, Action), Error>> @@ -352,7 +323,7 @@ where /// use kube::{ /// Client, CustomResource, /// api::{Api, ListParams}, -/// runtime::controller::{Context, Controller, Action} +/// runtime::controller::{Controller, Action} /// }; /// use serde::{Deserialize, Serialize}; /// use tokio::time::Duration; @@ -373,13 +344,13 @@ where /// } /// /// /// The reconciler that will be called when either object change -/// async fn reconcile(g: Arc, _ctx: Context<()>) -> Result { +/// async fn reconcile(g: Arc, _ctx: Arc<()>) -> Result { /// // .. use api here to reconcile a child ConfigMap with ownerreferences /// // see configmapgen_controller example for full info /// Ok(Action::requeue(Duration::from_secs(300))) /// } /// /// an error handler that will be called when the reconciler fails -/// fn error_policy(_error: &Error, _ctx: Context<()>) -> Action { +/// fn error_policy(_error: &Error, _ctx: Arc<()>) -> Action { /// Action::requeue(Duration::from_secs(60)) /// } /// @@ -387,7 +358,7 @@ where /// #[tokio::main] /// async fn main() -> Result<(), Box> { /// let client = Client::try_default().await?; -/// let context = Context::new(()); // bad empty context - put client in here +/// let context = Arc::new(()); // bad empty context - put client in here /// let cmgs = Api::::all(client.clone()); /// let cms = Api::::all(client.clone()); /// Controller::new(cmgs, ListParams::default()) @@ -611,9 +582,9 @@ where /// use kube::{ /// Client, /// api::{ListParams, Api, ResourceExt}, - /// runtime::{controller::{Context, Controller, Action}}, + /// runtime::{controller::{Controller, Action}}, /// }; - /// use std::{convert::Infallible, io::BufRead}; + /// use std::{convert::Infallible, io::BufRead, sync::Arc}; /// let (mut reload_tx, reload_rx) = futures::channel::mpsc::channel(0); /// // Using a regular background thread since tokio::io::stdin() doesn't allow aborting reads, /// // and its worker prevents the Tokio runtime from shutting down. @@ -633,7 +604,7 @@ where /// Ok(Action::await_change()) /// }, /// |err: &Infallible, _| Err(err).unwrap(), - /// Context::new(()), + /// Arc::new(()), /// ); /// # }; /// ``` @@ -675,8 +646,8 @@ where /// use futures::future::FutureExt; /// use k8s_openapi::api::core::v1::ConfigMap; /// use kube::{api::ListParams, Api, Client, ResourceExt}; - /// use kube_runtime::controller::{Context, Controller, Action}; - /// use std::convert::Infallible; + /// use kube_runtime::controller::{Controller, Action}; + /// use std::{convert::Infallible, sync::Arc}; /// Controller::new( /// Api::::all(Client::try_default().await.unwrap()), /// ListParams::default(), @@ -688,7 +659,7 @@ where /// Ok(Action::await_change()) /// }, /// |err: &Infallible, _| Err(err).unwrap(), - /// Context::new(()), + /// Arc::new(()), /// ); /// # }; /// ``` @@ -761,12 +732,12 @@ where /// /// This creates a stream from all builder calls and starts an applier with /// a specified `reconciler` and `error_policy` callbacks. Each of these will be called - /// with a configurable [`Context`]. - pub fn run( + /// with a configurable `context`. + pub fn run( self, - mut reconciler: impl FnMut(Arc, Context) -> ReconcilerFut, - error_policy: impl FnMut(&ReconcilerFut::Error, Context) -> Action, - context: Context, + mut reconciler: impl FnMut(Arc, Arc) -> ReconcilerFut, + error_policy: impl FnMut(&ReconcilerFut::Error, Arc) -> Action, + context: Arc, ) -> impl Stream, Action), Error>> where K::DynamicType: Debug + Unpin, @@ -792,7 +763,9 @@ where #[cfg(test)] mod tests { - use super::{Action, Context}; + use std::sync::Arc; + + use super::Action; use crate::Controller; use k8s_openapi::api::core::v1::ConfigMap; use kube_client::Api; @@ -814,7 +787,7 @@ mod tests { Controller::new(mock_type::>(), Default::default()).run( |_, _| async { Ok(mock_type::()) }, |_: &std::io::Error, _| mock_type::(), - Context::new(()), + Arc::new(()), ), ); }