
Examples of spawning Informer tasks #201

Closed
xrl opened this issue Mar 29, 2020 · 4 comments · Fixed by #299
Labels: runtime (controller runtime related)

Comments

@xrl (Contributor) commented Mar 29, 2020

I just upgraded an "old" kube-rs app to use the latest futures goodness. It was quite a learning curve to go from thread::spawn() to futures spawn. Perhaps some examples will help the next soul?

My code manages ingress objects and tls secrets to go along with them. I scan on boot, looking for expired certs. I also start an informer so new ingresses get certs ASAP.

Spawning a task for a periodic full sweep of ingresses:

// buddy is a wrapper around the kube client
use std::time::Duration;
use futures::stream::{repeat, StreamExt};
use tokio::time::{interval_at, Instant}; // Interval is a Stream with tokio 0.2's `stream` feature

let full_sweep_loop = interval_at(
    Instant::now() + Duration::from_secs(60),
    Duration::from_secs(60 * 60 * 12),
)
// must move the client wrapper into the stream so it is available on every tick
.zip(repeat(buddy.clone()))
// `async move |..|` closures are unstable; use a closure returning an async block
.for_each(|(_, x)| async move {
    if let Err(e) = x.sweep_all_ingresses().await {
        error!("full poll error: {:?}", e);
    }
});

let full_poll_task = tokio::spawn(full_sweep_loop);

Spawning a task to drive the informer:

let informer_loop =
    tokio::spawn(async move { buddy.ingress_informer_poller().await.unwrap() });

where ingress_informer_poller looks like:

    pub async fn ingress_informer_poller(&self) -> Result<(), Error> {
        let mut stream = self.kube_client.ingress_informer.poll().await?.boxed();

        // propagate watch errors instead of silently ending the loop
        while let Some(evt) = stream.try_next().await? {
            let ing = match evt {
                WatchEvent::Added(ing) => ing,
                WatchEvent::Modified(ing) => ing,
                WatchEvent::Deleted(ing) => {
                    trace!(
                        "ingress deleted: {:#?}",
                        ing.metadata.as_ref().unwrap().name
                    );
                    continue;
                }
                WatchEvent::Error(ErrorResponse {
                    status,
                    message,
                    code,
                    ..
                }) => {
                    info!("status: {}\nmessage: {}\ncode: {}", status, message, code);
                    unimplemented!("TODO");
                }
            };
            if self.kube_client.is_certbuddy_compatible(&ing) {
                self.handle_ingress(&ing).await?;
            }
        }
        Ok(())
    }

Then we await both join handles so main doesn't return before the tasks finish (the spawned tasks are driven by tokio either way):

let (res1, res2) = tokio::join!(full_poll_task, informer_loop);
res1.unwrap();
res2.unwrap();
xrl changed the title from "Examples of spawning Informer handlers" to "Examples of spawning Informer tasks" on Mar 29, 2020
@nightkr (Member) commented Mar 29, 2020

Usually you'd want to join (or try_join) the futures directly, rather than spawning them. That way, you retain error handling and the ability to cancel them.
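
For example, a minimal sketch of that approach (assuming both loops are refactored into plain async fns returning Result<(), Error>; full_sweep_loop here is a hypothetical stand-in for the interval code above):

use futures::try_join;

// hypothetical refactor: both loops are plain async fns returning Result<(), Error>
let sweep = buddy.full_sweep_loop();
let informer = buddy.ingress_informer_poller();

// polls both concurrently on the current task; if either returns Err,
// the other is dropped (cancelled) and the error propagates here
try_join!(sweep, informer)?;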

@clux (Member) commented Mar 30, 2020

Informer now returns one event for every object by default on boot (except for deleted ones), which might make this easier.

But if you actually need periodic sweeps of external resources (because the owning kube object got deleted and the controller booted), then hopefully the WIP Controller<T> from #184 will provide better examples on how to do this.
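
A rough sketch of the Controller<T> shape from #184, with signatures as they looked around that time (State and Error are hypothetical application types here, and the periodic requeue_after stands in for the hand-rolled sweep task):

use std::time::Duration;
use futures::StreamExt;
use k8s_openapi::api::networking::v1beta1::Ingress;
use kube::api::{Api, ListParams};
use kube_runtime::controller::{Context, Controller, ReconcilerAction};

// hypothetical application types for the sketch
struct State; // e.g. the `buddy` client wrapper
#[derive(Debug)]
struct Error;
impl std::fmt::Display for Error {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "reconcile failed")
    }
}
impl std::error::Error for Error {}

async fn reconcile(_ing: Ingress, _ctx: Context<State>) -> Result<ReconcilerAction, Error> {
    // issue or renew the certificate for this ingress here
    Ok(ReconcilerAction {
        // a periodic requeue replaces the hand-rolled sweep task
        requeue_after: Some(Duration::from_secs(60 * 60 * 12)),
    })
}

fn error_policy(_err: &Error, _ctx: Context<State>) -> ReconcilerAction {
    ReconcilerAction { requeue_after: Some(Duration::from_secs(60)) }
}

// `ingresses` is an Api<Ingress> built from a kube Client
Controller::new(ingresses, ListParams::default())
    .run(reconcile, error_policy, Context::new(State))
    .for_each(|res| async move {
        if let Err(e) = res {
            error!("reconcile failed: {:?}", e);
        }
    })
    .await;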

@clux (Member) commented Aug 5, 2020

This issue is quite old and the examples have since received a lot of futures-based revamping (controller-rs and version-rs were also added to the examples in the readme), so I'm happy to close this after #299.

But to give some closure on this particular setup in the new kube-runtime context, the example could now be done with:

  • a reflector on Api<Ingress>, handing the reflector's reader to a periodic sweep task (much like most reflector examples), and
  • watching the reflector stream via try_flatten_applied and calling a handle_ingress_apply fn on every event it produces (see the sketch below)
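
A rough sketch of that shape against the kube-runtime API of the time (assuming a kube client is already constructed; handle_ingress_apply is the hypothetical per-event handler):

use futures::TryStreamExt;
use k8s_openapi::api::networking::v1beta1::Ingress;
use kube::api::{Api, ListParams};
use kube_runtime::reflector::{reflector, store};
use kube_runtime::utils::try_flatten_applied;
use kube_runtime::watcher::watcher;

let ingresses: Api<Ingress> = Api::all(client);

let writer = store::Writer::<Ingress>::default();
let reader = writer.as_reader(); // hand this to the periodic sweep task
let stream = reflector(writer, watcher(ingresses, ListParams::default()));

// Applied covers both Added and Modified watch events
try_flatten_applied(stream)
    .try_for_each(|ing| async move {
        // handler errors would need converting to the stream's error type,
        // so just log them in this sketch
        if let Err(e) = handle_ingress_apply(&ing).await {
            error!("handling ingress failed: {:?}", e);
        }
        Ok(())
    })
    .await?;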

The certificate management example seems like a common use case. An example that does this in some basic form would be very helpful!

clux added the runtime (controller runtime related) label on Aug 5, 2020
@xrl (Contributor, Author) commented Aug 5, 2020

I have forgotten what I was trying to do with this, time to close the ticket!

xrl closed this as completed on Aug 5, 2020