-
Hey :)
use futures::TryStreamExt;
use k8s_openapi::api::core::v1::ConfigMap;
use kube::{
runtime::{
reflector::{self, Store},
watcher,
},
Api, Client,
};
use tracing::info;
/// Sets up a reflector over `ConfigMap` objects and returns the reader half of its store.
///
/// The function builds a client, wraps a `ConfigMap` watcher in a reflector that feeds
/// the store's writer, and then drains the reflector stream, logging every event.
///
/// # Errors
///
/// Propagates (boxed) any error from `Client::try_default()` and any error yielded by
/// the reflector stream.
pub async fn get_cm_reader() -> Result<Store<ConfigMap>, Box<dyn std::error::Error>> {
// Build the client; a failure here is returned to the caller via `?`.
let client = Client::try_default().await?;
// Typed API handle for ConfigMaps, scoped through `default_namespaced`.
let api = Api::<ConfigMap>::default_namespaced(client);
// `reader` is what the caller gets back; `writer` is handed to the reflector below.
let (reader, writer) = reflector::store();
// Wrap the watcher stream so that it goes through the reflector together with `writer`.
let rf = reflector::reflector(writer, watcher(api, watcher::Config::default()));
// NOTE(review): the reply in this thread attributes the reported failure to the `Unpin`
// requirement on this stream and suggests pinning it (`std::pin::pin!` or
// `StreamExt::boxed`) before calling `try_next` — confirm against the compiler output.
// Drain the stream: the loop only exits when the stream yields `None` or an error,
// so `Ok(reader)` below is not reached while events keep arriving.
while let Some(cm_event) = rf.try_next().await? {
info!("Received ConfigMap change: {:?}", cm_event);
}
Ok(reader)
} leads to:
Why doesn't the |
Beta Was this translation helpful? Give feedback.
Replies: 1 comment 1 reply
-
Basically, it's because of To satisfy |
Beta Was this translation helpful? Give feedback.
Basically, it's because of `Unpin` requirements on the stream. To simplify it a bit, the futures used by the various stream implementations have a degree of self-referentiality and cannot be moved in async contexts / multi-threaded executors. This blog post is perhaps useful for the understanding.

To satisfy `Unpin` you can pin it to the stack with the stdlib's `std::pin::pin` macro, à la node_watcher, or, more safely (because this can consume a lot of stack size), you can put it on the heap via `Box::pin`, commonly invoked via `StreamExt::boxed`, as seen in the crd_reflector example.