-
I don't see anything in the documentation about
Then the stream will never be able to reconnect. This results in needing to manually implement credential reloading on a stream error, which looks a bit ugly -- though it can be wrapped in a function, at least. pub fn recreate_on_err<F, U, S, T, E>(f: F) -> impl Stream<Item = Result<T, E>>
where
F: FnMut() -> U,
U: Future<Output = S>,
S: Stream<Item = Result<T, E>>,
{
futures::stream::repeat_with(f)
.then(|i| i)
.map(|s| {
let mut take = true;
s.take_while(move |i| {
ready({
let r = take;
take = take && !i.is_err();
r
})
})
})
.flatten()
} This allows one to do: recreate_on_err(|| async {
watcher(
Api::all(Client::try_default().await.unwrap()),
&watcher::Config::default()
)
}) So my questions are:
|
Beta Was this translation helpful? Give feedback.
Replies: 1 comment 2 replies
-
So, if you get an error, the |
Beta Was this translation helpful? Give feedback.
watcher
does not do any error handling like this itself, butClient
does, and awatcher
is ultimately an abstraction on theClient
.Client
has anAuthLayer
(in its tower stack) in which theRefreshableToken
will handle updating updated credentials silently behind the seams (i.e. before the actual business logic of making the watcher http call happens).So, if you get an error, the
watcher
will reset the state to default/empty (and you can log that error event), but when you poll the next watcher event it will go through the auth layer before actually making the relist call, and if there are updates needed, they should happen then.