Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial implementation of get_all() for async using Stream #200

Merged
merged 14 commits into from
Oct 28, 2021
Merged
53 changes: 51 additions & 2 deletions src/params.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::config::{err, ok, Client, Response};
use crate::error::Error;
use crate::resources::ApiVersion;
use futures_util::stream::TryStream;
use serde::de::DeserializeOwned;
use serde_derive::{Deserialize, Serialize};
use std::collections::HashMap;
Expand Down Expand Up @@ -189,8 +190,6 @@ impl<T: DeserializeOwned + Send + 'static> List<T> {
impl<T: Paginate + DeserializeOwned + Send + 'static> List<T> {
/// Repeatedly queries Stripe for more data until all elements in list are fetched, using
/// Stripe's default page size.
///
/// Requires `feature = "blocking"`.
#[cfg(feature = "blocking")]
pub fn get_all(self, client: &Client) -> Response<Vec<T>> {
let mut data = Vec::new();
Expand All @@ -208,6 +207,56 @@ impl<T: Paginate + DeserializeOwned + Send + 'static> List<T> {
Ok(data)
}

/// Get all values in this list, consuming self and paginating until all values are fetched.
///
/// This function repeatedly queries Stripe for more data until all elements in list are fetched, using
/// the page size specified in params, or Stripe's default page size if none is specified.
///
/// ```no_run
/// use futures::TryStreamExt;
///
/// let value_stream = list.get_all(&client).boxed();
/// while let Some(val) = value_stream.try_next().await? {
/// println!("GOT = {:?}", val);
/// }
///
/// // Alternatively, you can collect all values into a Vec
/// let all_values = list.get_all(&client).try_collect::<Vec<_>>().await?;
/// ```
#[cfg(not(feature = "blocking"))]
pub fn get_all(self, client: &Client) -> impl TryStream<Ok = T, Error = Error> {
// We are going to be popping items off the end of the list, so we need to reverse it.
let mut init_list = self;
init_list.data.reverse();

futures_util::stream::unfold(Some((init_list, client.clone())), |state| async move {
let (mut list, client) = state?; // if none, we sent the last item in the list last iteration
let val = list.data.pop()?; // the initial list was empty, so we're done.

if !list.data.is_empty() {
return Some((Ok(val), Some((list, client)))); // some value on this page that isn't the last value on the page
}

if !list.has_more {
return Some((Ok(val), None)); // final value of the stream, no errors
}

// We're on the last value of this page, but there's more. We need to fetch the next page.
let last_id = val.cursor();
let resp = List::get_next(&client, &list.url, last_id.as_ref(), list.params.as_deref());

match resp.await {
Ok(mut next_list) => {
next_list.data.reverse();

// Yield last value of this page, the next page (and client) becomes the state
Some((Ok(val), Some((next_list, client))))
}
Err(e) => Some((Err(e), None)), // we ran into an error. the last value of the stream will be the error.
}
})
}

/// Fetch an additional page of data from stripe.
pub fn next(&self, client: &Client) -> Response<List<T>> {
if let Some(last_id) = self.data.last().map(|d| d.cursor()) {
Expand Down