Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate from backoff to backon #1652

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ assert-json-diff = "2.0.2"
async-broadcast = "0.7.0"
async-stream = "0.3.5"
async-trait = "0.1.64"
backoff = "0.4.0"
backon = "1.3.0"
base64 = "0.22.1"
bytes = "1.1.0"
chrono = { version = "0.4.34", default-features = false }
Expand Down
4 changes: 1 addition & 3 deletions deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@ db-urls = ["https://github.com/rustsec/advisory-db"]
# remove them when we have to
yanked = "warn"

ignore = [
"RUSTSEC-2024-0384", # instant dep via unmaintained backoff dep
]
ignore = []

[licenses]
# See https://spdx.org/licenses/ for list of possible licenses
Expand Down
2 changes: 1 addition & 1 deletion examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ tower-http = { workspace = true, features = ["trace", "decompression-gzip"] }
hyper = { workspace = true, features = ["client", "http1"] }
hyper-util = { workspace = true, features = ["client-legacy", "http1", "tokio"] }
thiserror.workspace = true
backoff.workspace = true
backon.workspace = true
clap = { version = "4.0", default-features = false, features = ["std", "cargo", "derive"] }
edit = "0.1.3"
tokio-stream = { version = "0.1.9", features = ["net"] }
Expand Down
2 changes: 2 additions & 0 deletions examples/errorbounded_configmap_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@ use tracing::*;
#[resource(inherit = ConfigMap)]
struct CaConfigMap {
metadata: ObjectMeta,
#[allow(unused)]
data: CaConfigMapData,
}

#[derive(Deserialize, Debug, Clone)]
struct CaConfigMapData {
#[serde(rename = "ca.crt")]
#[allow(unused)]
ca_crt: String,
}

Expand Down
4 changes: 2 additions & 2 deletions examples/shared_stream_controllers.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{ops::Deref, sync::Arc, time::Duration};

use futures::{future, StreamExt};
use futures::StreamExt;
use k8s_openapi::api::{apps::v1::Deployment, core::v1::Pod};
use kube::{
runtime::{
Expand Down Expand Up @@ -63,7 +63,7 @@ async fn main() -> anyhow::Result<()> {
.clone()
.map(|r| Ok(r.deref().clone()))
.predicate_filter(predicates::resource_version)
.filter_map(|r| future::ready(r.ok().map(Arc::new)));
.filter_map(|r| std::future::ready(r.ok().map(Arc::new)));

// Reflect a stream of pod watch events into the store and apply a backoff. For subscribers to
// be able to consume updates, the reflector must be shared.
Expand Down
2 changes: 1 addition & 1 deletion kube-client/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub struct Api<K> {

/// Api constructors for Resource implementors with custom DynamicTypes
///
/// This generally means resources created via [`DynamicObject`](crate::api::DynamicObject).
/// This generally means resources created via [`DynamicObject`].
impl<K: Resource> Api<K> {
/// Cluster level resources, or resources viewed across all namespaces
///
Expand Down
2 changes: 1 addition & 1 deletion kube-client/src/api/subresource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ fn evict_path() {

/// Marker trait for objects that can be evicted
///
/// See [`Api::evic`] for usage
/// See [`Api::evict`] for usage
pub trait Evict {}

impl Evict for k8s_openapi::api::core::v1::Pod {}
Expand Down
2 changes: 1 addition & 1 deletion kube-client/src/client/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ mod tests {
let io: TokioIo<TcpStream> = TokioIo::new(tcp);

tokio::spawn(async move {
let _ = http1::Builder::new()
http1::Builder::new()
.timer(TokioTimer::new())
.serve_connection(
io,
Expand Down
7 changes: 1 addition & 6 deletions kube-client/src/client/client_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -487,12 +487,7 @@ mod test {

// Fetch using local object reference
let svc: Service = client
.fetch(
&LocalObjectReference {
name: svc.name_any().into(),
}
.within(svc.namespace()),
)
.fetch(&LocalObjectReference { name: svc.name_any() }.within(svc.namespace()))
.await?;
assert_eq!(svc.name_unchecked(), "kubernetes");

Expand Down
3 changes: 1 addition & 2 deletions kube-client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,7 @@ pub struct Client {
impl Client {
/// Create a [`Client`] using a custom `Service` stack.
///
/// [`ConfigExt`](crate::client::ConfigExt) provides extensions for
/// building a custom stack.
/// [`ConfigExt`] provides extensions for building a custom stack.
///
/// To create with the default stack with a [`Config`], use
/// [`Client::try_from`].
Expand Down
4 changes: 2 additions & 2 deletions kube-client/src/config/file_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ pub struct Cluster {
/// `disable_compression` allows client to opt-out of response compression for all requests to the server.
/// This is useful to speed up requests (specifically lists) when client-server network bandwidth is ample,
/// by saving time on compression (server-side) and decompression (client-side):
/// https://github.com/kubernetes/kubernetes/issues/112296
/// <https://github.com/kubernetes/kubernetes/issues/112296>
#[serde(rename = "disable-compression")]
#[serde(skip_serializing_if = "Option::is_none")]
pub disable_compression: Option<bool>,
Expand Down Expand Up @@ -550,7 +550,7 @@ impl AuthInfo {

/// Connection information for auth plugins that have `provideClusterInfo` enabled.
///
/// This is a copy of [`kube::config::Cluster`] with certificate_authority passed as bytes without the path.
/// This is a copy of [`Cluster`] with certificate_authority passed as bytes without the path.
/// Taken from [clientauthentication/types.go#Cluster](https://github.com/kubernetes/client-go/blob/477cb782cf024bc70b7239f0dca91e5774811950/pkg/apis/clientauthentication/types.go#L73-L129)
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
#[serde(rename_all = "kebab-case")]
Expand Down
2 changes: 1 addition & 1 deletion kube-client/src/discovery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl DiscoveryMode {
/// To make use of discovered apis, extract one or more [`ApiGroup`]s from it,
/// or resolve a precise one using [`Discovery::resolve_gvk`](crate::discovery::Discovery::resolve_gvk).
///
/// If caching of results is __not required__, then a simpler [`oneshot`](crate::discovery::oneshot) discovery system can be used.
/// If caching of results is __not required__, then a simpler [`oneshot`] discovery system can be used.
///
/// [`ApiGroup`]: crate::discovery::ApiGroup
#[cfg_attr(docsrs, doc(cfg(feature = "client")))]
Expand Down
6 changes: 3 additions & 3 deletions kube-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@
//!
//! For more details, see:
//!
//! - [`Client`](crate::client) for the extensible Kubernetes client
//! - [`Config`](crate::config) for the Kubernetes config abstraction
//! - [`Api`](crate::Api) for the generic api methods available on Kubernetes resources
//! - [`client`] for the extensible Kubernetes client
//! - [`config`] for the Kubernetes config abstraction
//! - [`Api`] for the generic api methods available on Kubernetes resources
//! - [k8s-openapi](https://docs.rs/k8s-openapi) for how to create typed kubernetes objects directly
#![cfg_attr(docsrs, feature(doc_cfg))]
// Nightly clippy (0.1.64) considers Drop a side effect, see https://github.com/rust-lang/rust-clippy/issues/9608
Expand Down
8 changes: 4 additions & 4 deletions kube-core/src/object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ use std::borrow::Cow;
/// Kubernetes' API [always seem to expose list structs in this manner](https://docs.rs/k8s-openapi/0.10.0/k8s_openapi/apimachinery/pkg/apis/meta/v1/struct.ObjectMeta.html?search=List).
///
/// Note that this is only used internally within reflectors and informers,
/// and is generally produced from list/watch/delete collection queries on an [`Resource`](super::Resource).
/// and is generally produced from list/watch/delete collection queries on an [`Resource`].
///
/// This is almost equivalent to [`k8s_openapi::List<T>`](k8s_openapi::List), but iterable.
/// This is almost equivalent to [`k8s_openapi::List<T>`], but iterable.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ObjectList<T>
where
Expand All @@ -27,7 +27,7 @@ where

/// ListMeta - only really used for its `resourceVersion`
///
/// See [ListMeta](k8s_openapi::apimachinery::pkg::apis::meta::v1::ListMeta)
/// See [ListMeta]
#[serde(default)]
pub metadata: ListMeta,

Expand Down Expand Up @@ -312,7 +312,7 @@ where
/// Empty struct for when data should be discarded
///
/// Not using [`()`](https://doc.rust-lang.org/stable/std/primitive.unit.html), because serde's
/// [`Deserialize`](serde::Deserialize) `impl` is too strict.
/// [`Deserialize`] `impl` is too strict.
#[derive(Clone, Deserialize, Serialize, Default, Debug)]
pub struct NotUsed {}

Expand Down
4 changes: 2 additions & 2 deletions kube-core/src/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ impl ListParams {

/// Configure typed label selectors
///
/// Configure typed selectors from [`Selector`](crate::Selector) and [`Expression`](crate::Expression) lists.
/// Configure typed selectors from [`Selector`] and [`Expression`](crate::Expression) lists.
///
/// ```
/// use kube::core::{Expression, Selector, ParseExpressionError};
Expand Down Expand Up @@ -456,7 +456,7 @@ impl WatchParams {

/// Configure typed label selectors
///
/// Configure typed selectors from [`Selector`](crate::Selector) and [`Expression`](crate::Expression) lists.
/// Configure typed selectors from [`Selector`] and [`Expression`](crate::Expression) lists.
///
/// ```
/// use kube::core::{Expression, Selector, ParseExpressionError};
Expand Down
4 changes: 2 additions & 2 deletions kube-derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,8 +365,8 @@ pub fn derive_custom_resource(input: proc_macro::TokenStream) -> proc_macro::Tok
/// ```
///
/// The example above will generate:
/// ```
/// // impl kube::Resource for FooMap { .. }
/// ```rust,ignore
/// impl kube::Resource for FooMap { .. }
/// ```
/// [`kube`]: https://docs.rs/kube
/// [`kube::Api`]: https://docs.rs/kube/*/kube/struct.Api.html
Expand Down
2 changes: 2 additions & 0 deletions kube-derive/tests/crd_enum_test.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![allow(missing_docs)]

use kube_derive::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
Expand Down
1 change: 1 addition & 0 deletions kube-derive/tests/crd_schema_test.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#![allow(missing_docs)]
#![recursion_limit = "256"]

use assert_json_diff::assert_json_eq;
Expand Down
6 changes: 6 additions & 0 deletions kube-derive/tests/resource.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![allow(missing_docs)]

use k8s_openapi::{
api::core::v1::{ConfigMap, Secret},
ByteString,
Expand All @@ -9,23 +11,27 @@ use kube_derive::Resource;
#[resource(inherit = "ConfigMap")]
struct TypedMap {
metadata: ObjectMeta,
#[allow(unused)]
data: Option<TypedData>,
}

#[derive(Default)]
struct TypedData {
#[allow(unused)]
field: String,
}

#[derive(Resource, Default)]
#[resource(inherit = "Secret")]
struct TypedSecret {
metadata: ObjectMeta,
#[allow(unused)]
data: Option<TypedSecretData>,
}

#[derive(Default)]
struct TypedSecretData {
#[allow(unused)]
field: ByteString,
}

Expand Down
2 changes: 2 additions & 0 deletions kube-derive/tests/test_ui.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![allow(missing_docs)]

// Test that `kube-derive` outputs helpful error messages.
// If you make a change, remove `tests/ui/*.stderr` and run `cargo test`.
// Then copy the files that appear under `wip/` if it's what you expected.
Expand Down
2 changes: 1 addition & 1 deletion kube-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ json-patch.workspace = true
jsonptr.workspace = true
serde_json.workspace = true
thiserror.workspace = true
backoff.workspace = true
backon.workspace = true
async-trait.workspace = true
hashbrown.workspace = true
k8s-openapi.workspace = true
Expand Down
23 changes: 13 additions & 10 deletions kube-runtime/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@ use crate::{
ObjectRef,
},
scheduler::{debounced_scheduler, ScheduleRequest},
utils::{trystream_try_via, CancelableJoinHandle, KubeRuntimeStreamExt, StreamBackoff, WatchStreamExt},
watcher::{self, metadata_watcher, watcher, DefaultBackoff},
utils::{
trystream_try_via, CancelableJoinHandle, KubeRuntimeStreamExt, ResettableBackoff,
ResettableBackoffWrapper, StreamBackoff, WatchStreamExt,
},
watcher::{self, metadata_watcher, watcher, DefaultBackoffBuilder},
};
use backoff::backoff::Backoff;
use backon::BackoffBuilder;
use educe::Educe;
use futures::{
channel,
Expand Down Expand Up @@ -629,7 +632,7 @@ where
{
// NB: Need to Unpin for stream::select_all
trigger_selector: stream::SelectAll<BoxStream<'static, Result<ReconcileRequest<K>, watcher::Error>>>,
trigger_backoff: Box<dyn Backoff + Send>,
trigger_backoff: Box<dyn ResettableBackoff + Send>,
/// [`run`](crate::Controller::run) starts a graceful shutdown when any of these [`Future`]s complete,
/// refusing to start any new reconciliations but letting any existing ones finish.
graceful_shutdown_selector: Vec<BoxFuture<'static, ()>>,
Expand Down Expand Up @@ -689,7 +692,7 @@ where
trigger_selector.push(self_watcher);
Self {
trigger_selector,
trigger_backoff: Box::<DefaultBackoff>::default(),
trigger_backoff: Box::<ResettableBackoffWrapper<DefaultBackoffBuilder>>::default(),
clux marked this conversation as resolved.
Show resolved Hide resolved
graceful_shutdown_selector: vec![
// Fallback future, ensuring that we never terminate if no additional futures are added to the selector
future::pending().boxed(),
Expand Down Expand Up @@ -775,7 +778,7 @@ where
trigger_selector.push(self_watcher);
Self {
trigger_selector,
trigger_backoff: Box::<DefaultBackoff>::default(),
trigger_backoff: Box::<ResettableBackoffWrapper<DefaultBackoffBuilder>>::default(),
graceful_shutdown_selector: vec![
// Fallback future, ensuring that we never terminate if no additional futures are added to the selector
future::pending().boxed(),
Expand Down Expand Up @@ -886,7 +889,7 @@ where
trigger_selector.push(self_watcher);
Self {
trigger_selector,
trigger_backoff: Box::<DefaultBackoff>::default(),
trigger_backoff: Box::<ResettableBackoffWrapper<DefaultBackoffBuilder>>::default(),
graceful_shutdown_selector: vec![
// Fallback future, ensuring that we never terminate if no additional futures are added to the selector
future::pending().boxed(),
Expand All @@ -912,11 +915,11 @@ where
///
/// This includes the core watch, as well as auxilary watches introduced by [`Self::owns`] and [`Self::watches`].
///
/// The [`default_backoff`](crate::watcher::default_backoff) follows client-go conventions,
/// The [default backoff policy](crate::WatchStreamExt::default_backoff) follows client-go conventions,
/// but can be overridden by calling this method.
#[must_use]
pub fn trigger_backoff(mut self, backoff: impl Backoff + Send + 'static) -> Self {
self.trigger_backoff = Box::new(backoff);
pub fn trigger_backoff(mut self, backoff_builder: impl BackoffBuilder + Clone + 'static) -> Self {
self.trigger_backoff = Box::new(ResettableBackoffWrapper::new(backoff_builder));
self
}

Expand Down
10 changes: 5 additions & 5 deletions kube-runtime/src/reflector/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ use async_broadcast::{InactiveReceiver, Receiver, Sender};

use super::Lookup;

#[cfg(doc)] pub use super::store::Writer;

#[derive(Educe)]
#[educe(Debug(bound("K: Debug, K::DynamicType: Debug")), Clone)]
// A helper type that holds a broadcast transmitter and a broadcast receiver,
Expand Down Expand Up @@ -74,18 +76,16 @@ where

/// A handle to a shared stream reader
///
/// [`ReflectHandle`]s are created by calling [`subscribe()`] on a [`Writer`],
/// or by calling `clone()` on an already existing [`ReflectHandle`]. Each
/// shared stream reader should be polled independently and driven to readiness
/// [`ReflectHandle`]s are created by calling [`Writer::subscribe`],
/// or by calling [`clone`](Clone::clone) on an already existing [`ReflectHandle`].
/// Each shared stream reader should be polled independently and driven to readiness
/// to avoid deadlocks. When the [`Writer`]'s buffer is filled, backpressure
/// will be applied on the root stream side.
///
/// When the root stream is dropped, or it ends, all [`ReflectHandle`]s
/// subscribed to the stream will also terminate after all events yielded by
/// the root stream have been observed. This means [`ReflectHandle`] streams
/// can still be polled after the root stream has been dropped.
///
/// [`Writer`]: crate::reflector::Writer
#[pin_project]
pub struct ReflectHandle<K>
where
Expand Down
2 changes: 1 addition & 1 deletion kube-runtime/src/reflector/object_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub trait Lookup {
/// The [version](Resource::version) for this object.
fn version(dyntype: &Self::DynamicType) -> Cow<'_, str>;

/// The [apiVersion](Resource::_version) for this object.
/// The [apiVersion](Resource::api_version) for this object.
fn api_version(dyntype: &Self::DynamicType) -> Cow<'_, str> {
api_version_from_group_version(Self::group(dyntype), Self::version(dyntype))
}
Expand Down
2 changes: 1 addition & 1 deletion kube-runtime/src/scheduler.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! Delays and deduplicates [`Stream`](futures::stream::Stream) items
//! Delays and deduplicates [`Stream`] items

use futures::{stream::Fuse, Stream, StreamExt};
use hashbrown::{hash_map::RawEntryMut, HashMap};
Expand Down
Loading
Loading