From dc1145039a17ddba7ab8eb8bca55ffc498867621 Mon Sep 17 00:00:00 2001 From: Nico Burniske Date: Wed, 23 Aug 2023 09:05:07 -0400 Subject: [PATCH 1/2] first attempt integrate schedule, retrying, and error states for use_query --- src/lib.rs | 17 +- src/query.rs | 21 +- src/query_client.rs | 873 +++++++++++++++++++++--------------------- src/query_executor.rs | 99 +++-- src/query_options.rs | 14 +- src/query_result.rs | 15 +- src/query_state.rs | 53 ++- src/schedule.rs | 416 ++++++++++++++++++++ src/use_query.rs | 90 ++++- 9 files changed, 1083 insertions(+), 515 deletions(-) create mode 100644 src/schedule.rs diff --git a/src/lib.rs b/src/lib.rs index d98ab9c..c2ca8ad 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -131,14 +131,15 @@ mod query_executor; mod query_options; mod query_result; mod query_state; +mod schedule; mod use_query; mod util; -pub use instant::*; -use query::*; -pub use query_client::*; -pub use query_executor::*; -pub use query_options::*; -pub use query_result::*; -pub use query_state::*; -pub use use_query::*; +pub use crate::instant::*; +use crate::query::*; +pub use crate::query_client::*; +pub use crate::query_executor::*; +pub use crate::query_options::*; +pub use crate::query_result::*; +pub use crate::query_state::*; +pub use crate::use_query::*; diff --git a/src/query.rs b/src/query.rs index 4e99337..55e2af3 100644 --- a/src/query.rs +++ b/src/query.rs @@ -4,33 +4,35 @@ use std::{cell::Cell, rc::Rc, time::Duration}; use crate::{ensure_valid_stale_time, QueryOptions, QueryState}; #[derive(Clone)] -pub(crate) struct Query +pub(crate) struct Query where K: 'static, V: 'static, + E: 'static, { pub(crate) key: K, // State. pub(crate) observers: Rc>, - pub(crate) state: RwSignal>, + pub(crate) state: RwSignal>, // Config. pub(crate) stale_time: RwSignal>, pub(crate) cache_time: RwSignal>, pub(crate) refetch_interval: RwSignal>, } -impl PartialEq for Query { +impl PartialEq for Query { fn eq(&self, other: &Self) -> bool { self.key == other.key } } -impl Eq for Query {} +impl Eq for Query {} -impl Query +impl Query where K: Clone + 'static, V: Clone + 'static, + E: Clone + 'static, { pub(crate) fn new(cx: Scope, key: K) -> Self { let stale_time = create_rw_signal(cx, None); @@ -50,10 +52,11 @@ where } } -impl Query +impl Query where K: Clone + 'static, V: Clone + 'static, + E: Clone + 'static, { /// Marks the resource as invalid, which will cause it to be refetched on next read. pub(crate) fn mark_invalid(&self) -> bool { @@ -65,7 +68,7 @@ where } } - pub(crate) fn overwrite_options(&self, options: QueryOptions) { + pub(crate) fn overwrite_options(&self, options: QueryOptions) { let stale_time = ensure_valid_stale_time(&options.stale_time, &options.cache_time); self.stale_time.set(stale_time); @@ -77,7 +80,7 @@ where // The lowest stale time & refetch interval will be used. // When the scope is dropped, the stale time & refetch interval will be reset to the previous value (if they existed). // Cache time behaves differently. It will only use the minimum cache time found. - pub(crate) fn update_options(&self, cx: Scope, options: QueryOptions) { + pub(crate) fn update_options(&self, cx: Scope, options: QueryOptions) { // Use the minimum cache time. 
match (self.cache_time.get_untracked(), options.cache_time) { (Some(current), Some(new)) if new < current => self.cache_time.set(Some(new)), @@ -123,7 +126,7 @@ where } } -impl Query { +impl Query { pub(crate) fn dispose(&self) { self.state.dispose(); self.stale_time.dispose(); diff --git a/src/query_client.rs b/src/query_client.rs index 62336c5..0d237d9 100644 --- a/src/query_client.rs +++ b/src/query_client.rs @@ -42,10 +42,10 @@ pub struct QueryClient { pub(crate) cx: Scope, // Signal to indicate a cache entry has been added or removed. pub(crate) notify: RwSignal<()>, - pub(crate) cache: Rc>>>, + pub(crate) cache: Rc>>>, } -pub(crate) struct CacheEntry(HashMap>); +pub(crate) struct CacheEntry(HashMap>); // Trait to enable cache introspection among distinct cache entry maps. pub(crate) trait CacheEntryTrait: CacheSize + CacheInvalidate { @@ -53,10 +53,11 @@ pub(crate) trait CacheEntryTrait: CacheSize + CacheInvalidate { fn as_any_mut(&mut self) -> &mut dyn Any; } -impl CacheEntryTrait for CacheEntry +impl CacheEntryTrait for CacheEntry where K: Clone, V: Clone, + E: Clone, { fn as_any(&self) -> &dyn Any { self @@ -71,7 +72,7 @@ pub(crate) trait CacheSize { fn size(&self) -> usize; } -impl CacheSize for CacheEntry { +impl CacheSize for CacheEntry { fn size(&self) -> usize { self.0.len() } @@ -81,10 +82,11 @@ pub(crate) trait CacheInvalidate { fn invalidate(&self); } -impl CacheInvalidate for CacheEntry +impl CacheInvalidate for CacheEntry where K: Clone, V: Clone, + E: Clone, { fn invalidate(&self) { for (_, query) in self.0.iter() { @@ -107,105 +109,105 @@ impl QueryClient { /// If the entry already exists it will still be refetched. /// /// If you don't need the result opt for [`prefetch_query()`](Self::prefetch_query) - pub fn fetch_query( - &self, - cx: Scope, - key: impl Fn() -> K + 'static, - fetcher: impl Fn(K) -> Fu + 'static, - isomorphic: bool, - ) -> QueryResult - where - K: Hash + Eq + Clone + 'static, - V: Clone + 'static, - Fu: Future + 'static, - { - let state = self.get_query_signal(cx, key); - - let state = Signal::derive(cx, move || state.get().0); - - let executor = create_executor(state, fetcher); - - let sync = { - let executor = executor.clone(); - move |_| { - let _ = state.get(); - executor() - } - }; - if isomorphic { - create_isomorphic_effect(cx, sync); - } else { - create_effect(cx, sync); - } - - synchronize_state(cx, state, executor.clone()); - - create_query_result( - cx, - state, - Signal::derive(self.cx, move || state.get().state.get().data().cloned()), - executor, - ) - } + // pub fn fetch_query( + // &self, + // cx: Scope, + // key: impl Fn() -> K + 'static, + // fetcher: impl Fn(K) -> Fu + 'static, + // isomorphic: bool, + // ) -> QueryResult + // where + // K: Hash + Eq + Clone + 'static, + // V: Clone + 'static, + // Fu: Future + 'static, + // { + // let state = self.get_query_signal(cx, key); + + // let state = Signal::derive(cx, move || state.get().0); + + // let executor = create_executor(state, fetcher); + + // let sync = { + // let executor = executor.clone(); + // move |_| { + // let _ = state.get(); + // executor() + // } + // }; + // if isomorphic { + // create_isomorphic_effect(cx, sync); + // } else { + // create_effect(cx, sync); + // } + + // synchronize_state(cx, state, executor.clone()); + + // create_query_result( + // cx, + // state, + // Signal::derive(self.cx, move || state.get().state.get().data().cloned()), + // executor, + // ) + // } /// Prefetch a query and store it in cache. 
/// If the entry already exists it will still be refetched. /// /// If you need the result opt for [`fetch_query()`](Self::fetch_query) - pub fn prefetch_query( - &self, - cx: Scope, - key: impl Fn() -> K + 'static, - query: impl Fn(K) -> Fu + 'static, - isomorphic: bool, - ) where - K: Hash + Eq + Clone + 'static, - V: Clone + 'static, - Fu: Future + 'static, - { - let state = self.get_query_signal(cx, key); - - let state = Signal::derive(cx, move || state.get().0); - - let executor = create_executor(state, query); - - let sync = { - move |_| { - let _ = state.get(); - executor() - } - }; - if isomorphic { - create_isomorphic_effect(cx, sync); - } else { - create_effect(cx, sync); - } - } + // pub fn prefetch_query( + // &self, + // cx: Scope, + // key: impl Fn() -> K + 'static, + // query: impl Fn(K) -> Fu + 'static, + // isomorphic: bool, + // ) where + // K: Hash + Eq + Clone + 'static, + // V: Clone + 'static, + // Fu: Future + 'static, + // { + // let state = self.get_query_signal(cx, key); + + // let state = Signal::derive(cx, move || state.get().0); + + // let executor = create_executor(state, query); + + // let sync = { + // move |_| { + // let _ = state.get(); + // executor() + // } + // }; + // if isomorphic { + // create_isomorphic_effect(cx, sync); + // } else { + // create_effect(cx, sync); + // } + // } /// Retrieve the current state for an existing query. /// If the query does not exist, [`None`](Option::None) will be returned. - pub fn get_query_state( - self, - cx: Scope, - key: impl Fn() -> K + 'static, - ) -> Signal>> - where - K: Hash + Eq + Clone + 'static, - V: Clone, - { - let client = self.clone(); - - // Memoize state to avoid unnecessary hashmap lookups. - let maybe_query = create_memo(cx, move |_| { - let key = key(); - client.notify.get(); - client.use_cache_option(|cache: &HashMap>| cache.get(&key).cloned()) - }); - - synchronize_observer(cx, maybe_query.into()); - - Signal::derive(cx, move || maybe_query.get().map(|s| s.state.get())) - } + // pub fn get_query_state( + // self, + // cx: Scope, + // key: impl Fn() -> K + 'static, + // ) -> Signal>> + // where + // K: Hash + Eq + Clone + 'static, + // V: Clone, + // { + // let client = self.clone(); + + // // Memoize state to avoid unnecessary hashmap lookups. + // let maybe_query = create_memo(cx, move |_| { + // let key = key(); + // client.notify.get(); + // client.use_cache_option(|cache: &HashMap>| cache.get(&key).cloned()) + // }); + + // synchronize_observer(cx, maybe_query.into()); + + // Signal::derive(cx, move || maybe_query.get().map(|s| s.state.get())) + // } /// Attempts to invalidate an entry in the Query Cache. /// Matching query is marked as invalid, and will be refetched in background once it's active. @@ -217,18 +219,18 @@ impl QueryClient { /// let client = use_query_client(cx); /// let invalidated = client.invalidate_query::(0); /// ``` - pub fn invalidate_query(&self, key: impl Borrow) -> bool - where - K: Hash + Eq + Clone + 'static, - V: Clone + 'static, - { - self.use_cache_option(|cache: &HashMap>| { - cache - .get(Borrow::borrow(&key)) - .map(|state| state.mark_invalid()) - }) - .unwrap_or(false) - } + // pub fn invalidate_query(&self, key: impl Borrow) -> bool + // where + // K: Hash + Eq + Clone + 'static, + // V: Clone + 'static, + // { + // self.use_cache_option(|cache: &HashMap>| { + // cache + // .get(Borrow::borrow(&key)) + // .map(|state| state.mark_invalid()) + // }) + // .unwrap_or(false) + // } /// Attempts to invalidate multiple entries in the Query Cache with a common type. 
/// All matching queries are immediately marked as invalid and active queries are refetched in the background. @@ -242,30 +244,30 @@ impl QueryClient { /// let invalidated = client.invalidate_queries::(keys) /// /// ``` - pub fn invalidate_queries(&self, keys: impl IntoIterator) -> Option> - where - K: Hash + Eq + Clone + 'static, - V: Clone + 'static, - Q: Borrow, - { - // Find all states, drop borrow, then mark invalid. - let cache_borrowed = RefCell::borrow(&self.cache); - let type_key = (TypeId::of::(), TypeId::of::()); - let cache = cache_borrowed.get(&type_key)?; - let cache = cache.as_any().downcast_ref::>()?; - let result = keys - .into_iter() - .filter(|key| { - cache - .0 - .get(Borrow::borrow(key)) - .map(|query| query.mark_invalid()) - .unwrap_or(false) - }) - .collect::>(); - - Some(result) - } + // pub fn invalidate_queries(&self, keys: impl IntoIterator) -> Option> + // where + // K: Hash + Eq + Clone + 'static, + // V: Clone + 'static, + // Q: Borrow, + // { + // // Find all states, drop borrow, then mark invalid. + // let cache_borrowed = RefCell::borrow(&self.cache); + // let type_key = (TypeId::of::(), TypeId::of::()); + // let cache = cache_borrowed.get(&type_key)?; + // let cache = cache.as_any().downcast_ref::>()?; + // let result = keys + // .into_iter() + // .filter(|key| { + // cache + // .0 + // .get(Borrow::borrow(key)) + // .map(|query| query.mark_invalid()) + // .unwrap_or(false) + // }) + // .collect::>(); + + // Some(result) + // } /// Invalidate all queries with a common type. /// @@ -278,20 +280,20 @@ impl QueryClient { /// client.invalidate_query_type::(); /// /// ``` - pub fn invalidate_query_type(&self) -> &Self - where - K: Clone + 'static, - V: Clone + 'static, - { - self.use_cache_option(|cache: &HashMap>| { - for q in cache.values() { - q.mark_invalid(); - } - Some(()) - }); - - self - } + // pub fn invalidate_query_type(&self) -> &Self + // where + // K: Clone + 'static, + // V: Clone + 'static, + // { + // self.use_cache_option(|cache: &HashMap>| { + // for q in cache.values() { + // q.mark_invalid(); + // } + // Some(()) + // }); + + // self + // } /// Invalidates all queries in the cache. /// @@ -363,110 +365,113 @@ impl QueryClient { /// new_monkey /// }); /// ``` - pub fn set_query_data( - &self, - key: K, - updater: impl FnOnce(Option<&V>) -> Option + 'static, - ) -> &Self - where - K: Clone + Eq + Hash + 'static, - V: Clone + 'static, - { - enum SetResult { - Inserted, - Updated, - Nothing, - } - let result = self.use_cache( - move |(root_scope, cache): (Scope, &mut HashMap>)| match cache - .entry(key.clone()) - { - Entry::Occupied(entry) => { - let query = entry.get(); - let result = query.state.with_untracked(|s| { - let data = s.query_data().map(|d| &d.data); - updater(data) - }); - // Only update query data if updater returns Some. - if let Some(result) = result { - query.state.set(QueryState::Loaded(QueryData::now(result))); - SetResult::Updated - } else { - SetResult::Nothing - } - } - Entry::Vacant(entry) => { - // Only insert query if updater returns Some. 
- if let Some(result) = updater(None) { - let query = Query::new(root_scope, key); - query.state.set(QueryState::Loaded(QueryData::now(result))); - entry.insert(query); - SetResult::Inserted - } else { - SetResult::Nothing - } - } - }, - ); - - if let SetResult::Inserted = result { - self.notify.set(()); - } - - self - } - - fn use_cache_option(&self, func: F) -> Option + // pub fn set_query_data( + // &self, + // key: K, + // updater: impl FnOnce(Option<&V>) -> Option + 'static, + // ) -> &Self + // where + // K: Clone + Eq + Hash + 'static, + // V: Clone + 'static, + // { + // enum SetResult { + // Inserted, + // Updated, + // Nothing, + // } + // let result = self.use_cache( + // move |(root_scope, cache): (Scope, &mut HashMap>)| match cache + // .entry(key.clone()) + // { + // Entry::Occupied(entry) => { + // let query = entry.get(); + // let result = query.state.with_untracked(|s| { + // let data = s.query_data().map(|d| &d.data); + // updater(data) + // }); + // // Only update query data if updater returns Some. + // if let Some(result) = result { + // query.state.set(QueryState::Loaded(QueryData::now(result))); + // SetResult::Updated + // } else { + // SetResult::Nothing + // } + // } + // Entry::Vacant(entry) => { + // // Only insert query if updater returns Some. + // if let Some(result) = updater(None) { + // let query = Query::new(root_scope, key); + // query.state.set(QueryState::Loaded(QueryData::now(result))); + // entry.insert(query); + // SetResult::Inserted + // } else { + // SetResult::Nothing + // } + // } + // }, + // ); + + // if let SetResult::Inserted = result { + // self.notify.set(()); + // } + + // self + // } + + fn use_cache_option(&self, func: F) -> Option where K: 'static, V: 'static, - F: FnOnce(&HashMap>) -> Option, + E: 'static, + F: FnOnce(&HashMap>) -> Option, R: 'static, { let cache = RefCell::borrow(&self.cache); - let type_key = (TypeId::of::(), TypeId::of::()); + let type_key = (TypeId::of::(), TypeId::of::(), TypeId::of::()); let cache = cache.get(&type_key)?; - let cache = cache.as_any().downcast_ref::>()?; + let cache = cache.as_any().downcast_ref::>()?; func(&cache.0) } - fn use_cache_option_mut(&self, func: F) -> Option + fn use_cache_option_mut(&self, func: F) -> Option where K: 'static, V: 'static, - F: FnOnce(&mut HashMap>) -> Option, + E: 'static, + F: FnOnce(&mut HashMap>) -> Option, R: 'static, { let mut cache = self.cache.borrow_mut(); - let type_key = (TypeId::of::(), TypeId::of::()); + let type_key = (TypeId::of::(), TypeId::of::(), TypeId::of::()); let cache = cache.get_mut(&type_key)?; - let cache = cache.as_any_mut().downcast_mut::>()?; + let cache = cache.as_any_mut().downcast_mut::>()?; func(&mut cache.0) } - fn use_cache( + fn use_cache( &self, - func: impl FnOnce((Scope, &mut HashMap>)) -> R + 'static, + func: impl FnOnce((Scope, &mut HashMap>)) -> R + 'static, ) -> R where K: Clone + 'static, V: Clone + 'static, + E: Clone + 'static, { let mut cache = self.cache.borrow_mut(); - let type_key = (TypeId::of::(), TypeId::of::()); + let type_key = (TypeId::of::(), TypeId::of::(), TypeId::of::()); let cache: &mut Box = match cache.entry(type_key) { Entry::Occupied(o) => o.into_mut(), Entry::Vacant(v) => { - let wrapped: CacheEntry = CacheEntry(HashMap::new()); + let wrapped: CacheEntry = CacheEntry(HashMap::new()); v.insert(Box::new(wrapped)) } }; - let cache: &mut CacheEntry = cache + let cache: &mut CacheEntry = cache .as_any_mut() - .downcast_mut::>() + .downcast_mut::>() .expect( "Error: Query Cache Type Mismatch. 
This should not happen. Please file a bug report.", ); @@ -474,10 +479,11 @@ impl QueryClient { func((self.cx, &mut cache.0)) } - fn get_or_create_query(&self, key: K) -> (Query, bool) + fn get_or_create_query(&self, key: K) -> (Query, bool) where K: Clone + Eq + Hash + 'static, V: Clone + 'static, + E: Clone + 'static, { let result = self.use_cache(move |(root_scope, cache)| { let entry = cache.entry(key.clone()); @@ -503,14 +509,15 @@ impl QueryClient { result } - pub(crate) fn get_query_signal( + pub(crate) fn get_query_signal( &self, cx: Scope, key: impl Fn() -> K + 'static, - ) -> Signal<(Query, bool)> + ) -> Signal<(Query, bool)> where K: Hash + Eq + Clone + 'static, V: Clone + 'static, + E: Clone + 'static, { let client = self.clone(); @@ -522,7 +529,7 @@ impl QueryClient { .into() } - pub(crate) fn evict_and_notify(&self, key: &K) -> Option> + pub(crate) fn evict_and_notify(&self, key: &K) -> Option> where K: Hash + Eq + 'static, V: 'static, @@ -536,234 +543,234 @@ impl QueryClient { } } -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn prefetch_loads_data() { - run_scope(create_runtime(), |cx| { - provide_query_client(cx); - let client = use_query_client(cx); - - assert_eq!(0, client.clone().size().get_untracked()); - - let state = client.clone().get_query_state::(cx, || 0); - - assert_eq!(None, state.get_untracked()); - - client.clone().prefetch_query( - cx, - || 0, - |num: u32| async move { num.to_string() }, - true, - ); - - assert_eq!( - Some("0".to_string()), - state.get_untracked().and_then(|q| q.data().cloned()) - ); - - assert!(matches!( - state.get_untracked(), - Some(QueryState::Loaded { .. }) - )); +// #[cfg(test)] +// mod tests { +// use super::*; - assert_eq!(1, client.clone().size().get_untracked()); +// #[test] +// fn prefetch_loads_data() { +// run_scope(create_runtime(), |cx| { +// provide_query_client(cx); +// let client = use_query_client(cx); - client.clone().invalidate_query::(0); +// assert_eq!(0, client.clone().size().get_untracked()); - assert!(matches!( - state.get_untracked(), - Some(QueryState::Invalid { .. }) - )); - }); - } - - #[test] - fn set_query_data() { - run_scope(create_runtime(), |cx| { - provide_query_client(cx); - let client = use_query_client(cx); - - let state = client.clone().get_query_state::(cx, || 0); - assert_eq!(None, state.get_untracked()); - assert_eq!(0, client.clone().size().get_untracked()); +// let state = client.clone().get_query_state::(cx, || 0); - client.clone().set_query_data::(0, |_| None); +// assert_eq!(None, state.get_untracked()); - assert_eq!(None, state.get_untracked()); - assert_eq!(0, client.size().get_untracked()); +// client.clone().prefetch_query( +// cx, +// || 0, +// |num: u32| async move { num.to_string() }, +// true, +// ); - client - .clone() - .set_query_data::(0, |_| Some("0".to_string())); +// assert_eq!( +// Some("0".to_string()), +// state.get_untracked().and_then(|q| q.data().cloned()) +// ); - assert_eq!(1, client.clone().size().get_untracked()); +// assert!(matches!( +// state.get_untracked(), +// Some(QueryState::Loaded { .. }) +// )); - assert_eq!( - Some("0".to_string()), - state.get_untracked().and_then(|q| q.data().cloned()) - ); +// assert_eq!(1, client.clone().size().get_untracked()); - assert!(matches!( - state.get_untracked(), - Some(QueryState::Loaded { .. }) - )); +// client.clone().invalidate_query::(0); - client.set_query_data::(0, |_| Some("1".to_string())); +// assert!(matches!( +// state.get_untracked(), +// Some(QueryState::Invalid { .. 
}) +// )); +// }); +// } + +// #[test] +// fn set_query_data() { +// run_scope(create_runtime(), |cx| { +// provide_query_client(cx); +// let client = use_query_client(cx); - assert_eq!( - Some("1".to_string()), - state.get_untracked().and_then(|q| q.data().cloned()) - ); - }); - } +// let state = client.clone().get_query_state::(cx, || 0); +// assert_eq!(None, state.get_untracked()); +// assert_eq!(0, client.clone().size().get_untracked()); - #[test] - fn can_use_same_key_with_different_value_types() { - run_scope(create_runtime(), |cx| { - provide_query_client(cx); - let client = use_query_client(cx); +// client.clone().set_query_data::(0, |_| None); - client.set_query_data::(0, |_| Some("0".to_string())); +// assert_eq!(None, state.get_untracked()); +// assert_eq!(0, client.size().get_untracked()); + +// client +// .clone() +// .set_query_data::(0, |_| Some("0".to_string())); + +// assert_eq!(1, client.clone().size().get_untracked()); + +// assert_eq!( +// Some("0".to_string()), +// state.get_untracked().and_then(|q| q.data().cloned()) +// ); - client.set_query_data::(0, |_| Some(1234)); +// assert!(matches!( +// state.get_untracked(), +// Some(QueryState::Loaded { .. }) +// )); + +// client.set_query_data::(0, |_| Some("1".to_string())); + +// assert_eq!( +// Some("1".to_string()), +// state.get_untracked().and_then(|q| q.data().cloned()) +// ); +// }); +// } + +// #[test] +// fn can_use_same_key_with_different_value_types() { +// run_scope(create_runtime(), |cx| { +// provide_query_client(cx); +// let client = use_query_client(cx); - assert_eq!(2, client.size().get_untracked()); - }); - } - - #[test] - fn can_invalidate_while_subscribed() { - run_scope(create_runtime(), |cx| { - provide_query_client(cx); - let client = use_query_client(cx); - - let subscription = client.clone().get_query_state::(cx, || 0_u32); - - create_isomorphic_effect(cx, move |_| { - subscription.get(); - }); - - client.set_query_data::(0_u32, |_| Some(1234)); - - assert!(client.invalidate_query::(0)); - let state = subscription.get_untracked(); - - assert!( - matches!(state, Some(QueryState::Invalid { .. 
})), - "Query should be invalid" - ); - }); - } - - #[test] - fn can_invalidate_multiple() { - run_scope(create_runtime(), |cx| { - provide_query_client(cx); - let client = use_query_client(cx); - - client.set_query_data::(0, |_| Some(1234)); - client.set_query_data::(1, |_| Some(1234)); - let keys: Vec = vec![0, 1]; - let invalidated = client - .invalidate_queries::(keys.clone()) - .unwrap_or_default(); - - assert_eq!(keys, invalidated) - }); - } - - #[test] - fn can_invalidate_multiple_strings() { - run_scope(create_runtime(), |cx| { - provide_query_client(cx); - let client = use_query_client(cx); - - let zero = "0".to_string(); - let one = "1".to_string(); - - client.set_query_data::(zero.clone(), |_| Some("1234".into())); - client.set_query_data::(one.clone(), |_| Some("5678".into())); - - let keys = vec![zero, one]; - let invalidated = client - .invalidate_queries::(keys.clone()) - .unwrap_or_default(); - - assert_eq!(keys, invalidated) - }); - } - - #[test] - fn invalidate_all() { - run_scope(create_runtime(), |cx| { - provide_query_client(cx); - let client = use_query_client(cx); - - let zero = "0".to_string(); - let one = "1".to_string(); - - client.set_query_data::(zero.clone(), |_| Some("1234".into())); - client.set_query_data::(one.clone(), |_| Some("5678".into())); - client.set_query_data::(0, |_| Some(1234)); - client.set_query_data::(1, |_| Some(5678)); - - let state0_string = client - .clone() - .get_query_state::(cx, move || zero.clone()); - - let state1_string = client - .clone() - .get_query_state::(cx, move || one.clone()); - - let state0 = client.clone().get_query_state::(cx, || 0); - let state1 = client.clone().get_query_state::(cx, || 1); - - client.invalidate_all_queries(); - - assert!(matches!( - state0.get_untracked(), - Some(QueryState::Invalid { .. }) - )); - assert!(matches!( - state1.get_untracked(), - Some(QueryState::Invalid { .. }) - )); - assert!(matches!( - state0_string.get_untracked(), - Some(QueryState::Invalid { .. }) - )); - assert!(matches!( - state1_string.get_untracked(), - Some(QueryState::Invalid { .. }) - )); - }); - } - - #[test] - fn can_invalidate_subset() { - run_scope(create_runtime(), |cx| { - provide_query_client(cx); - let client = use_query_client(cx); - - client.set_query_data::(0, |_| Some(1234)); - client.set_query_data::(1, |_| Some(1234)); - - let state0 = client.clone().get_query_state::(cx, || 0); - let state1 = client.clone().get_query_state::(cx, || 1); - - client.invalidate_query_type::(); - - assert!(matches!( - state0.get_untracked(), - Some(QueryState::Invalid { .. }) - )); - assert!(matches!( - state1.get_untracked(), - Some(QueryState::Invalid { .. }) - )); - }); - } -} +// client.set_query_data::(0, |_| Some("0".to_string())); + +// client.set_query_data::(0, |_| Some(1234)); + +// assert_eq!(2, client.size().get_untracked()); +// }); +// } + +// #[test] +// fn can_invalidate_while_subscribed() { +// run_scope(create_runtime(), |cx| { +// provide_query_client(cx); +// let client = use_query_client(cx); + +// let subscription = client.clone().get_query_state::(cx, || 0_u32); + +// create_isomorphic_effect(cx, move |_| { +// subscription.get(); +// }); + +// client.set_query_data::(0_u32, |_| Some(1234)); + +// assert!(client.invalidate_query::(0)); +// let state = subscription.get_untracked(); + +// assert!( +// matches!(state, Some(QueryState::Invalid { .. 
})), +// "Query should be invalid" +// ); +// }); +// } + +// #[test] +// fn can_invalidate_multiple() { +// run_scope(create_runtime(), |cx| { +// provide_query_client(cx); +// let client = use_query_client(cx); + +// client.set_query_data::(0, |_| Some(1234)); +// client.set_query_data::(1, |_| Some(1234)); +// let keys: Vec = vec![0, 1]; +// let invalidated = client +// .invalidate_queries::(keys.clone()) +// .unwrap_or_default(); + +// assert_eq!(keys, invalidated) +// }); +// } + +// #[test] +// fn can_invalidate_multiple_strings() { +// run_scope(create_runtime(), |cx| { +// provide_query_client(cx); +// let client = use_query_client(cx); + +// let zero = "0".to_string(); +// let one = "1".to_string(); + +// client.set_query_data::(zero.clone(), |_| Some("1234".into())); +// client.set_query_data::(one.clone(), |_| Some("5678".into())); + +// let keys = vec![zero, one]; +// let invalidated = client +// .invalidate_queries::(keys.clone()) +// .unwrap_or_default(); + +// assert_eq!(keys, invalidated) +// }); +// } + +// #[test] +// fn invalidate_all() { +// run_scope(create_runtime(), |cx| { +// provide_query_client(cx); +// let client = use_query_client(cx); + +// let zero = "0".to_string(); +// let one = "1".to_string(); + +// client.set_query_data::(zero.clone(), |_| Some("1234".into())); +// client.set_query_data::(one.clone(), |_| Some("5678".into())); +// client.set_query_data::(0, |_| Some(1234)); +// client.set_query_data::(1, |_| Some(5678)); + +// let state0_string = client +// .clone() +// .get_query_state::(cx, move || zero.clone()); + +// let state1_string = client +// .clone() +// .get_query_state::(cx, move || one.clone()); + +// let state0 = client.clone().get_query_state::(cx, || 0); +// let state1 = client.clone().get_query_state::(cx, || 1); + +// client.invalidate_all_queries(); + +// assert!(matches!( +// state0.get_untracked(), +// Some(QueryState::Invalid { .. }) +// )); +// assert!(matches!( +// state1.get_untracked(), +// Some(QueryState::Invalid { .. }) +// )); +// assert!(matches!( +// state0_string.get_untracked(), +// Some(QueryState::Invalid { .. }) +// )); +// assert!(matches!( +// state1_string.get_untracked(), +// Some(QueryState::Invalid { .. }) +// )); +// }); +// } + +// #[test] +// fn can_invalidate_subset() { +// run_scope(create_runtime(), |cx| { +// provide_query_client(cx); +// let client = use_query_client(cx); + +// client.set_query_data::(0, |_| Some(1234)); +// client.set_query_data::(1, |_| Some(1234)); + +// let state0 = client.clone().get_query_state::(cx, || 0); +// let state1 = client.clone().get_query_state::(cx, || 1); + +// client.invalidate_query_type::(); + +// assert!(matches!( +// state0.get_untracked(), +// Some(QueryState::Invalid { .. }) +// )); +// assert!(matches!( +// state1.get_untracked(), +// Some(QueryState::Invalid { .. }) +// )); +// }); +// } +// } diff --git a/src/query_executor.rs b/src/query_executor.rs index f5716d8..693ee63 100644 --- a/src/query_executor.rs +++ b/src/query_executor.rs @@ -11,7 +11,7 @@ use crate::{ query::Query, use_query_client, util::{maybe_time_until_stale, time_until_stale, use_timeout}, - QueryData, QueryState, + QueryData, QueryError, QueryState, }; thread_local! { @@ -24,14 +24,15 @@ pub fn suppress_query_load(suppress: bool) { } // Create Executor function which will execute task in `spawn_local` and update state. 
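// The executor drives the query state machine: a Created query moves to
// Loading, an existing Loaded or Invalid entry moves to Fetching, and a
// previous Error moves to Retrying. Each run awaits the fetcher and settles
// into Loaded on Ok or Error on Err, recording the error, an error count,
// and any previously loaded data. Calls made while Loading, Fetching, or
// Retrying are ignored, so at most one fetch is in flight per query.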
-pub(crate) fn create_executor( - query: Signal>, +pub(crate) fn create_executor( + query: Signal>, fetcher: impl Fn(K) -> Fu + 'static, ) -> impl Fn() + Clone where K: Clone + Hash + Eq + 'static, V: Clone + 'static, - Fu: Future + 'static, + E: Clone + 'static, + Fu: Future> + 'static, { let fetcher = Rc::new(fetcher); move || { @@ -42,22 +43,63 @@ where let query = query.get_untracked(); let data_state = query.state.get_untracked(); match data_state { - QueryState::Fetching(_) | QueryState::Loading => (), + QueryState::Fetching(_) | QueryState::Loading | QueryState::Retrying(_) => { + () + } // First load. QueryState::Created => { query.state.set(QueryState::Loading); - let data = fetcher(query.key.clone()).await; + let result = fetcher(query.key.clone()).await; let updated_at = crate::Instant::now(); - let data = QueryData { data, updated_at }; - query.state.set(QueryState::Loaded(data)); + match result { + Ok(data) => { + let data = QueryData { data, updated_at }; + query.state.set(QueryState::Loaded(data)); + } + Err(error) => query.state.set(QueryState::Error(QueryError { + error, + error_count: 0, + updated_at, + prev_data: None, + })), + } } // Subsequent loads. QueryState::Loaded(data) | QueryState::Invalid(data) => { - query.state.set(QueryState::Fetching(data)); - let data = fetcher(query.key.clone()).await; + query.state.set(QueryState::Fetching(data.clone())); + let result = fetcher(query.key.clone()).await; + let updated_at = crate::Instant::now(); + match result { + Ok(data) => { + let data = QueryData { data, updated_at }; + query.state.set(QueryState::Loaded(data)); + } + Err(error) => query.state.set(QueryState::Error(QueryError { + error, + error_count: 0, + updated_at, + prev_data: Some(data), + })), + } + } + QueryState::Error(prev_error) => { + query + .state + .set(QueryState::Retrying(prev_error.prev_data.clone())); + let result = fetcher(query.key.clone()).await; let updated_at = crate::Instant::now(); - let data = QueryData { data, updated_at }; - query.state.set(QueryState::Loaded(data)); + match result { + Ok(data) => { + let data = QueryData { data, updated_at }; + query.state.set(QueryState::Loaded(data)); + } + Err(error) => query.state.set(QueryState::Error(QueryError { + error, + error_count: prev_error.error_count, + updated_at, + prev_data: prev_error.prev_data, + })), + } } } }) @@ -67,13 +109,14 @@ where } // Start synchronization effects. -pub(crate) fn synchronize_state( +pub(crate) fn synchronize_state( cx: Scope, - query: Signal>, + query: Signal>, executor: impl Fn() + Clone + 'static, ) where K: Hash + Eq + Clone + 'static, V: Clone, + E: Clone, { ensure_not_stale(cx, query, executor.clone()); ensure_not_invalid(cx, query, executor.clone()); @@ -83,19 +126,20 @@ pub(crate) fn synchronize_state( synchronize_observer(cx, query); } -pub(crate) fn synchronize_observer(cx: Scope, query: Signal>>) +pub(crate) fn synchronize_observer(cx: Scope, query: Signal>>) where K: Hash + Eq + Clone + 'static, V: Clone, + E: Clone, { sync_observers(cx, query); ensure_cache_cleanup(cx, query); } /// On mount, ensure that the resource is not stale -fn ensure_not_stale( +fn ensure_not_stale( cx: Scope, - query: Signal>, + query: Signal>, executor: impl Fn() + Clone + 'static, ) { create_isomorphic_effect(cx, move |_| { @@ -114,9 +158,9 @@ fn ensure_not_stale( } /// Refetch data once marked as invalid. 
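/// Whenever the state transitions to `Invalid`, the executor is run again so
/// the entry is refreshed in the background.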
-fn ensure_not_invalid( +fn ensure_not_invalid( cx: Scope, - state: Signal>, + state: Signal>, executor: impl Fn() + 'static, ) { create_isomorphic_effect(cx, move |_| { @@ -129,10 +173,14 @@ fn ensure_not_invalid( } /// Effect for refetching query on interval, if present. -fn sync_refetch(cx: Scope, query: Signal>, executor: impl Fn() + Clone + 'static) -where +fn sync_refetch( + cx: Scope, + query: Signal>, + executor: impl Fn() + Clone + 'static, +) where K: Clone + 'static, V: Clone + 'static, + E: Clone + 'static, { let _ = use_timeout(cx, move || { let query = query.get(); @@ -156,7 +204,7 @@ where } // Ensure that observers are kept track of. -fn sync_observers(cx: Scope, query: Signal>>) { +fn sync_observers(cx: Scope, query: Signal>>) { type Observer = Rc>; let last_observer: Rc>> = Rc::new(Cell::new(None)); @@ -189,10 +237,11 @@ fn sync_observers(cx: Scope, query: Signal(cx: Scope, query: Signal>>) +pub(crate) fn ensure_cache_cleanup(cx: Scope, query: Signal>>) where K: Clone + Hash + Eq + 'static, V: Clone + 'static, + E: Clone + 'static, { let root_scope = use_query_client(cx).cx; @@ -253,8 +302,8 @@ where let dispose = { let query = query.clone(); move || { - let removed = - use_query_client(root_scope).evict_and_notify::(&query.key); + let removed = use_query_client(root_scope) + .evict_and_notify::(&query.key); if let Some(query) = removed { if query.observers.get() == 0 { query.dispose(); diff --git a/src/query_options.rs b/src/query_options.rs index 6c8a1c3..f7b63cc 100644 --- a/src/query_options.rs +++ b/src/query_options.rs @@ -1,12 +1,16 @@ use std::time::Duration; +use crate::schedule::{Schedule, ScheduleBuilt}; + /** * Options for a query [`crate::use_query::use_query`] */ #[derive(Clone)] -pub struct QueryOptions { +pub struct QueryOptions { /// Placeholder value to use while the query is loading for the first time. pub default_value: Option, + // Retry Schedule for a query. + pub retry: ScheduleBuilt, /// The duration that should pass before a query is considered stale. /// If the query is stale, it will be refetched. /// If no stale time, the query will never be considered stale. @@ -36,7 +40,7 @@ pub enum ResourceOption { Blocking, } -impl QueryOptions { +impl QueryOptions { /// Empty options. pub fn empty() -> Self { Self { @@ -45,6 +49,7 @@ impl QueryOptions { cache_time: None, refetch_interval: None, resource_option: ResourceOption::NonBlocking, + retry: crate::schedule::spaced(Duration::ZERO).take(3).build(), } } /// QueryOption with custom stale_time. 
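As an illustrative sketch only, not part of the patch: a caller inside the crate could compose a retry schedule from the combinators in schedule.rs and hand it to QueryOptions. The paths are crate-internal because the schedule module is not re-exported in this commit, and the QueryOptions generic parameters are assumed to be <V, E> (they are not visible in this diff).

    use std::time::Duration;
    use crate::schedule::{exponential, Schedule};

    // Retry after 100ms, doubling each attempt, capped at 5s and at most 5 retries.
    let retry = exponential(Duration::from_millis(100), 2.0)
        .clamp_max(Duration::from_secs(5))
        .take(5)
        .build();

    // Everything else keeps its default; `retry` replaces the default
    // spaced(Duration::ZERO).take(3) schedule.
    let options: QueryOptions<String, String> = QueryOptions {
        retry,
        ..QueryOptions::default()
    };

The resulting options could then be passed to use_query_with_retry for a fetcher returning Result<String, String>.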
@@ -55,6 +60,7 @@ impl QueryOptions { cache_time: Some(DEFAULT_CACHE_TIME), refetch_interval: None, resource_option: ResourceOption::NonBlocking, + retry: crate::schedule::spaced(Duration::ZERO).take(3).build(), } } @@ -66,6 +72,7 @@ impl QueryOptions { cache_time: Some(DEFAULT_CACHE_TIME), refetch_interval: Some(refetch_interval), resource_option: ResourceOption::NonBlocking, + retry: crate::schedule::spaced(Duration::ZERO).take(3).build(), } } } @@ -89,7 +96,7 @@ pub(crate) fn ensure_valid_stale_time( const DEFAULT_STALE_TIME: Duration = Duration::from_secs(0); const DEFAULT_CACHE_TIME: Duration = Duration::from_secs(60 * 5); -impl Default for QueryOptions { +impl Default for QueryOptions { fn default() -> Self { Self { default_value: None, @@ -97,6 +104,7 @@ impl Default for QueryOptions { cache_time: Some(DEFAULT_CACHE_TIME), refetch_interval: None, resource_option: ResourceOption::NonBlocking, + retry: crate::schedule::spaced(Duration::ZERO).take(3).build(), } } } diff --git a/src/query_result.rs b/src/query_result.rs index 16e7078..71910a3 100644 --- a/src/query_result.rs +++ b/src/query_result.rs @@ -9,16 +9,17 @@ use leptos::*; /// Reactive query result. #[derive(Clone)] -pub struct QueryResult +pub struct QueryResult where V: 'static, + E: 'static, R: RefetchFn, { /// The current value of the query. None if it has not been fetched yet. /// Should be called inside of a [`Transition`](leptos::Transition) or [`Suspense`](leptos::Suspense) component. pub data: Signal>, /// The current state of the data. - pub state: Signal>, + pub state: Signal>, /// If the query is fetching for the first time. pub is_loading: Signal, @@ -37,12 +38,12 @@ where pub trait RefetchFn: Fn() + Clone {} impl RefetchFn for R {} -pub(crate) fn create_query_result( +pub(crate) fn create_query_result( cx: Scope, - query: Signal>, + query: Signal>, data: Signal>, executor: impl Fn() + Clone, -) -> QueryResult { +) -> QueryResult { let state = Signal::derive(cx, move || query.get().state.get()); let is_loading = Signal::derive(cx, move || matches!(state.get(), QueryState::Loading)); @@ -66,9 +67,9 @@ pub(crate) fn create_query_result( } } -fn make_is_stale( +fn make_is_stale( cx: Scope, - state: Signal>, + state: Signal>, stale_time: Signal>, ) -> Signal { let (stale, set_stale) = create_signal(cx, false); diff --git a/src/query_state.rs b/src/query_state.rs index 57143c2..e92d936 100644 --- a/src/query_state.rs +++ b/src/query_state.rs @@ -5,7 +5,7 @@ use crate::Instant; /// Each variant in the enum corresponds to a particular state of a query in its lifecycle, /// starting from creation and covering all possible transitions up to invalidation. #[derive(Clone, PartialEq, Eq)] -pub enum QueryState { +pub enum QueryState { /// The initial state of a Query upon its creation. /// /// In this state, a query is instantiated but no fetching operation has been initiated yet. @@ -35,16 +35,29 @@ pub enum QueryState { /// /// The associated `QueryData` object holds the invalidated data. Invalid(QueryData), + + Error(QueryError), + Retrying(Option>), +} + +#[derive(Clone, PartialEq, Eq)] +pub struct QueryError { + pub(crate) error: E, + pub(crate) error_count: usize, + pub(crate) updated_at: Instant, + pub(crate) prev_data: Option>, } -impl QueryState { +impl QueryState { /// Returns the QueryData for the current QueryState, if present. 
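    /// Data is available while Fetching, Loaded, Invalid, or Retrying with prior
    /// data, and from `prev_data` while in the Error state; Created, Loading, and
    /// Retrying without prior data return `None`.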
pub fn query_data(&self) -> Option<&QueryData> { match self { - QueryState::Loading | QueryState::Created => None, - QueryState::Fetching(data) | QueryState::Loaded(data) | QueryState::Invalid(data) => { - Some(data) - } + QueryState::Loading | QueryState::Created | QueryState::Retrying(None) => None, + QueryState::Fetching(data) + | QueryState::Loaded(data) + | QueryState::Invalid(data) + | QueryState::Retrying(Some(data)) => Some(data), + QueryState::Error(QueryError { prev_data, .. }) => prev_data.as_ref(), } } @@ -59,20 +72,20 @@ impl QueryState { } } -impl std::fmt::Debug for QueryState -where - V: std::fmt::Debug, -{ - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::Created => write!(f, "Created"), - Self::Loading => write!(f, "Loading"), - Self::Fetching(arg0) => f.debug_tuple("Fetching").field(arg0).finish(), - Self::Loaded(arg0) => f.debug_tuple("Loaded").field(arg0).finish(), - Self::Invalid(arg0) => f.debug_tuple("Invalid").field(arg0).finish(), - } - } -} +// impl std::fmt::Debug for QueryState +// where +// V: std::fmt::Debug, +// { +// fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +// match self { +// Self::Created => write!(f, "Created"), +// Self::Loading => write!(f, "Loading"), +// Self::Fetching(arg0) => f.debug_tuple("Fetching").field(arg0).finish(), +// Self::Loaded(arg0) => f.debug_tuple("Loaded").field(arg0).finish(), +// Self::Invalid(arg0) => f.debug_tuple("Invalid").field(arg0).finish(), +// } +// } +// } /// The latest data for a Query. #[derive(Clone, PartialEq, Eq)] diff --git a/src/schedule.rs b/src/schedule.rs new file mode 100644 index 0000000..ab212b1 --- /dev/null +++ b/src/schedule.rs @@ -0,0 +1,416 @@ +use std::{marker::PhantomData, time::Duration}; + +pub trait Schedule { + type Error; + + fn next(&mut self, error: &Self::Error) -> Option; + + fn union(self, other: U) -> Union + where + Self: Sized, + U: Schedule, + { + Union { a: self, b: other } + } + + fn intersect(self, other: U) -> Intersect + where + Self: Sized, + U: Schedule, + { + Intersect { a: self, b: other } + } + + fn concat(self, other: U) -> Sequence + where + Self: Sized, + U: Schedule, + { + Sequence { a: self, b: other } + } + + fn map(self, func: F) -> Mapped + where + Self: Sized, + F: Fn((&Self::Error, Option)) -> Option, + { + Mapped { + schedule: self, + func, + } + } + + fn take(self, n: u32) -> Take + where + Self: Sized, + { + Take { n, schedule: self } + } + + fn take_while(self, func: F) -> TakeWhile + where + Self: Sized, + F: Fn((&Self::Error, Duration)) -> bool, + { + TakeWhile { + schedule: self, + func, + } + } + + fn clamp(self, min: Duration, max: Duration) -> Clamp + where + Self: Sized, + { + Clamp { + schedule: self, + max: Some(max), + min: Some(min), + } + } + + fn clamp_max(self, duration: Duration) -> Clamp + where + Self: Sized, + { + Clamp { + schedule: self, + max: Some(duration), + min: None, + } + } + + fn clamp_min(self, duration: Duration) -> Clamp + where + Self: Sized, + { + Clamp { + schedule: self, + max: None, + min: Some(duration), + } + } + + fn build(self) -> ScheduleBuilt + where + Self: Sized + 'static, + { + ScheduleBuilt(std::rc::Rc::new(self)) + } +} + +#[derive(Clone)] +pub struct ScheduleBuilt(std::rc::Rc>); + +pub struct Recur { + n: u32, + error_type: PhantomData, +} + +impl Schedule for Recur { + type Error = E; + fn next(&mut self, _: &Self::Error) -> Option { + if self.n > 0 { + self.n -= 1; + Some(Duration::ZERO) + } else { + None + } + } +} + +pub struct Spaced { + 
duration: Duration, + error_type: PhantomData, +} + +impl Schedule for Spaced { + type Error = E; + fn next(&mut self, _: &Self::Error) -> Option { + Some(self.duration) + } +} + +pub struct Sequence { + a: A, + b: B, +} + +impl Schedule for Sequence +where + A: Schedule, + B: Schedule, +{ + type Error = E; + + fn next(&mut self, error: &Self::Error) -> Option { + let a = self.a.next(error); + if let Some(_) = a { + a + } else { + self.b.next(error) + } + } +} + +pub struct Union { + a: A, + b: B, +} + +impl Schedule for Union +where + A: Schedule, + B: Schedule, +{ + type Error = E; + + fn next(&mut self, error: &Self::Error) -> Option { + let a = self.a.next(error); + let b = self.b.next(error); + + match (a, b) { + (Some(a), Some(b)) => Some(a.min(b)), + (Some(_), None) => a, + (None, Some(_)) => b, + _ => None, + } + } +} + +pub struct Intersect { + a: A, + b: B, +} + +impl Schedule for Intersect +where + A: Schedule, + B: Schedule, +{ + type Error = E; + + fn next(&mut self, error: &Self::Error) -> Option { + let a = self.a.next(error); + let b = self.b.next(error); + + match (a, b) { + (Some(a), Some(b)) => Some(a.max(b)), + _ => None, + } + } +} + +pub struct Exponential { + base: Duration, + n: u32, + factor: f32, + error_type: PhantomData, +} + +impl Schedule for Exponential { + type Error = E; + fn next(&mut self, _: &Self::Error) -> Option { + let n = self.n; + self.n += 1; + if n == 0 { + Some(self.base) + } else { + let mult = self.factor.powf(n as f32); + let delay = self.base.mul_f32(mult); + Some(delay) + } + } +} + +pub struct Mapped { + schedule: A, + func: F, +} + +impl Schedule for Mapped +where + A: Schedule, + F: Fn((&E, Option)) -> Option, +{ + type Error = E; + + fn next(&mut self, error: &Self::Error) -> Option { + let next = self.schedule.next(error); + (self.func)((error, next)) + } +} + +pub struct Take { + schedule: A, + n: u32, +} + +impl Schedule for Take +where + A: Schedule, +{ + type Error = E; + + fn next(&mut self, error: &Self::Error) -> Option { + if self.n > 0 { + self.n -= 1; + self.schedule.next(error) + } else { + None + } + } +} + +pub struct TakeWhile { + schedule: A, + func: F, +} + +impl Schedule for TakeWhile +where + A: Schedule, + F: Fn((&E, Duration)) -> bool, +{ + type Error = E; + + fn next(&mut self, error: &Self::Error) -> Option { + self.schedule + .next(error) + .filter(|d| (self.func)((error, *d))) + } +} + +pub struct Clamp { + schedule: A, + min: Option, + max: Option, +} + +impl Schedule for Clamp +where + A: Schedule, +{ + type Error = E; + + fn next(&mut self, error: &Self::Error) -> Option { + let next = self.schedule.next(error); + next.map(|d| { + let bottom = if let Some(min) = self.min { + min.max(d) + } else { + d + }; + + if let Some(max) = self.max { + max.min(bottom) + } else { + bottom + } + }) + } +} + +pub fn recurs(n: u32) -> impl Schedule { + Recur { + n, + error_type: PhantomData, + } +} + +pub fn spaced(d: Duration) -> impl Schedule { + Spaced { + duration: d, + error_type: PhantomData, + } +} + +pub fn exponential(base: Duration, factor: f32) -> impl Schedule { + Exponential { + n: 0, + base, + factor, + error_type: PhantomData, + } +} + +#[cfg(test)] +mod tests { + use super::*; + #[test] + fn test_recurs() { + let mut r = recurs(2); + assert_eq!(Some(Duration::ZERO), r.next(&())); + assert_eq!(Some(Duration::ZERO), r.next(&())); + assert_eq!(None, r.next(&())); + } + + #[test] + fn test_spaced() { + let d = Duration::from_millis(500); + let mut schedule = recurs(2).intersect(spaced(d)); + assert_eq!(Some(d), 
schedule.next(&())); + assert_eq!(Some(d), schedule.next(&())); + assert_eq!(None, schedule.next(&())); + } + + #[test] + fn test_sequence() { + let d = Duration::from_millis(500); + let left = recurs(2).intersect(spaced(d)); + let right = recurs(2); + let mut schedule = left.concat(right); + + assert_eq!(Some(d), schedule.next(&())); + assert_eq!(Some(d), schedule.next(&())); + assert_eq!(Some(Duration::ZERO), schedule.next(&())); + assert_eq!(Some(Duration::ZERO), schedule.next(&())); + assert_eq!(None, schedule.next(&())); + } + + #[test] + fn test_exponential() { + let mut schedule = exponential(Duration::from_millis(500), 2.0).take(6); + + assert_eq!(Some(Duration::from_millis(500)), schedule.next(&())); + assert_eq!(Some(Duration::from_millis(1000)), schedule.next(&())); + assert_eq!(Some(Duration::from_millis(2000)), schedule.next(&())); + assert_eq!(Some(Duration::from_millis(4000)), schedule.next(&())); + assert_eq!(Some(Duration::from_millis(8000)), schedule.next(&())); + assert_eq!(Some(Duration::from_millis(16000)), schedule.next(&())); + assert_eq!(None, schedule.next(&())); + } + + #[test] + fn test_exponential_while() { + let mut schedule = exponential(Duration::from_millis(500), 2.0) + .take_while(|(_, d)| d < Duration::from_millis(2001)); + + assert_eq!(Some(Duration::from_millis(500)), schedule.next(&())); + assert_eq!(Some(Duration::from_millis(1000)), schedule.next(&())); + assert_eq!(Some(Duration::from_millis(2000)), schedule.next(&())); + assert_eq!(None, schedule.next(&())); + } + + #[test] + fn test_exponential_clamp() { + let mut schedule = exponential(Duration::from_millis(100), 2.0) + .clamp_min(Duration::from_secs(1)) + .clamp_max(Duration::from_secs(4)); + + let mut next = move || schedule.next(&()).map(|d| d.as_millis()); + + // 100 + assert_eq!(Some(1000), next()); + // 200 + assert_eq!(Some(1000), next()); + // 400 + assert_eq!(Some(1000), next()); + // 800 + assert_eq!(Some(1000), next()); + assert_eq!(Some(1600), next()); + assert_eq!(Some(3200), next()); + assert_eq!(Some(4000), next()); + assert_eq!(Some(4000), next()); + } +} diff --git a/src/use_query.rs b/src/use_query.rs index ac4aeb3..30c7a3d 100644 --- a/src/use_query.rs +++ b/src/use_query.rs @@ -1,8 +1,8 @@ use crate::query_executor::{create_executor, synchronize_state}; use crate::query_result::QueryResult; use crate::{ - create_query_result, use_query_client, Query, QueryData, QueryOptions, QueryState, RefetchFn, - ResourceOption, + create_query_result, use_query_client, Query, QueryData, QueryError, QueryOptions, QueryState, + RefetchFn, ResourceOption, }; use leptos::*; use std::future::Future; @@ -60,15 +60,36 @@ pub fn use_query( cx: Scope, key: impl Fn() -> K + 'static, fetcher: impl Fn(K) -> Fu + 'static, - options: QueryOptions, -) -> QueryResult + options: QueryOptions, +) -> QueryResult where K: Hash + Eq + Clone + 'static, V: Clone + Serializable + 'static, Fu: Future + 'static, +{ + let fetcher = std::rc::Rc::new(fetcher); + let fetcher = move |key: K| { + let fetcher = fetcher.clone(); + async move { Ok(fetcher(key).await) as Result } + }; + + use_query_with_retry(cx, key, fetcher, options) +} + +pub fn use_query_with_retry( + cx: Scope, + key: impl Fn() -> K + 'static, + fetcher: impl Fn(K) -> Fu + 'static, + options: QueryOptions, +) -> QueryResult +where + K: Hash + Eq + Clone + 'static, + V: Clone + Serializable + 'static, + E: Clone + Serializable + 'static, + Fu: Future> + 'static, { // Find relevant state. 
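    // The client keys its cache by (key type, value type, error type) and holds
    // one Query per key; this signal looks up (or lazily creates) the entry for
    // the current key and re-evaluates whenever the key changes.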
- let query = use_query_client(cx).get_query_signal(cx, key); + let query = use_query_client(cx).get_query_signal::(cx, key); // Update options. create_isomorphic_effect(cx, { @@ -85,16 +106,24 @@ where let query = Signal::derive(cx, move || query.get().0); - let resource_fetcher = move |query: Query| { + let resource_fetcher = move |query: Query| { async move { match query.state.get_untracked() { // Immediately provide cached value. QueryState::Loaded(data) | QueryState::Invalid(data) - | QueryState::Fetching(data) => ResourceData(Some(data.data)), + | QueryState::Fetching(data) + | QueryState::Retrying(Some(data)) + | QueryState::Error(QueryError { + prev_data: Some(data), + .. + }) => ResourceData(Some(data.data)), // Suspend indefinitely and wait for interruption. - QueryState::Created | QueryState::Loading => { + QueryState::Created + | QueryState::Loading + | QueryState::Error(_) + | QueryState::Retrying(None) => { sleep(LONG_TIME).await; ResourceData(None) } @@ -102,7 +131,7 @@ where } }; - let resource: Resource, ResourceData> = { + let resource: Resource, ResourceData> = { let default = options.default_value; match options.resource_option { ResourceOption::NonBlocking => create_resource_with_initial_value( @@ -137,7 +166,7 @@ where // Ensure key changes are considered. create_isomorphic_effect(cx, { let executor = executor.clone(); - move |prev_query: Option>| { + move |prev_query: Option>| { let query = query.get(); if let Some(prev_query) = prev_query { if prev_query != query { @@ -218,3 +247,44 @@ where } } } + +/// A Type that cannot be instantiated. Useful to have a Result that cannot fail. +#[derive(Clone)] +pub enum Never {} + +struct NeverSerde(); + +impl std::error::Error for NeverSerde { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + None + } + + fn description(&self) -> &str { + "description() is deprecated; use Display" + } +} + +impl std::fmt::Display for NeverSerde { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("NeverSerde") + } +} +impl std::fmt::Debug for NeverSerde { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_tuple("NeverSerde").finish() + } +} + +impl Serializable for Never { + fn ser(&self) -> Result { + Err(SerializationError::Serialize( + std::rc::Rc::new(NeverSerde()), + )) + } + + fn de(bytes: &str) -> Result { + Err(SerializationError::Deserialize(std::rc::Rc::new( + NeverSerde(), + ))) + } +} From f98447698250faf30c6caee195be4e0fdc6766c3 Mon Sep 17 00:00:00 2001 From: Nico Burniske Date: Wed, 23 Aug 2023 18:48:09 -0400 Subject: [PATCH 2/2] checkpoint with error_handler effect --- Cargo.lock | 7 ++ Cargo.toml | 3 +- example/start-axum/src/app.rs | 32 +++--- example/start-axum/src/todo.rs | 10 +- src/lib.rs | 1 + src/query_executor.rs | 104 ++++++++++++++++-- src/query_options.rs | 12 +-- src/query_result.rs | 31 +++++- src/query_state.rs | 102 ++++++++++++------ src/schedule.rs | 191 +++++++++++++++++++++++++++------ src/use_query.rs | 86 ++++++++------- 11 files changed, 433 insertions(+), 146 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 900137d..aa06b02 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -304,6 +304,12 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "669a445ee724c5c69b1b06fe0b63e70a1c84bc9bb7d9696cd4f4e3ec45050408" +[[package]] +name = "dyn-clone" +version = "1.0.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"bbfc4744c1b8f2a09adc0e55242f60b1af195d88596bd8700be74418c056c555" + [[package]] name = "educe" version = "0.4.22" @@ -830,6 +836,7 @@ name = "leptos_query" version = "0.2.3" dependencies = [ "cfg-if", + "dyn-clone", "gloo-timers", "js-sys", "leptos", diff --git a/Cargo.toml b/Cargo.toml index c31952d..0187eb9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,10 +16,11 @@ cfg-if = "1" js-sys = {version = "0.3.64", optional = true} gloo-timers = { version = "0.2.6", optional = true, features = ["futures"] } tokio = { version = "1.29.1", optional = true, features = ["time"]} +dyn-clone = "1.0.13" [features] hydrate = ["dep:js-sys", "dep:gloo-timers"] ssr = ["dep:tokio"] [package.metadata.docs.rs] -all-features = true \ No newline at end of file +all-features = true diff --git a/example/start-axum/src/app.rs b/example/start-axum/src/app.rs index 616da2c..2368ba9 100644 --- a/example/start-axum/src/app.rs +++ b/example/start-axum/src/app.rs @@ -82,11 +82,11 @@ pub fn App(cx: Scope) -> impl IntoView { #[component] fn HomePage(cx: Scope) -> impl IntoView { let invalidate_one = move |_| { - use_query_client(cx).invalidate_query::(&1); + // use_query_client(cx).invalidate_query::(&1); }; let prefetch_two = move |_| { - use_query_client(cx).prefetch_query(cx, || 2, get_post_unwrapped, true); + // use_query_client(cx).prefetch_query(cx, || 2, get_post_unwrapped, true); }; view! { cx, @@ -136,17 +136,18 @@ fn HomePage(cx: Scope) -> impl IntoView { fn use_post_query( cx: Scope, key: impl Fn() -> u32 + 'static, -) -> QueryResult, impl RefetchFn> { - use_query( +) -> QueryResult { + use_query_with_retry( cx, key, - get_post_unwrapped, + get_post, QueryOptions { default_value: None, refetch_interval: None, resource_option: ResourceOption::NonBlocking, stale_time: Some(Duration::from_secs(5)), cache_time: Some(Duration::from_secs(60)), + retry: Schedules::recur(3).build(), }, ) } @@ -159,11 +160,15 @@ async fn get_post_unwrapped(id: u32) -> Option { #[server(GetPost, "/api")] pub async fn get_post(id: u32) -> Result { use leptos_query::Instant; - - log!("Fetching post: {}", id); tokio::time::sleep(Duration::from_millis(2000)).await; - let instant = Instant::now(); - Ok(format!("Post {}: Timestamp {}", id, instant)) + // random number, if it's 0, we'll return an error. + // if random == 0 { + return Err(ServerFnError::ServerError("Random error".into())); + // } else { + // log!("Fetching post: {}", id); + // let instant = Instant::now(); + // Ok(format!("Post {}: Timestamp {}", id, instant)) + // } } #[component] @@ -222,16 +227,11 @@ fn Post(cx: Scope, #[prop(into)] post_id: MaybeSignal) -> impl IntoView { view! { cx,

"Loading..."

} }>

-                    {
+                    {move || {
                         data
                             .get()
-                            .map(|post| {
-                                match post {
-                                    Some(post) => post,
-                                    None => "Not Found".into(),
-                                }
-                            })
                     }
+                    }

diff --git a/example/start-axum/src/todo.rs b/example/start-axum/src/todo.rs index de1ff4d..bc8219e 100644 --- a/example/start-axum/src/todo.rs +++ b/example/start-axum/src/todo.rs @@ -153,7 +153,7 @@ fn AllTodos(cx: Scope) -> impl IntoView { async move { let _ = delete_todo(id).await; refetch(); - use_query_client(cx).invalidate_query::(&id); + // use_query_client(cx).invalidate_query::(&id); } }); @@ -203,14 +203,14 @@ fn AddTodoComponent(cx: Scope) -> impl IntoView { if let Some(Ok(todo)) = response.get() { let id = todo.id; // Invalidate individual TodoResponse. - client.clone().invalidate_query::(id); + // client.clone().invalidate_query::(id); // Invalidate AllTodos. - client.clone().invalidate_query::<(), Vec>(()); + // client.clone().invalidate_query::<(), Vec>(()); // Optimistic update. - let as_response = Ok(Some(todo)); - client.set_query_data::(id, |_| Some(as_response)); + // let as_response = Ok(Some(todo)); + // client.set_query_data::(id, |_| Some(as_response)); } }); diff --git a/src/lib.rs b/src/lib.rs index c2ca8ad..4a67392 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -142,4 +142,5 @@ pub use crate::query_executor::*; pub use crate::query_options::*; pub use crate::query_result::*; pub use crate::query_state::*; +pub use crate::schedule::*; pub use crate::use_query::*; diff --git a/src/query_executor.rs b/src/query_executor.rs index 693ee63..80ab3e7 100644 --- a/src/query_executor.rs +++ b/src/query_executor.rs @@ -1,4 +1,4 @@ -use leptos::*; +use leptos::{leptos_dom::helpers::TimeoutHandle, *}; use std::{ cell::{Cell, RefCell}, collections::HashMap, @@ -9,6 +9,7 @@ use std::{ use crate::{ query::Query, + schedule::ScheduleBuilt, use_query_client, util::{maybe_time_until_stale, time_until_stale, use_timeout}, QueryData, QueryError, QueryState, @@ -35,6 +36,7 @@ where Fu: Future> + 'static, { let fetcher = Rc::new(fetcher); + move || { let fetcher = fetcher.clone(); SUPPRESS_QUERY_LOAD.with(|supressed| { @@ -46,8 +48,8 @@ where QueryState::Fetching(_) | QueryState::Loading | QueryState::Retrying(_) => { () } - // First load. - QueryState::Created => { + // First load or recovering from Panic. + QueryState::Created | QueryState::Fatal(_) => { query.state.set(QueryState::Loading); let result = fetcher(query.key.clone()).await; let updated_at = crate::Instant::now(); @@ -58,7 +60,7 @@ where } Err(error) => query.state.set(QueryState::Error(QueryError { error, - error_count: 0, + error_count: 1, updated_at, prev_data: None, })), @@ -76,16 +78,15 @@ where } Err(error) => query.state.set(QueryState::Error(QueryError { error, - error_count: 0, + error_count: 1, updated_at, prev_data: Some(data), })), } } + // Retrying an error. 
QueryState::Error(prev_error) => { - query - .state - .set(QueryState::Retrying(prev_error.prev_data.clone())); + query.state.set(QueryState::Retrying(prev_error.clone())); let result = fetcher(query.key.clone()).await; let updated_at = crate::Instant::now(); match result { @@ -95,7 +96,7 @@ where } Err(error) => query.state.set(QueryState::Error(QueryError { error, - error_count: prev_error.error_count, + error_count: prev_error.error_count + 1, updated_at, prev_data: prev_error.prev_data, })), @@ -113,6 +114,7 @@ pub(crate) fn synchronize_state( cx: Scope, query: Signal>, executor: impl Fn() + Clone + 'static, + schedule: ScheduleBuilt, ) where K: Hash + Eq + Clone + 'static, V: Clone, @@ -121,11 +123,86 @@ pub(crate) fn synchronize_state( ensure_not_stale(cx, query, executor.clone()); ensure_not_invalid(cx, query, executor.clone()); sync_refetch(cx, query, executor.clone()); + error_handler(cx, query, executor, schedule); let query = Signal::derive(cx, move || Some(query.get())); synchronize_observer(cx, query); } +pub(crate) fn error_handler( + cx: Scope, + query: Signal>, + executor: impl Fn() + Clone + 'static, + schedule: ScheduleBuilt, +) where + K: Hash + Eq + Clone + 'static, + V: Clone, + E: Clone, +{ + create_effect(cx, { + move |last: Option, ScheduleBuilt, Option)>>| { + let query = query.get(); + let state = query.state.get(); + + // Reset on success. + if let QueryState::Loaded(_) = state { + if let Some(timeout) = last.flatten().and_then(|t| t.2) { + timeout.clear(); + } + return None; + } else { + // If different query, create fresh schedule. + let get_retry = { + let query = query.clone(); + let schedule = schedule.clone(); + move || { + if let Some((last_query, last_schedule, timeout)) = last.flatten() { + if let Some(timeout) = timeout { + timeout.clear() + } + if last_query == query { + last_schedule + } else { + schedule.clone() + } + } else { + schedule.clone() + } + } + }; + if let QueryState::Error(error) = state { + let mut schedule: ScheduleBuilt = get_retry(); + + log!("Failure detected. Retrying."); + if let Some(timeout) = schedule.0.next(&error.error) { + let timeout = set_timeout_with_handle( + { + let executor = executor.clone(); + move || { + executor(); + } + }, + timeout, + ) + .ok(); + + Some((query, schedule, timeout)) + } else { + // Fatality. + query.state.set(QueryState::Fatal(error)); + None + } + } else if let QueryState::Retrying(_) = state { + let schedule = get_retry(); + Some((query, schedule, None)) + } else { + None + } + } + } + }) +} + pub(crate) fn synchronize_observer(cx: Scope, query: Signal>>) where K: Hash + Eq + Clone + 'static, @@ -146,7 +223,14 @@ fn ensure_not_stale( let query = query.get(); let stale_time = query.stale_time; - if let (Some(updated_at), Some(stale_time)) = ( + // If has previously failed fataly. + if query + .state + .with_untracked(|s| matches!(s, QueryState::Fatal(_))) + { + executor(); + // If is currently stale. 
+ } else if let (Some(updated_at), Some(stale_time)) = ( query.state.get_untracked().updated_at(), stale_time.get_untracked(), ) { diff --git a/src/query_options.rs b/src/query_options.rs index f7b63cc..52ec052 100644 --- a/src/query_options.rs +++ b/src/query_options.rs @@ -1,6 +1,6 @@ use std::time::Duration; -use crate::schedule::{Schedule, ScheduleBuilt}; +use crate::schedule::*; /** * Options for a query [`crate::use_query::use_query`] @@ -9,7 +9,7 @@ use crate::schedule::{Schedule, ScheduleBuilt}; pub struct QueryOptions { /// Placeholder value to use while the query is loading for the first time. pub default_value: Option, - // Retry Schedule for a query. + /// Retry Schedule for a query. pub retry: ScheduleBuilt, /// The duration that should pass before a query is considered stale. /// If the query is stale, it will be refetched. @@ -49,7 +49,7 @@ impl QueryOptions { cache_time: None, refetch_interval: None, resource_option: ResourceOption::NonBlocking, - retry: crate::schedule::spaced(Duration::ZERO).take(3).build(), + retry: Schedules::spaced(Duration::ZERO).take(3).build(), } } /// QueryOption with custom stale_time. @@ -60,7 +60,7 @@ impl QueryOptions { cache_time: Some(DEFAULT_CACHE_TIME), refetch_interval: None, resource_option: ResourceOption::NonBlocking, - retry: crate::schedule::spaced(Duration::ZERO).take(3).build(), + retry: Schedules::spaced(Duration::ZERO).take(3).build(), } } @@ -72,7 +72,7 @@ impl QueryOptions { cache_time: Some(DEFAULT_CACHE_TIME), refetch_interval: Some(refetch_interval), resource_option: ResourceOption::NonBlocking, - retry: crate::schedule::spaced(Duration::ZERO).take(3).build(), + retry: Schedules::spaced(Duration::ZERO).take(3).build(), } } } @@ -104,7 +104,7 @@ impl Default for QueryOptions { cache_time: Some(DEFAULT_CACHE_TIME), refetch_interval: None, resource_option: ResourceOption::NonBlocking, - retry: crate::schedule::spaced(Duration::ZERO).take(3).build(), + retry: Schedules::spaced(Duration::ZERO).take(3).build(), } } } diff --git a/src/query_result.rs b/src/query_result.rs index 71910a3..2167c44 100644 --- a/src/query_result.rs +++ b/src/query_result.rs @@ -3,7 +3,7 @@ use std::time::Duration; use crate::{ query::Query, util::{maybe_time_until_stale, use_timeout}, - QueryState, + QueryError, QueryState, }; use leptos::*; @@ -17,7 +17,7 @@ where { /// The current value of the query. None if it has not been fetched yet. /// Should be called inside of a [`Transition`](leptos::Transition) or [`Suspense`](leptos::Suspense) component. - pub data: Signal>, + pub data: Signal>>, /// The current state of the data. pub state: Signal>, @@ -41,14 +41,35 @@ impl RefetchFn for R {} pub(crate) fn create_query_result( cx: Scope, query: Signal>, - data: Signal>, + data: Signal>>, executor: impl Fn() + Clone, ) -> QueryResult { let state = Signal::derive(cx, move || query.get().state.get()); - let is_loading = Signal::derive(cx, move || matches!(state.get(), QueryState::Loading)); + let is_loading = Signal::derive(cx, move || { + matches!( + state.get(), + QueryState::Loading + | QueryState::Error(QueryError { + prev_data: None, + .. + }) + | QueryState::Retrying(QueryError { + prev_data: None, + .. + }) + ) + }); let is_fetching = Signal::derive(cx, move || { - matches!(state.get(), QueryState::Loading | QueryState::Fetching(_)) + matches!( + state.get(), + QueryState::Loading + | QueryState::Fetching(_) + | QueryState::Retrying(QueryError { + prev_data: None, + .. 
+ }) + ) }); let is_invalid = Signal::derive(cx, move || matches!(state.get(), QueryState::Invalid(_))); diff --git a/src/query_state.rs b/src/query_state.rs index e92d936..ba2b724 100644 --- a/src/query_state.rs +++ b/src/query_state.rs @@ -1,4 +1,5 @@ use crate::Instant; +use std::fmt::Debug; /// The lifecycle of a query. /// @@ -36,28 +37,42 @@ pub enum QueryState { /// The associated `QueryData` object holds the invalidated data. Invalid(QueryData), + /// Error has occured during fetching. Error(QueryError), - Retrying(Option>), -} -#[derive(Clone, PartialEq, Eq)] -pub struct QueryError { - pub(crate) error: E, - pub(crate) error_count: usize, - pub(crate) updated_at: Instant, - pub(crate) prev_data: Option>, + /// Retrying after error. + Retrying(QueryError), + + /// Query has errored the maximum number of times. + Fatal(QueryError), } impl QueryState { /// Returns the QueryData for the current QueryState, if present. pub fn query_data(&self) -> Option<&QueryData> { match self { - QueryState::Loading | QueryState::Created | QueryState::Retrying(None) => None, - QueryState::Fetching(data) - | QueryState::Loaded(data) - | QueryState::Invalid(data) - | QueryState::Retrying(Some(data)) => Some(data), - QueryState::Error(QueryError { prev_data, .. }) => prev_data.as_ref(), + QueryState::Loading | QueryState::Created => None, + QueryState::Fetching(data) | QueryState::Loaded(data) | QueryState::Invalid(data) => { + Some(data) + } + QueryState::Error(QueryError { prev_data, .. }) + | QueryState::Retrying(QueryError { prev_data, .. }) + | QueryState::Fatal(QueryError { prev_data, .. }) => prev_data.as_ref(), + } + } + + pub fn result(self) -> Option> { + match self { + QueryState::Fatal(QueryError { error, .. }) => Some(Err(error)), + QueryState::Loading | QueryState::Created => None, + + QueryState::Fetching(data) | QueryState::Loaded(data) | QueryState::Invalid(data) => { + Some(Ok(data.data)) + } + QueryState::Error(QueryError { prev_data, .. }) + | QueryState::Retrying(QueryError { prev_data, .. 
}) => { + prev_data.map(|d| d.data).map(Ok) + } } } @@ -72,20 +87,47 @@ impl QueryState { } } -// impl std::fmt::Debug for QueryState -// where -// V: std::fmt::Debug, -// { -// fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { -// match self { -// Self::Created => write!(f, "Created"), -// Self::Loading => write!(f, "Loading"), -// Self::Fetching(arg0) => f.debug_tuple("Fetching").field(arg0).finish(), -// Self::Loaded(arg0) => f.debug_tuple("Loaded").field(arg0).finish(), -// Self::Invalid(arg0) => f.debug_tuple("Invalid").field(arg0).finish(), -// } -// } -// } +impl Debug for QueryState +where + V: Debug, + E: Debug, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Created => write!(f, "Created"), + Self::Loading => write!(f, "Loading"), + Self::Fetching(arg0) => f.debug_tuple("Fetching").field(arg0).finish(), + Self::Loaded(arg0) => f.debug_tuple("Loaded").field(arg0).finish(), + Self::Invalid(arg0) => f.debug_tuple("Invalid").field(arg0).finish(), + QueryState::Error(arg0) => f.debug_tuple("Error").field(arg0).finish(), + QueryState::Retrying(arg0) => f.debug_tuple("Retrying").field(arg0).finish(), + QueryState::Fatal(arg0) => f.debug_tuple("Panic").field(arg0).finish(), + } + } +} + +#[derive(Clone, PartialEq, Eq)] +pub struct QueryError { + pub(crate) error: E, + pub(crate) error_count: usize, + pub(crate) updated_at: Instant, + pub(crate) prev_data: Option>, +} + +impl Debug for QueryError +where + V: Debug, + E: Debug, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("QueryError") + .field("error", &self.error) + .field("error_count", &self.error_count) + .field("updated_at", &self.updated_at) + .field("prev_data", &self.prev_data) + .finish() + } +} /// The latest data for a Query. 
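The new `Error`, `Retrying`, and `Fatal` variants carry the failure (plus any previously loaded data) in a `QueryError` whose fields stay crate-private, so downstream code is expected to go through `query_data()` or the new `result()` accessor. A hedged sketch of rendering from it, assuming `state` is the `state` signal returned for a `String`/`ServerFnError` query:

```rust
// Sketch only: `state` is assumed to be Signal<QueryState<String, ServerFnError>>.
let message = move || match state.get().result() {
    // First load still in flight, or an error/retry with no cached value to show.
    None => "Loading...".to_string(),
    // Fresh or cached data; refetches and retries keep serving the previous value.
    Some(Ok(post)) => post,
    // Only `Fatal` surfaces the error itself: the retry schedule is exhausted.
    Some(Err(_)) => "Failed to load post".to_string(),
};
```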
#[derive(Clone, PartialEq, Eq)] @@ -106,9 +148,9 @@ impl QueryData { } } -impl std::fmt::Debug for QueryData +impl Debug for QueryData where - V: std::fmt::Debug, + V: Debug, { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("QueryData") diff --git a/src/schedule.rs b/src/schedule.rs index ab212b1..2f88552 100644 --- a/src/schedule.rs +++ b/src/schedule.rs @@ -1,6 +1,7 @@ -use std::{marker::PhantomData, time::Duration}; +use dyn_clone::DynClone; +use std::{marker::PhantomData, rc::Rc, time::Duration}; -pub trait Schedule { +pub trait Schedule: DynClone { type Error; fn next(&mut self, error: &Self::Error) -> Option; @@ -29,14 +30,14 @@ pub trait Schedule { Sequence { a: self, b: other } } - fn map(self, func: F) -> Mapped + fn map(self, func: F) -> Mapped> where Self: Sized, F: Fn((&Self::Error, Option)) -> Option, { Mapped { schedule: self, - func, + func: Rc::new(func), } } @@ -47,14 +48,14 @@ pub trait Schedule { Take { n, schedule: self } } - fn take_while(self, func: F) -> TakeWhile + fn take_while(self, func: F) -> TakeWhile> where Self: Sized, F: Fn((&Self::Error, Duration)) -> bool, { TakeWhile { schedule: self, - func, + func: Rc::new(func), } } @@ -95,18 +96,29 @@ pub trait Schedule { where Self: Sized + 'static, { - ScheduleBuilt(std::rc::Rc::new(self)) + ScheduleBuilt(Box::new(self)) } } +dyn_clone::clone_trait_object!( Schedule); + #[derive(Clone)] -pub struct ScheduleBuilt(std::rc::Rc>); +pub struct ScheduleBuilt(pub(crate) Box>); pub struct Recur { n: u32, error_type: PhantomData, } +impl Clone for Recur { + fn clone(&self) -> Self { + Recur { + n: self.n, + error_type: self.error_type, + } + } +} + impl Schedule for Recur { type Error = E; fn next(&mut self, _: &Self::Error) -> Option { @@ -124,6 +136,15 @@ pub struct Spaced { error_type: PhantomData, } +impl Clone for Spaced { + fn clone(&self) -> Self { + Spaced { + duration: self.duration, + error_type: self.error_type, + } + } +} + impl Schedule for Spaced { type Error = E; fn next(&mut self, _: &Self::Error) -> Option { @@ -136,6 +157,19 @@ pub struct Sequence { b: B, } +impl Clone for Sequence +where + A: Schedule, + B: Schedule, +{ + fn clone(&self) -> Self { + Sequence { + a: dyn_clone::clone(&self.a), + b: dyn_clone::clone(&self.b), + } + } +} + impl Schedule for Sequence where A: Schedule, @@ -158,6 +192,19 @@ pub struct Union { b: B, } +impl Clone for Union +where + A: Schedule, + B: Schedule, +{ + fn clone(&self) -> Self { + Union { + a: dyn_clone::clone(&self.a), + b: dyn_clone::clone(&self.b), + } + } +} + impl Schedule for Union where A: Schedule, @@ -183,6 +230,19 @@ pub struct Intersect { b: B, } +impl Clone for Intersect +where + A: Schedule, + B: Schedule, +{ + fn clone(&self) -> Self { + Intersect { + a: dyn_clone::clone(&self.a), + b: dyn_clone::clone(&self.b), + } + } +} + impl Schedule for Intersect where A: Schedule, @@ -208,6 +268,17 @@ pub struct Exponential { error_type: PhantomData, } +impl Clone for Exponential { + fn clone(&self) -> Self { + Exponential { + base: self.base, + n: self.n, + factor: self.factor, + error_type: self.error_type, + } + } +} + impl Schedule for Exponential { type Error = E; fn next(&mut self, _: &Self::Error) -> Option { @@ -228,7 +299,19 @@ pub struct Mapped { func: F, } -impl Schedule for Mapped +impl Clone for Mapped> +where + A: Schedule, + F: Fn((&E, Option)) -> Option, +{ + fn clone(&self) -> Self { + Self { + schedule: dyn_clone::clone(&self.schedule), + func: self.func.clone(), + } + } +} +impl Schedule for Mapped> where A: 
Schedule, F: Fn((&E, Option)) -> Option, @@ -262,12 +345,37 @@ where } } +impl Clone for Take
+where + A: Schedule, +{ + fn clone(&self) -> Self { + Self { + schedule: dyn_clone::clone(&self.schedule), + n: self.n.clone(), + } + } +} + pub struct TakeWhile { schedule: A, func: F, } -impl Schedule for TakeWhile +impl Clone for TakeWhile> +where + A: Schedule, + F: Fn((&E, Duration)) -> bool, +{ + fn clone(&self) -> Self { + Self { + schedule: dyn_clone::clone(&self.schedule), + func: self.func.clone(), + } + } +} + +impl Schedule for TakeWhile> where A: Schedule, F: Fn((&E, Duration)) -> bool, @@ -287,6 +395,19 @@ pub struct Clamp { max: Option, } +impl Clone for Clamp +where + A: Schedule, +{ + fn clone(&self) -> Self { + Self { + schedule: dyn_clone::clone(&self.schedule), + min: self.min.clone(), + max: self.max.clone(), + } + } +} + impl Schedule for Clamp where A: Schedule, @@ -311,26 +432,30 @@ where } } -pub fn recurs(n: u32) -> impl Schedule { - Recur { - n, - error_type: PhantomData, +pub struct Schedules(); + +impl Schedules { + pub fn recur(n: u32) -> impl Schedule { + Recur { + n, + error_type: PhantomData, + } } -} -pub fn spaced(d: Duration) -> impl Schedule { - Spaced { - duration: d, - error_type: PhantomData, + pub fn spaced(d: Duration) -> impl Schedule { + Spaced { + duration: d, + error_type: PhantomData, + } } -} -pub fn exponential(base: Duration, factor: f32) -> impl Schedule { - Exponential { - n: 0, - base, - factor, - error_type: PhantomData, + pub fn exponential(base: Duration, factor: f32) -> impl Schedule { + Exponential { + n: 0, + base, + factor, + error_type: PhantomData, + } } } @@ -339,7 +464,7 @@ mod tests { use super::*; #[test] fn test_recurs() { - let mut r = recurs(2); + let mut r = Schedules::recur(2); assert_eq!(Some(Duration::ZERO), r.next(&())); assert_eq!(Some(Duration::ZERO), r.next(&())); assert_eq!(None, r.next(&())); @@ -348,7 +473,7 @@ mod tests { #[test] fn test_spaced() { let d = Duration::from_millis(500); - let mut schedule = recurs(2).intersect(spaced(d)); + let mut schedule = Schedules::recur(2).intersect(Schedules::spaced(d)); assert_eq!(Some(d), schedule.next(&())); assert_eq!(Some(d), schedule.next(&())); assert_eq!(None, schedule.next(&())); @@ -357,8 +482,8 @@ mod tests { #[test] fn test_sequence() { let d = Duration::from_millis(500); - let left = recurs(2).intersect(spaced(d)); - let right = recurs(2); + let left = Schedules::recur(2).intersect(Schedules::spaced(d)); + let right = Schedules::recur(2); let mut schedule = left.concat(right); assert_eq!(Some(d), schedule.next(&())); @@ -370,7 +495,7 @@ mod tests { #[test] fn test_exponential() { - let mut schedule = exponential(Duration::from_millis(500), 2.0).take(6); + let mut schedule = Schedules::exponential(Duration::from_millis(500), 2.0).take(6); assert_eq!(Some(Duration::from_millis(500)), schedule.next(&())); assert_eq!(Some(Duration::from_millis(1000)), schedule.next(&())); @@ -383,7 +508,7 @@ mod tests { #[test] fn test_exponential_while() { - let mut schedule = exponential(Duration::from_millis(500), 2.0) + let mut schedule = Schedules::exponential(Duration::from_millis(500), 2.0) .take_while(|(_, d)| d < Duration::from_millis(2001)); assert_eq!(Some(Duration::from_millis(500)), schedule.next(&())); @@ -394,7 +519,7 @@ mod tests { #[test] fn test_exponential_clamp() { - let mut schedule = exponential(Duration::from_millis(100), 2.0) + let mut schedule = Schedules::exponential(Duration::from_millis(100), 2.0) .clamp_min(Duration::from_secs(1)) .clamp_max(Duration::from_secs(4)); diff --git a/src/use_query.rs b/src/use_query.rs index 30c7a3d..042a004 100644 --- 
a/src/use_query.rs +++ b/src/use_query.rs @@ -108,22 +108,12 @@ where let resource_fetcher = move |query: Query| { async move { - match query.state.get_untracked() { + match query.state.get_untracked().result() { // Immediately provide cached value. - QueryState::Loaded(data) - | QueryState::Invalid(data) - | QueryState::Fetching(data) - | QueryState::Retrying(Some(data)) - | QueryState::Error(QueryError { - prev_data: Some(data), - .. - }) => ResourceData(Some(data.data)), + Some(data) => ResourceData(Some(data)), // Suspend indefinitely and wait for interruption. - QueryState::Created - | QueryState::Loading - | QueryState::Error(_) - | QueryState::Retrying(None) => { + None => { sleep(LONG_TIME).await; ResourceData(None) } @@ -131,14 +121,19 @@ where } }; - let resource: Resource, ResourceData> = { - let default = options.default_value; + let QueryOptions { + default_value, + retry, + .. + } = options; + + let resource: Resource, ResourceData> = { match options.resource_option { ResourceOption::NonBlocking => create_resource_with_initial_value( cx, move || query.get(), resource_fetcher, - default.map(|default| ResourceData(Some(default))), + default_value.map(|default| ResourceData(Some(Ok(default)))), ), ResourceOption::Blocking => { create_blocking_resource(cx, move || query.get(), resource_fetcher) @@ -152,7 +147,7 @@ where if let QueryState::Loaded(data) = state { // Interrupt Suspense. if resource.loading().get_untracked() { - resource.set(ResourceData(Some(data.data))); + resource.set(ResourceData(Some(Ok(data.data)))); } else { resource.refetch(); } @@ -161,7 +156,7 @@ where let executor = create_executor(query, fetcher); - synchronize_state(cx, query, executor.clone()); + synchronize_state(cx, query, executor.clone(), retry); // Ensure key changes are considered. create_isomorphic_effect(cx, { @@ -191,7 +186,7 @@ where executor() // SSR edge case. // Given hydrate can happen before resource resolves, signals on the client can be out of sync with resource. 
- } else if let Some(ref data) = read { + } else if let Some(Ok(ref data)) = read { if let QueryState::Created = query.state.get_untracked() { let updated_at = crate::Instant::now(); let data = QueryData { @@ -226,24 +221,31 @@ async fn sleep(duration: Duration) { /// Wrapper type to enable using `Serializable` #[derive(Clone, Debug)] -struct ResourceData(Option); +struct ResourceData(Option>); -impl Serializable for ResourceData +impl Serializable for ResourceData where V: Serializable, + E: Serializable, { fn ser(&self) -> Result { - if let Some(ref value) = self.0 { - value.ser() - } else { - Ok("null".to_string()) + match self.0 { + Some(Ok(ref value)) => value.ser(), + Some(Err(ref error)) => error.ser(), + None => Ok("null".to_string()), } } fn de(bytes: &str) -> Result { match bytes { "" | "null" => Ok(ResourceData(None)), - v => ::de(v).map(Some).map(ResourceData), + bytes => match ::de(bytes) { + Ok(value) => Ok(ResourceData(Some(Ok(value)))), + Err(_) => match ::de(bytes) { + Ok(error) => Ok(ResourceData(Some(Err(error)))), + Err(error) => Err(error), + }, + }, } } } @@ -252,6 +254,24 @@ where #[derive(Clone)] pub enum Never {} +impl Serializable for Never { + fn ser(&self) -> Result { + match *self {} + } + + fn de(_: &str) -> Result { + Err(SerializationError::Deserialize(std::rc::Rc::new( + NeverSerde(), + ))) + } +} + +impl std::fmt::Debug for Never { + fn fmt(&self, _: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match *self {} + } +} + struct NeverSerde(); impl std::error::Error for NeverSerde { @@ -274,17 +294,3 @@ impl std::fmt::Debug for NeverSerde { f.debug_tuple("NeverSerde").finish() } } - -impl Serializable for Never { - fn ser(&self) -> Result { - Err(SerializationError::Serialize( - std::rc::Rc::new(NeverSerde()), - )) - } - - fn de(bytes: &str) -> Result { - Err(SerializationError::Deserialize(std::rc::Rc::new( - NeverSerde(), - ))) - } -}