From 7732f57cc422fcd27adcd92cef9c8089b439c13f Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Thu, 4 May 2023 16:55:40 +0200 Subject: [PATCH 01/15] chore: Rename `SlidingSync::pop_list` to `remove_list` Signed-off-by: Benjamin Bouvier --- bindings/matrix-sdk-ffi/src/sliding_sync.rs | 4 ++-- crates/matrix-sdk/src/sliding_sync/mod.rs | 2 +- testing/sliding-sync-integration-test/src/lib.rs | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/bindings/matrix-sdk-ffi/src/sliding_sync.rs b/bindings/matrix-sdk-ffi/src/sliding_sync.rs index 0130fff32bf..653af5a1477 100644 --- a/bindings/matrix-sdk-ffi/src/sliding_sync.rs +++ b/bindings/matrix-sdk-ffi/src/sliding_sync.rs @@ -741,8 +741,8 @@ impl SlidingSync { self.inner.add_list(list.inner.clone()).map(|inner| Arc::new(SlidingSyncList { inner })) } - pub fn pop_list(&self, name: String) -> Option> { - self.inner.pop_list(&name).map(|inner| Arc::new(SlidingSyncList { inner })) + pub fn remove_list(&self, name: String) -> Option> { + self.inner.remove_list(&name).map(|inner| Arc::new(SlidingSyncList { inner })) } pub fn add_common_extensions(&self) { diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index 115e134e4e0..0e9413b8892 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -205,7 +205,7 @@ impl SlidingSync { /// Note: Remember that this change will only be applicable for any new /// stream created after this. The old stream will still continue to use the /// previous set of lists. - pub fn pop_list(&self, list_name: &String) -> Option { + pub fn remove_list(&self, list_name: &String) -> Option { self.inner.lists.write().unwrap().remove(list_name) } diff --git a/testing/sliding-sync-integration-test/src/lib.rs b/testing/sliding-sync-integration-test/src/lib.rs index 60cdd925ed8..8ac6bd33c79 100644 --- a/testing/sliding-sync-integration-test/src/lib.rs +++ b/testing/sliding-sync-integration-test/src/lib.rs @@ -443,7 +443,7 @@ async fn live_lists() -> anyhow::Result<()> { // we only heard about the ones we had asked for assert_eq!(summary.lists, [list_name_1, list_name_2, list_name_3]); - let Some(list_2) = sync_proxy.pop_list(&list_name_2.to_owned()) else { + let Some(list_2) = sync_proxy.remove_list(&list_name_2.to_owned()) else { bail!("Room exists"); }; From 893d2daef39cc185eb20a526d3686759bd635d46 Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Thu, 4 May 2023 16:54:25 +0200 Subject: [PATCH 02/15] feat: Implement caching per sliding sync list Signed-off-by: Benjamin Bouvier --- crates/matrix-sdk/src/sliding_sync/builder.rs | 31 ++- crates/matrix-sdk/src/sliding_sync/cache.rs | 192 ++++++++++-------- crates/matrix-sdk/src/sliding_sync/error.rs | 7 + .../src/sliding_sync/list/builder.rs | 65 +++++- .../matrix-sdk/src/sliding_sync/list/mod.rs | 35 ++-- crates/matrix-sdk/src/sliding_sync/mod.rs | 26 +++ 6 files changed, 240 insertions(+), 116 deletions(-) diff --git a/crates/matrix-sdk/src/sliding_sync/builder.rs b/crates/matrix-sdk/src/sliding_sync/builder.rs index f10aabf5352..f64d7c182d0 100644 --- a/crates/matrix-sdk/src/sliding_sync/builder.rs +++ b/crates/matrix-sdk/src/sliding_sync/builder.rs @@ -19,7 +19,7 @@ use super::{ cache::restore_sliding_sync_state, SlidingSync, SlidingSyncInner, SlidingSyncList, SlidingSyncPositionMarkers, SlidingSyncRoom, }; -use crate::{Client, Result}; +use crate::{Client, Result, SlidingSyncListBuilder}; /// Configuration for a Sliding Sync instance. /// @@ -34,6 +34,7 @@ pub struct SlidingSyncBuilder { bump_event_types: Vec, extensions: Option, subscriptions: BTreeMap, + rooms: BTreeMap, } impl SlidingSyncBuilder { @@ -46,6 +47,7 @@ impl SlidingSyncBuilder { bump_event_types: Vec::new(), extensions: None, subscriptions: BTreeMap::new(), + rooms: BTreeMap::new(), } } @@ -63,13 +65,32 @@ impl SlidingSyncBuilder { /// Add the given list to the lists. /// - /// Replace any list with the name. + /// Replace any list with the same name. pub fn add_list(mut self, list: SlidingSyncList) -> Self { self.lists.insert(list.name().to_owned(), list); - self } + /// Enroll the list in caching, reloads it from the cache if possible, and adds it to the list + /// of lists. + /// + /// This will raise an error if a [`storage_key()`] was not set, or if there was a I/O error + /// reading from the cache. + /// + /// Replace any list with the same name. + pub async fn add_cached_list(mut self, mut list: SlidingSyncListBuilder) -> Result { + let Some(ref storage_key) = self.storage_key else { + return Err(super::error::Error::MissingStorageKeyForCaching.into()); + }; + let reloaded_rooms = list.set_cached_and_reload(&self.client, storage_key).await?; + for (key, frozen) in reloaded_rooms { + self.rooms + .entry(key) + .or_insert_with(|| SlidingSyncRoom::from_frozen(frozen, self.client.clone())); + } + Ok(self.add_list(list.build())) + } + /// Activate e2ee, to-device-message and account data extensions if not yet /// configured. /// @@ -204,7 +225,6 @@ impl SlidingSyncBuilder { let client = self.client; let mut delta_token = None; - let mut rooms_found: BTreeMap = BTreeMap::new(); // Load an existing state from the cache. if let Some(storage_key) = &self.storage_key { @@ -213,13 +233,12 @@ impl SlidingSyncBuilder { storage_key, &mut self.lists, &mut delta_token, - &mut rooms_found, &mut self.extensions, ) .await?; } - let rooms = StdRwLock::new(rooms_found); + let rooms = StdRwLock::new(self.rooms); let lists = StdRwLock::new(self.lists); Ok(SlidingSync::new(SlidingSyncInner { diff --git a/crates/matrix-sdk/src/sliding_sync/cache.rs b/crates/matrix-sdk/src/sliding_sync/cache.rs index 4a077052b35..6c29042c5ee 100644 --- a/crates/matrix-sdk/src/sliding_sync/cache.rs +++ b/crates/matrix-sdk/src/sliding_sync/cache.rs @@ -6,36 +6,43 @@ use std::collections::BTreeMap; -use ruma::{api::client::sync::sync_events::v4::ExtensionsConfig, OwnedRoomId}; +use matrix_sdk_base::{StateStore, StoreError}; +use ruma::api::client::sync::sync_events::v4::ExtensionsConfig; use tracing::{trace, warn}; -use super::{ - FrozenSlidingSync, FrozenSlidingSyncList, SlidingSync, SlidingSyncList, SlidingSyncRoom, -}; -use crate::{Client, Result}; +use super::{FrozenSlidingSync, FrozenSlidingSyncList, SlidingSync, SlidingSyncList}; +use crate::{sliding_sync::SlidingSyncListCachePolicy, Client, Result}; +/// Be careful: as this is used as a storage key; changing it requires migrating data! fn format_storage_key_for_sliding_sync(storage_key: &str) -> String { format!("sliding_sync_store::{storage_key}") } +/// Be careful: as this is used as a storage key; changing it requires migrating data! fn format_storage_key_for_sliding_sync_list(storage_key: &str, list_name: &str) -> String { format!("sliding_sync_store::{storage_key}::{list_name}") } -/// Clean the storage for everything related to `SlidingSync`. +/// Invalidate a single [`SlidingSyncList`] cache entry by removing it from the cache. +async fn invalidate_cached_list( + storage: &dyn StateStore, + storage_key: &str, + list_name: &str, +) { + let storage_key_for_list = format_storage_key_for_sliding_sync_list(storage_key, list_name); + let _ = storage.remove_custom_value(storage_key_for_list.as_bytes()).await; +} + +/// Clean the storage for everything related to `SlidingSync` and all known lists. async fn clean_storage( client: &Client, storage_key: &str, lists: &BTreeMap, ) { let storage = client.store(); - for list_name in lists.keys() { - let storage_key_for_list = format_storage_key_for_sliding_sync_list(storage_key, list_name); - - let _ = storage.remove_custom_value(storage_key_for_list.as_bytes()).await; + invalidate_cached_list(storage, storage_key, list_name).await; } - let _ = storage .remove_custom_value(format_storage_key_for_sliding_sync(storage_key).as_bytes()) .await; @@ -57,7 +64,7 @@ pub(super) async fn store_sliding_sync_state(sliding_sync: &SlidingSync) -> Resu ) .await?; - // Write every `SlidingSyncList` inside the client the store. + // Write every `SlidingSyncList` that's configured for caching into the store. let frozen_lists = { let rooms_lock = sliding_sync.inner.rooms.read().unwrap(); @@ -67,11 +74,13 @@ pub(super) async fn store_sliding_sync_state(sliding_sync: &SlidingSync) -> Resu .read() .unwrap() .iter() - .map(|(list_name, list)| { - Ok(( - format_storage_key_for_sliding_sync_list(storage_key, list_name), - serde_json::to_vec(&FrozenSlidingSyncList::freeze(list, &rooms_lock))?, - )) + .filter_map(|(list_name, list)| { + matches!(list.cache_policy(), SlidingSyncListCachePolicy::Enabled).then(|| { + Ok(( + format_storage_key_for_sliding_sync_list(storage_key, list_name), + serde_json::to_vec(&FrozenSlidingSyncList::freeze(list, &rooms_lock))?, + )) + }) }) .collect::, crate::Error>>()? }; @@ -85,68 +94,64 @@ pub(super) async fn store_sliding_sync_state(sliding_sync: &SlidingSync) -> Resu Ok(()) } -/// Restore the `SlidingSync`'s state from what is stored in the storage. +/// Try to restore a single [`SlidingSyncList`] from the cache. /// -/// If one cache is obsolete (corrupted, and cannot be deserialized or -/// anything), the entire `SlidingSync` cache is removed. -pub(super) async fn restore_sliding_sync_state( - client: &Client, +/// If it fails to deserialize for some reason, invalidate the cache entry. +pub(super) async fn restore_sliding_sync_list( + storage: &dyn StateStore, storage_key: &str, - lists: &mut BTreeMap, - delta_token: &mut Option, - rooms_found: &mut BTreeMap, - extensions: &mut Option, -) -> Result<()> { - let storage = client.store(); - - let mut collected_lists_and_frozen_lists = Vec::with_capacity(lists.len()); - - // Preload the `FrozenSlidingSyncList` objects from the cache. - // - // Even if a cache was detected as obsolete, we go over all of them, so that we - // are sure all obsolete cache entries are removed. - for (list_name, list) in lists.iter_mut() { - let storage_key_for_list = format_storage_key_for_sliding_sync_list(storage_key, list_name); + list_name: &str, +) -> Result> { + let storage_key_for_list = format_storage_key_for_sliding_sync_list(storage_key, list_name); - match storage - .get_custom_value(storage_key_for_list.as_bytes()) - .await? - .map(|custom_value| serde_json::from_slice::(&custom_value)) - { + match storage + .get_custom_value(storage_key_for_list.as_bytes()) + .await? + .map(|custom_value| serde_json::from_slice::(&custom_value)) + { + Some(Ok(frozen_list)) => { // List has been found and successfully deserialized. - Some(Ok(frozen_list)) => { - trace!(list_name, "successfully read the list from cache"); - - // Keep it for later. - collected_lists_and_frozen_lists.push((list, frozen_list)); - } + trace!(list_name, "successfully read the list from cache"); + return Ok(Some(frozen_list)); + } + Some(Err(_)) => { // List has been found, but it wasn't possible to deserialize it. It's declared // as obsolete. The main reason might be that the internal representation of a // `SlidingSyncList` might have changed. Instead of considering this as a strong // error, we remove the entry from the cache and keep the list in its initial // state. - Some(Err(_)) => { - warn!( + warn!( list_name, "failed to deserialize the list from the cache, it is obsolete; removing the cache entry!" ); + // Let's clear the list and stop here. + invalidate_cached_list(storage, storage_key, list_name).await; + } - // Let's clear everything and stop here. - clean_storage(client, storage_key, lists).await; - - return Ok(()); - } - - None => { - trace!(list_name, "failed to find the list in the cache"); - - // A missing cache doesn't make anything obsolete. - // We just do nothing here. - } + None => { + // A missing cache doesn't make anything obsolete. + // We just do nothing here. + trace!(list_name, "failed to find the list in the cache"); } } + Ok(None) +} + +/// Restore the `SlidingSync`'s state from what is stored in the storage. +/// +/// If one cache is obsolete (corrupted, and cannot be deserialized or +/// anything), the entire `SlidingSync` cache is removed. +pub(super) async fn restore_sliding_sync_state( + client: &Client, + storage_key: &str, + lists: &mut BTreeMap, + delta_token: &mut Option, + extensions: &mut Option, +) -> Result<()> { + let storage = client.store(); + // Preload the `SlidingSync` object from the cache. match storage .get_custom_value(format_storage_key_for_sliding_sync(storage_key).as_bytes()) @@ -156,22 +161,6 @@ pub(super) async fn restore_sliding_sync_state( // `SlidingSync` has been found and successfully deserialized. Some(Ok(FrozenSlidingSync { to_device_since, delta_token: frozen_delta_token })) => { trace!("Successfully read the `SlidingSync` from the cache"); - - // OK, at this step, everything has been loaded successfully from the cache. - - // Let's update all the `SlidingSyncList`. - for (list, FrozenSlidingSyncList { maximum_number_of_rooms, room_list, rooms }) in - collected_lists_and_frozen_lists - { - list.set_from_cold(maximum_number_of_rooms, room_list); - - for (key, frozen_room) in rooms.into_iter() { - rooms_found.entry(key).or_insert_with(|| { - SlidingSyncRoom::from_frozen(frozen_room, client.clone()) - }); - } - } - // Let's update the `SlidingSync`. if let Some(since) = to_device_since { let to_device_ext = &mut extensions.get_or_insert_with(Default::default).to_device; @@ -179,7 +168,6 @@ pub(super) async fn restore_sliding_sync_state( to_device_ext.since = Some(since); } } - *delta_token = frozen_delta_token; } @@ -238,28 +226,40 @@ mod tests { .await? .is_none()); + assert!(store + .get_custom_value( + format_storage_key_for_sliding_sync_list("hello", "list_bar").as_bytes() + ) + .await? + .is_none()); + // Create a new `SlidingSync` instance, and store it. { let sliding_sync = client .sliding_sync() .await .storage_key(Some("hello".to_owned())) - .add_list(SlidingSyncList::builder("list_foo").build()) + .add_cached_list(SlidingSyncList::builder("list_foo")) + .await? + .add_list(SlidingSyncList::builder("list_bar").build()) .build() .await?; - // Modify one list just to check the restoration. + // Modify both lists, so we can check expected caching behavior later. { let lists = sliding_sync.inner.lists.write().unwrap(); - let list_foo = lists.get("list_foo").unwrap(); + let list_foo = lists.get("list_foo").unwrap(); list_foo.set_maximum_number_of_rooms(Some(42)); + + let list_bar = lists.get("list_bar").unwrap(); + list_bar.set_maximum_number_of_rooms(Some(1337)); } assert!(sliding_sync.cache_to_storage().await.is_ok()); } - // Store entries now exist. + // Store entries now exist for the sliding sync object and list_foo. assert!(store .get_custom_value(format_storage_key_for_sliding_sync("hello").as_bytes()) .await? @@ -272,22 +272,37 @@ mod tests { .await? .is_some()); + // But not for list_bar. + assert!(store + .get_custom_value( + format_storage_key_for_sliding_sync_list("hello", "list_bar").as_bytes() + ) + .await? + .is_none()); + // Create a new `SlidingSync`, and it should be read from the cache. { let sliding_sync = client .sliding_sync() .await .storage_key(Some("hello".to_owned())) - .add_list(SlidingSyncList::builder("list_foo").build()) + .add_cached_list(SlidingSyncList::builder("list_foo")) + .await? + .add_list(SlidingSyncList::builder("list_bar").build()) .build() .await?; // Check the list' state. { let lists = sliding_sync.inner.lists.write().unwrap(); - let list_foo = lists.get("list_foo").unwrap(); + // This one was cached. + let list_foo = lists.get("list_foo").unwrap(); assert_eq!(list_foo.maximum_number_of_rooms(), Some(42)); + + // This one wasn't. + let list_bar = lists.get("list_bar").unwrap(); + assert_eq!(list_bar.maximum_number_of_rooms(), None); } // Clean the cache. @@ -307,6 +322,13 @@ mod tests { .await? .is_none()); + assert!(store + .get_custom_value( + format_storage_key_for_sliding_sync_list("hello", "list_bar").as_bytes() + ) + .await? + .is_none()); + Ok(()) }) } diff --git a/crates/matrix-sdk/src/sliding_sync/error.rs b/crates/matrix-sdk/src/sliding_sync/error.rs index 41baaef19a6..d9e6ba48176 100644 --- a/crates/matrix-sdk/src/sliding_sync/error.rs +++ b/crates/matrix-sdk/src/sliding_sync/error.rs @@ -12,15 +12,18 @@ pub enum Error { /// `sync`-restart might be required. #[error("The sliding sync response could not be handled: {0}")] BadResponse(String), + /// A `SlidingSyncListRequestGenerator` has been used without having been /// initialized. It happens when a response is handled before a request has /// been sent. It usually happens when testing. #[error("The sliding sync list `{0}` is handling a response, but its request generator has not been initialized")] RequestGeneratorHasNotBeenInitialized(String), + /// Someone has tried to modify a sliding sync list's ranges, but the /// selected sync mode doesn't allow that. #[error("The chosen sync mode for the list `{0}` doesn't allow to modify the ranges")] CannotModifyRanges(String), + /// Ranges have a `start` bound greater than `end`. #[error("Ranges have invalid bounds: `{start}..{end}`")] InvalidRange { @@ -29,4 +32,8 @@ pub enum Error { /// End bound. end: u32, }, + + /// Missing storage key when asking to deserialize some sub-state of sliding sync. + #[error("A caching request was made but a storage key is missing in sliding sync")] + MissingStorageKeyForCaching, } diff --git a/crates/matrix-sdk/src/sliding_sync/list/builder.rs b/crates/matrix-sdk/src/sliding_sync/list/builder.rs index 8d5b9dc970b..aad0393d92e 100644 --- a/crates/matrix-sdk/src/sliding_sync/list/builder.rs +++ b/crates/matrix-sdk/src/sliding_sync/list/builder.rs @@ -1,17 +1,23 @@ //! Builder for [`SlidingSyncList`]. use std::{ + collections::BTreeMap, fmt::Debug, sync::{Arc, RwLock as StdRwLock}, }; use eyeball::unique::Observable; -use eyeball_im::ObservableVector; -use ruma::{api::client::sync::sync_events::v4, events::StateEventType, UInt}; +use eyeball_im::{ObservableVector, Vector}; +use ruma::{api::client::sync::sync_events::v4, events::StateEventType, OwnedRoomId, UInt}; + +use crate::{ + sliding_sync::{cache::restore_sliding_sync_list, FrozenSlidingSyncRoom}, + Client, RoomListEntry, +}; use super::{ - SlidingSyncList, SlidingSyncListInner, SlidingSyncListRequestGenerator, SlidingSyncMode, - SlidingSyncState, + SlidingSyncList, SlidingSyncListCachePolicy, SlidingSyncListInner, + SlidingSyncListRequestGenerator, SlidingSyncMode, SlidingSyncState, }; /// The default name for the full sync list. @@ -29,6 +35,21 @@ pub struct SlidingSyncListBuilder { timeline_limit: Option, name: String, ranges: Vec<(UInt, UInt)>, + + /// Should this list be cached and reloaded from the cache? + cache_policy: SlidingSyncListCachePolicy, + + /// Total number of rooms that is possible to interact with the given list. + /// See also comment of [`SlidingSyncList::maximum_number_of_rooms`]. + /// May be reloaded from the cache. + maximum_number_of_rooms: Option, + + /// List of room entries. + /// May be reloaded from the cache. + room_list: Vector, + + /// State of this list, indicating whether it was preloaded from the cache or not. + state: SlidingSyncState, } impl SlidingSyncListBuilder { @@ -46,6 +67,10 @@ impl SlidingSyncListBuilder { timeline_limit: None, name: name.into(), ranges: Vec::new(), + cache_policy: SlidingSyncListCachePolicy::Disabled, + maximum_number_of_rooms: None, + room_list: Vector::new(), + state: SlidingSyncState::default(), } } @@ -132,6 +157,26 @@ impl SlidingSyncListBuilder { self } + /// Marks this list as sync'd from the cache, and attempts to reload it from storage. + pub(in super::super) async fn set_cached_and_reload( + &mut self, + client: &Client, + storage_key: &str, + ) -> crate::Result> { + self.cache_policy = SlidingSyncListCachePolicy::Enabled; + if let Some(frozen_list) = + restore_sliding_sync_list(client.store(), storage_key, &self.name).await? + { + self.maximum_number_of_rooms = frozen_list.maximum_number_of_rooms; + assert!(self.room_list.is_empty(), "can't call `set_cached_and_reload` twice"); + self.room_list = frozen_list.room_list; + self.state = SlidingSyncState::Preloaded; + Ok(frozen_list.rooms) + } else { + Ok(Default::default()) + } + } + /// Build the list. pub fn build(self) -> SlidingSyncList { let request_generator = match &self.sync_mode { @@ -158,14 +203,18 @@ impl SlidingSyncListBuilder { timeline_limit: StdRwLock::new(Observable::new(self.timeline_limit)), name: self.name, ranges: StdRwLock::new(Observable::new(self.ranges)), + cache_policy: self.cache_policy, // Computed from the builder. request_generator: StdRwLock::new(request_generator), - // Default values for the type we are building. - state: StdRwLock::new(Observable::new(SlidingSyncState::default())), - maximum_number_of_rooms: StdRwLock::new(Observable::new(None)), - room_list: StdRwLock::new(ObservableVector::new()), + // Values read from deserialization, or that are still equal to the default values + // otherwise. + state: StdRwLock::new(Observable::new(self.state)), + maximum_number_of_rooms: StdRwLock::new(Observable::new( + self.maximum_number_of_rooms, + )), + room_list: StdRwLock::new(ObservableVector::from(self.room_list)), }), } } diff --git a/crates/matrix-sdk/src/sliding_sync/list/mod.rs b/crates/matrix-sdk/src/sliding_sync/list/mod.rs index 1221a53606e..a100e7da464 100644 --- a/crates/matrix-sdk/src/sliding_sync/list/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/list/mod.rs @@ -17,7 +17,6 @@ use eyeball::unique::Observable; use eyeball_im::{ObservableVector, VectorDiff}; pub(super) use frozen::FrozenSlidingSyncList; use futures_core::Stream; -use imbl::Vector; pub(super) use request_generator::*; pub use room_list_entry::RoomListEntry; use ruma::{api::client::sync::sync_events::v4, assign, events::StateEventType, OwnedRoomId, UInt}; @@ -27,6 +26,16 @@ use tracing::{instrument, warn}; use super::{Error, FrozenSlidingSyncRoom, SlidingSyncRoom}; use crate::Result; +/// Should this [`SlidingSyncList`] be stored in the cache, and automatically reloaded from the +/// cache upon creation? +#[derive(Clone, Copy, Debug)] +pub(crate) enum SlidingSyncListCachePolicy { + /// Store and load this list from the cache. + Enabled, + /// Don't store and load this list from the cache. + Disabled, +} + /// Holding a specific filtered list within the concept of sliding sync. /// /// It is OK to clone this type as much as you need: cloning it is cheap. @@ -41,22 +50,6 @@ impl SlidingSyncList { SlidingSyncListBuilder::new(name) } - pub(crate) fn set_from_cold( - &mut self, - maximum_number_of_rooms: Option, - room_list: Vector, - ) { - Observable::set(&mut self.inner.state.write().unwrap(), SlidingSyncState::Preloaded); - Observable::set( - &mut self.inner.maximum_number_of_rooms.write().unwrap(), - maximum_number_of_rooms, - ); - - let mut lock = self.inner.room_list.write().unwrap(); - lock.clear(); - lock.append(room_list); - } - /// Get the name of the list. pub fn name(&self) -> &str { self.inner.name.as_str() @@ -188,6 +181,11 @@ impl SlidingSyncList { self.inner.next_request() } + /// Returns the current cache policy for this list. + pub(super) fn cache_policy(&self) -> SlidingSyncListCachePolicy { + self.inner.cache_policy + } + /// Update the list based on the response from the server. /// /// The `maximum_number_of_rooms` is the `lists.$this_list.count` value, @@ -275,6 +273,9 @@ pub(super) struct SlidingSyncListInner { /// The request generator, i.e. a type that yields the appropriate list /// request. See [`SlidingSyncListRequestGenerator`] to learn more. request_generator: StdRwLock, + + /// Cache policy for this list. + cache_policy: SlidingSyncListCachePolicy, } impl SlidingSyncListInner { diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index 0e9413b8892..5be20ab7ddd 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -222,6 +222,32 @@ impl SlidingSync { self.inner.lists.write().unwrap().insert(list.name().to_owned(), list) } + /// Add a list that will be cached and reloaded from the cache. + /// + /// This will raise an error if a [`storage_key()`] was not set, or if there was a I/O error + /// reading from the cache. + /// + /// The rest of the semantics is the same as [`Self::add_list`]. + pub async fn add_cached_list( + &self, + mut list: SlidingSyncListBuilder, + ) -> Result> { + let Some(ref storage_key) = self.inner.storage_key else { + return Err(error::Error::MissingStorageKeyForCaching.into()); + }; + let reloaded_rooms = list.set_cached_and_reload(&self.inner.client, storage_key).await?; + if !reloaded_rooms.is_empty() { + let mut rooms = self.inner.rooms.write().unwrap(); + for (key, frozen) in reloaded_rooms { + rooms.entry(key).or_insert_with(|| { + SlidingSyncRoom::from_frozen(frozen, self.inner.client.clone()) + }); + } + } + let list = list.build(); + Ok(self.add_list(list)) + } + /// Lookup a set of rooms pub fn get_rooms>( &self, From ee6fd1bd7483cbeafff144a8401246c6feadc084 Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Fri, 5 May 2023 17:12:22 +0200 Subject: [PATCH 03/15] test: usage of add_cached_list without a storage key is a hard error Signed-off-by: Benjamin Bouvier --- crates/matrix-sdk/src/sliding_sync/cache.rs | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/crates/matrix-sdk/src/sliding_sync/cache.rs b/crates/matrix-sdk/src/sliding_sync/cache.rs index 6c29042c5ee..62820cc742d 100644 --- a/crates/matrix-sdk/src/sliding_sync/cache.rs +++ b/crates/matrix-sdk/src/sliding_sync/cache.rs @@ -204,6 +204,27 @@ mod tests { use super::*; use crate::{Client, Result}; + #[test] + fn test_cant_cache_without_a_storage_key() -> Result<()> { + block_on(async { + let homeserver = Url::parse("https://foo.bar")?; + let client = Client::new(homeserver).await?; + let err = client + .sliding_sync() + .await + .add_cached_list(SlidingSyncList::builder("list_foo")) + .await + .unwrap_err(); + assert!(matches!( + err, + crate::Error::SlidingSync( + crate::sliding_sync::error::Error::MissingStorageKeyForCaching + ) + )); + Ok(()) + }) + } + #[allow(clippy::await_holding_lock)] #[test] fn test_sliding_sync_can_be_stored_and_restored() -> Result<()> { From 4762776b047fbba73fbd9c39c1e57f9e385f69f6 Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Fri, 5 May 2023 17:22:11 +0200 Subject: [PATCH 04/15] ffi: add bindings for `SlidingSync::add_cached_list` and `SlidingSyncBuilder::add_cached_list` Signed-off-by: Benjamin Bouvier --- bindings/matrix-sdk-ffi/src/sliding_sync.rs | 24 +++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/bindings/matrix-sdk-ffi/src/sliding_sync.rs b/bindings/matrix-sdk-ffi/src/sliding_sync.rs index 653af5a1477..e0974cd993b 100644 --- a/bindings/matrix-sdk-ffi/src/sliding_sync.rs +++ b/bindings/matrix-sdk-ffi/src/sliding_sync.rs @@ -741,6 +741,19 @@ impl SlidingSync { self.inner.add_list(list.inner.clone()).map(|inner| Arc::new(SlidingSyncList { inner })) } + pub fn add_cached_list( + &self, + list_builder: Arc, + ) -> Result>, ClientError> { + RUNTIME.block_on(async move { + Ok(self + .inner + .add_cached_list(list_builder.inner.clone()) + .await? + .map(|inner| Arc::new(SlidingSyncList { inner }))) + }) + } + pub fn remove_list(&self, name: String) -> Option> { self.inner.remove_list(&name).map(|inner| Arc::new(SlidingSyncList { inner })) } @@ -816,6 +829,17 @@ impl SlidingSyncBuilder { Arc::new(builder) } + pub fn add_cached_list( + self: Arc, + v: Arc, + ) -> Result, ClientError> { + let mut builder = unwrap_or_clone_arc(self); + let list_builder = unwrap_or_clone_arc(v); + builder.inner = RUNTIME + .block_on(async move { builder.inner.add_cached_list(list_builder.inner).await })?; + Ok(Arc::new(builder)) + } + pub fn with_common_extensions(self: Arc) -> Arc { let mut builder = unwrap_or_clone_arc(self); builder.inner = builder.inner.with_common_extensions(); From 11dfe40a6549de968c27f2b2025cfb1bb0195415 Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Fri, 5 May 2023 17:42:52 +0200 Subject: [PATCH 05/15] chore: *nightly* fmt Signed-off-by: Benjamin Bouvier --- crates/matrix-sdk/src/sliding_sync/builder.rs | 8 ++++---- crates/matrix-sdk/src/sliding_sync/cache.rs | 12 ++++++++---- crates/matrix-sdk/src/sliding_sync/error.rs | 3 ++- .../matrix-sdk/src/sliding_sync/list/builder.rs | 15 ++++++++------- crates/matrix-sdk/src/sliding_sync/list/mod.rs | 4 ++-- crates/matrix-sdk/src/sliding_sync/mod.rs | 4 ++-- 6 files changed, 26 insertions(+), 20 deletions(-) diff --git a/crates/matrix-sdk/src/sliding_sync/builder.rs b/crates/matrix-sdk/src/sliding_sync/builder.rs index f64d7c182d0..7549fbb2b2e 100644 --- a/crates/matrix-sdk/src/sliding_sync/builder.rs +++ b/crates/matrix-sdk/src/sliding_sync/builder.rs @@ -71,11 +71,11 @@ impl SlidingSyncBuilder { self } - /// Enroll the list in caching, reloads it from the cache if possible, and adds it to the list - /// of lists. + /// Enroll the list in caching, reloads it from the cache if possible, and + /// adds it to the list of lists. /// - /// This will raise an error if a [`storage_key()`] was not set, or if there was a I/O error - /// reading from the cache. + /// This will raise an error if a [`storage_key()`] was not set, or if there + /// was a I/O error reading from the cache. /// /// Replace any list with the same name. pub async fn add_cached_list(mut self, mut list: SlidingSyncListBuilder) -> Result { diff --git a/crates/matrix-sdk/src/sliding_sync/cache.rs b/crates/matrix-sdk/src/sliding_sync/cache.rs index 62820cc742d..696ec5f0e85 100644 --- a/crates/matrix-sdk/src/sliding_sync/cache.rs +++ b/crates/matrix-sdk/src/sliding_sync/cache.rs @@ -13,17 +13,20 @@ use tracing::{trace, warn}; use super::{FrozenSlidingSync, FrozenSlidingSyncList, SlidingSync, SlidingSyncList}; use crate::{sliding_sync::SlidingSyncListCachePolicy, Client, Result}; -/// Be careful: as this is used as a storage key; changing it requires migrating data! +/// Be careful: as this is used as a storage key; changing it requires migrating +/// data! fn format_storage_key_for_sliding_sync(storage_key: &str) -> String { format!("sliding_sync_store::{storage_key}") } -/// Be careful: as this is used as a storage key; changing it requires migrating data! +/// Be careful: as this is used as a storage key; changing it requires migrating +/// data! fn format_storage_key_for_sliding_sync_list(storage_key: &str, list_name: &str) -> String { format!("sliding_sync_store::{storage_key}::{list_name}") } -/// Invalidate a single [`SlidingSyncList`] cache entry by removing it from the cache. +/// Invalidate a single [`SlidingSyncList`] cache entry by removing it from the +/// cache. async fn invalidate_cached_list( storage: &dyn StateStore, storage_key: &str, @@ -33,7 +36,8 @@ async fn invalidate_cached_list( let _ = storage.remove_custom_value(storage_key_for_list.as_bytes()).await; } -/// Clean the storage for everything related to `SlidingSync` and all known lists. +/// Clean the storage for everything related to `SlidingSync` and all known +/// lists. async fn clean_storage( client: &Client, storage_key: &str, diff --git a/crates/matrix-sdk/src/sliding_sync/error.rs b/crates/matrix-sdk/src/sliding_sync/error.rs index d9e6ba48176..582b52e056f 100644 --- a/crates/matrix-sdk/src/sliding_sync/error.rs +++ b/crates/matrix-sdk/src/sliding_sync/error.rs @@ -33,7 +33,8 @@ pub enum Error { end: u32, }, - /// Missing storage key when asking to deserialize some sub-state of sliding sync. + /// Missing storage key when asking to deserialize some sub-state of sliding + /// sync. #[error("A caching request was made but a storage key is missing in sliding sync")] MissingStorageKeyForCaching, } diff --git a/crates/matrix-sdk/src/sliding_sync/list/builder.rs b/crates/matrix-sdk/src/sliding_sync/list/builder.rs index aad0393d92e..9f354484d9e 100644 --- a/crates/matrix-sdk/src/sliding_sync/list/builder.rs +++ b/crates/matrix-sdk/src/sliding_sync/list/builder.rs @@ -10,15 +10,14 @@ use eyeball::unique::Observable; use eyeball_im::{ObservableVector, Vector}; use ruma::{api::client::sync::sync_events::v4, events::StateEventType, OwnedRoomId, UInt}; -use crate::{ - sliding_sync::{cache::restore_sliding_sync_list, FrozenSlidingSyncRoom}, - Client, RoomListEntry, -}; - use super::{ SlidingSyncList, SlidingSyncListCachePolicy, SlidingSyncListInner, SlidingSyncListRequestGenerator, SlidingSyncMode, SlidingSyncState, }; +use crate::{ + sliding_sync::{cache::restore_sliding_sync_list, FrozenSlidingSyncRoom}, + Client, RoomListEntry, +}; /// The default name for the full sync list. pub const FULL_SYNC_LIST_NAME: &str = "full-sync"; @@ -48,7 +47,8 @@ pub struct SlidingSyncListBuilder { /// May be reloaded from the cache. room_list: Vector, - /// State of this list, indicating whether it was preloaded from the cache or not. + /// State of this list, indicating whether it was preloaded from the cache + /// or not. state: SlidingSyncState, } @@ -157,7 +157,8 @@ impl SlidingSyncListBuilder { self } - /// Marks this list as sync'd from the cache, and attempts to reload it from storage. + /// Marks this list as sync'd from the cache, and attempts to reload it from + /// storage. pub(in super::super) async fn set_cached_and_reload( &mut self, client: &Client, diff --git a/crates/matrix-sdk/src/sliding_sync/list/mod.rs b/crates/matrix-sdk/src/sliding_sync/list/mod.rs index a100e7da464..18989148d10 100644 --- a/crates/matrix-sdk/src/sliding_sync/list/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/list/mod.rs @@ -26,8 +26,8 @@ use tracing::{instrument, warn}; use super::{Error, FrozenSlidingSyncRoom, SlidingSyncRoom}; use crate::Result; -/// Should this [`SlidingSyncList`] be stored in the cache, and automatically reloaded from the -/// cache upon creation? +/// Should this [`SlidingSyncList`] be stored in the cache, and automatically +/// reloaded from the cache upon creation? #[derive(Clone, Copy, Debug)] pub(crate) enum SlidingSyncListCachePolicy { /// Store and load this list from the cache. diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index 5be20ab7ddd..94256e4c856 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -224,8 +224,8 @@ impl SlidingSync { /// Add a list that will be cached and reloaded from the cache. /// - /// This will raise an error if a [`storage_key()`] was not set, or if there was a I/O error - /// reading from the cache. + /// This will raise an error if a [`storage_key()`] was not set, or if there + /// was a I/O error reading from the cache. /// /// The rest of the semantics is the same as [`Self::add_list`]. pub async fn add_cached_list( From 83e79c584c39d1563d8f5c87fb29f28f8411a227 Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Tue, 9 May 2023 15:17:12 +0200 Subject: [PATCH 06/15] review feedback Signed-off-by: Benjamin Bouvier --- crates/matrix-sdk/src/sliding_sync/builder.rs | 3 + crates/matrix-sdk/src/sliding_sync/cache.rs | 2 +- .../src/sliding_sync/list/builder.rs | 67 ++++++++++++------- 3 files changed, 47 insertions(+), 25 deletions(-) diff --git a/crates/matrix-sdk/src/sliding_sync/builder.rs b/crates/matrix-sdk/src/sliding_sync/builder.rs index c2e7f36ee86..b92bec08e91 100644 --- a/crates/matrix-sdk/src/sliding_sync/builder.rs +++ b/crates/matrix-sdk/src/sliding_sync/builder.rs @@ -83,12 +83,15 @@ impl SlidingSyncBuilder { let Some(ref storage_key) = self.storage_key else { return Err(super::error::Error::MissingStorageKeyForCaching.into()); }; + let reloaded_rooms = list.set_cached_and_reload(&self.client, storage_key).await?; + for (key, frozen) in reloaded_rooms { self.rooms .entry(key) .or_insert_with(|| SlidingSyncRoom::from_frozen(frozen, self.client.clone())); } + Ok(self.add_list(list)) } diff --git a/crates/matrix-sdk/src/sliding_sync/cache.rs b/crates/matrix-sdk/src/sliding_sync/cache.rs index 64094c5c22f..08348d1ee76 100644 --- a/crates/matrix-sdk/src/sliding_sync/cache.rs +++ b/crates/matrix-sdk/src/sliding_sync/cache.rs @@ -209,7 +209,7 @@ mod tests { use crate::{Client, Result}; #[test] - fn test_cant_cache_without_a_storage_key() -> Result<()> { + fn test_cannot_cache_without_a_storage_key() -> Result<()> { block_on(async { let homeserver = Url::parse("https://foo.bar")?; let client = Client::new(homeserver).await?; diff --git a/crates/matrix-sdk/src/sliding_sync/list/builder.rs b/crates/matrix-sdk/src/sliding_sync/list/builder.rs index 1abeb270bf9..a37a400a308 100644 --- a/crates/matrix-sdk/src/sliding_sync/list/builder.rs +++ b/crates/matrix-sdk/src/sliding_sync/list/builder.rs @@ -26,6 +26,19 @@ use super::{ /// The default name for the full sync list. pub const FULL_SYNC_LIST_NAME: &str = "full-sync"; +/// Data that might have been read from the cache. +#[derive(Clone)] +struct SlidingSyncCachedData { + /// Total number of rooms that is possible to interact with the given list. + /// See also comment of [`SlidingSyncList::maximum_number_of_rooms`]. + /// May be reloaded from the cache. + maximum_number_of_rooms: Option, + + /// List of room entries. + /// May be reloaded from the cache. + room_list: Vector, +} + /// Builder for [`SlidingSyncList`]. #[derive(Clone)] pub struct SlidingSyncListBuilder { @@ -42,18 +55,9 @@ pub struct SlidingSyncListBuilder { /// Should this list be cached and reloaded from the cache? cache_policy: SlidingSyncListCachePolicy, - /// Total number of rooms that is possible to interact with the given list. - /// See also comment of [`SlidingSyncList::maximum_number_of_rooms`]. - /// May be reloaded from the cache. - maximum_number_of_rooms: Option, - - /// List of room entries. - /// May be reloaded from the cache. - room_list: Vector, - - /// State of this list, indicating whether it was preloaded from the cache - /// or not. - state: SlidingSyncState, + /// If set, temporary data that's been read from the cache, reloaded from a + /// `FrozenSlidingSyncList`. + reloaded_cached_data: Option, once_built: Arc SlidingSyncList + Send + Sync>>, } @@ -94,10 +98,8 @@ impl SlidingSyncListBuilder { timeline_limit: None, name: name.into(), ranges: Vec::new(), + reloaded_cached_data: None, cache_policy: SlidingSyncListCachePolicy::Disabled, - maximum_number_of_rooms: None, - room_list: Vector::new(), - state: SlidingSyncState::default(), once_built: Arc::new(Box::new(identity)), } } @@ -196,6 +198,9 @@ impl SlidingSyncListBuilder { /// Marks this list as sync'd from the cache, and attempts to reload it from /// storage. + /// + /// Returns a mapping of the room's data read from the cache, to be incorporated into the + /// `SlidingSync` bookkeepping. pub(in super::super) async fn set_cached_and_reload( &mut self, client: &Client, @@ -205,10 +210,14 @@ impl SlidingSyncListBuilder { if let Some(frozen_list) = restore_sliding_sync_list(client.store(), storage_key, &self.name).await? { - self.maximum_number_of_rooms = frozen_list.maximum_number_of_rooms; - assert!(self.room_list.is_empty(), "can't call `set_cached_and_reload` twice"); - self.room_list = frozen_list.room_list; - self.state = SlidingSyncState::Preloaded; + assert!( + self.reloaded_cached_data.is_none(), + "can't call `set_cached_and_reload` twice" + ); + self.reloaded_cached_data = Some(SlidingSyncCachedData { + maximum_number_of_rooms: frozen_list.maximum_number_of_rooms, + room_list: frozen_list.room_list, + }); Ok(frozen_list.rooms) } else { Ok(Default::default()) @@ -234,6 +243,18 @@ impl SlidingSyncListBuilder { SlidingSyncMode::Selective => SlidingSyncListRequestGenerator::new_selective(), }; + // Acknowledge data that's been reloaded from the cache, or use default values. + let (state, maximum_number_of_rooms, room_list) = + if let Some(cached_data) = self.reloaded_cached_data { + ( + SlidingSyncState::Preloaded, + cached_data.maximum_number_of_rooms, + cached_data.room_list, + ) + } else { + (SlidingSyncState::default(), Default::default(), Default::default()) + }; + let list = SlidingSyncList { inner: Arc::new(SlidingSyncListInner { // From the builder @@ -251,11 +272,9 @@ impl SlidingSyncListBuilder { // Values read from deserialization, or that are still equal to the default values // otherwise. - state: StdRwLock::new(Observable::new(self.state)), - maximum_number_of_rooms: StdRwLock::new(Observable::new( - self.maximum_number_of_rooms, - )), - room_list: StdRwLock::new(ObservableVector::from(self.room_list)), + state: StdRwLock::new(Observable::new(state)), + maximum_number_of_rooms: StdRwLock::new(Observable::new(maximum_number_of_rooms)), + room_list: StdRwLock::new(ObservableVector::from(room_list)), sliding_sync_internal_channel_sender, }), From c30a47c59712fe72f38e5ae5e9e26dd304e577a9 Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Tue, 9 May 2023 15:27:37 +0200 Subject: [PATCH 07/15] chore: Remove unnecessary &mut Signed-off-by: Benjamin Bouvier --- crates/matrix-sdk/src/sliding_sync/builder.rs | 2 +- crates/matrix-sdk/src/sliding_sync/cache.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/matrix-sdk/src/sliding_sync/builder.rs b/crates/matrix-sdk/src/sliding_sync/builder.rs index b92bec08e91..05bb32451be 100644 --- a/crates/matrix-sdk/src/sliding_sync/builder.rs +++ b/crates/matrix-sdk/src/sliding_sync/builder.rs @@ -245,7 +245,7 @@ impl SlidingSyncBuilder { restore_sliding_sync_state( &client, storage_key, - &mut lists, + &lists, &mut delta_token, &mut self.extensions, ) diff --git a/crates/matrix-sdk/src/sliding_sync/cache.rs b/crates/matrix-sdk/src/sliding_sync/cache.rs index 08348d1ee76..49230276829 100644 --- a/crates/matrix-sdk/src/sliding_sync/cache.rs +++ b/crates/matrix-sdk/src/sliding_sync/cache.rs @@ -150,7 +150,7 @@ pub(super) async fn restore_sliding_sync_list( pub(super) async fn restore_sliding_sync_state( client: &Client, storage_key: &str, - lists: &mut BTreeMap, + lists: &BTreeMap, delta_token: &mut Option, extensions: &mut Option, ) -> Result<()> { From d72f658877ae9df9696a89ef70e2075f9e838209 Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Tue, 9 May 2023 15:42:20 +0200 Subject: [PATCH 08/15] nit: use `BTreeMap::values()` instead of `iter()` and ignore the key Signed-off-by: Benjamin Bouvier --- crates/matrix-sdk/src/sliding_sync/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index 4e7855a3691..a0f810d2a6e 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -657,7 +657,7 @@ impl SlidingSync { pub fn reset_lists(&self) -> Result<(), Error> { let lists = self.inner.lists.read().unwrap(); - for (_, list) in lists.iter() { + for list in lists.values() { list.reset()?; } From 46c05c7f4879b94eee54a7eee8b624c4361aad4d Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Tue, 9 May 2023 15:45:48 +0200 Subject: [PATCH 09/15] fmt nightly :( Signed-off-by: Benjamin Bouvier --- crates/matrix-sdk/src/sliding_sync/list/builder.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/crates/matrix-sdk/src/sliding_sync/list/builder.rs b/crates/matrix-sdk/src/sliding_sync/list/builder.rs index a37a400a308..cd724aa14f2 100644 --- a/crates/matrix-sdk/src/sliding_sync/list/builder.rs +++ b/crates/matrix-sdk/src/sliding_sync/list/builder.rs @@ -13,15 +13,14 @@ use imbl::Vector; use ruma::{api::client::sync::sync_events::v4, events::StateEventType, OwnedRoomId, UInt}; use tokio::sync::mpsc::Sender; -use crate::{ - sliding_sync::{cache::restore_sliding_sync_list, FrozenSlidingSyncRoom}, - Client, RoomListEntry, -}; - use super::{ super::SlidingSyncInternalMessage, SlidingSyncList, SlidingSyncListCachePolicy, SlidingSyncListInner, SlidingSyncListRequestGenerator, SlidingSyncMode, SlidingSyncState, }; +use crate::{ + sliding_sync::{cache::restore_sliding_sync_list, FrozenSlidingSyncRoom}, + Client, RoomListEntry, +}; /// The default name for the full sync list. pub const FULL_SYNC_LIST_NAME: &str = "full-sync"; @@ -199,8 +198,8 @@ impl SlidingSyncListBuilder { /// Marks this list as sync'd from the cache, and attempts to reload it from /// storage. /// - /// Returns a mapping of the room's data read from the cache, to be incorporated into the - /// `SlidingSync` bookkeepping. + /// Returns a mapping of the room's data read from the cache, to be + /// incorporated into the `SlidingSync` bookkeepping. pub(in super::super) async fn set_cached_and_reload( &mut self, client: &Client, From a5bf2c2c1fe4a2c644c3b5891ce2e0192256c00c Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Mon, 15 May 2023 12:21:08 +0200 Subject: [PATCH 10/15] fmt + doc Signed-off-by: Benjamin Bouvier --- crates/matrix-sdk/src/sliding_sync/builder.rs | 4 ++-- crates/matrix-sdk/src/sliding_sync/list/builder.rs | 9 ++++----- crates/matrix-sdk/src/sliding_sync/mod.rs | 2 +- 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/crates/matrix-sdk/src/sliding_sync/builder.rs b/crates/matrix-sdk/src/sliding_sync/builder.rs index 05bb32451be..9d27c3b831b 100644 --- a/crates/matrix-sdk/src/sliding_sync/builder.rs +++ b/crates/matrix-sdk/src/sliding_sync/builder.rs @@ -75,8 +75,8 @@ impl SlidingSyncBuilder { /// Enroll the list in caching, reloads it from the cache if possible, and /// adds it to the list of lists. /// - /// This will raise an error if a [`storage_key()`] was not set, or if there - /// was a I/O error reading from the cache. + /// This will raise an error if a [`Self::storage_key()`] was not set, or if + /// there was a I/O error reading from the cache. /// /// Replace any list with the same name. pub async fn add_cached_list(mut self, mut list: SlidingSyncListBuilder) -> Result { diff --git a/crates/matrix-sdk/src/sliding_sync/list/builder.rs b/crates/matrix-sdk/src/sliding_sync/list/builder.rs index 4b52e1d7738..babdfef44a5 100644 --- a/crates/matrix-sdk/src/sliding_sync/list/builder.rs +++ b/crates/matrix-sdk/src/sliding_sync/list/builder.rs @@ -14,15 +14,14 @@ use imbl::Vector; use ruma::{api::client::sync::sync_events::v4, events::StateEventType, OwnedRoomId}; use tokio::sync::mpsc::Sender; -use crate::{ - sliding_sync::{cache::restore_sliding_sync_list, FrozenSlidingSyncRoom}, - Client, RoomListEntry, -}; - use super::{ super::SlidingSyncInternalMessage, Bound, SlidingSyncList, SlidingSyncListCachePolicy, SlidingSyncListInner, SlidingSyncListRequestGenerator, SlidingSyncMode, SlidingSyncState, }; +use crate::{ + sliding_sync::{cache::restore_sliding_sync_list, FrozenSlidingSyncRoom}, + Client, RoomListEntry, +}; /// The default name for the full sync list. pub const FULL_SYNC_LIST_NAME: &str = "full-sync"; diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index d0320bed140..bfc46613fc1 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -237,7 +237,7 @@ impl SlidingSync { /// Add a list that will be cached and reloaded from the cache. /// - /// This will raise an error if a [`storage_key()`] was not set, or if there + /// This will raise an error if a storage key was not set, or if there /// was a I/O error reading from the cache. /// /// The rest of the semantics is the same as [`Self::add_list`]. From a13efac1054a6fbfcf4069ac340659d1bd8af95d Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Mon, 15 May 2023 12:35:02 +0200 Subject: [PATCH 11/15] Update crates/matrix-sdk/src/sliding_sync/builder.rs Co-authored-by: Jonas Platte --- crates/matrix-sdk/src/sliding_sync/builder.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/matrix-sdk/src/sliding_sync/builder.rs b/crates/matrix-sdk/src/sliding_sync/builder.rs index 9d27c3b831b..5a17dce01d3 100644 --- a/crates/matrix-sdk/src/sliding_sync/builder.rs +++ b/crates/matrix-sdk/src/sliding_sync/builder.rs @@ -75,7 +75,7 @@ impl SlidingSyncBuilder { /// Enroll the list in caching, reloads it from the cache if possible, and /// adds it to the list of lists. /// - /// This will raise an error if a [`Self::storage_key()`] was not set, or if + /// This will raise an error if a [`storage_key()`][Self::storage_key] was not set, or if /// there was a I/O error reading from the cache. /// /// Replace any list with the same name. From 7795793c3825992ce9fa8191cffe0f9931c808c6 Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Mon, 15 May 2023 12:38:17 +0200 Subject: [PATCH 12/15] Fix formatting from code suggestion Signed-off-by: Benjamin Bouvier --- crates/matrix-sdk/src/sliding_sync/builder.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/matrix-sdk/src/sliding_sync/builder.rs b/crates/matrix-sdk/src/sliding_sync/builder.rs index 5a17dce01d3..5ec61b96ffd 100644 --- a/crates/matrix-sdk/src/sliding_sync/builder.rs +++ b/crates/matrix-sdk/src/sliding_sync/builder.rs @@ -75,8 +75,8 @@ impl SlidingSyncBuilder { /// Enroll the list in caching, reloads it from the cache if possible, and /// adds it to the list of lists. /// - /// This will raise an error if a [`storage_key()`][Self::storage_key] was not set, or if - /// there was a I/O error reading from the cache. + /// This will raise an error if a [`storage_key()`][Self::storage_key] was + /// not set, or if there was a I/O error reading from the cache. /// /// Replace any list with the same name. pub async fn add_cached_list(mut self, mut list: SlidingSyncListBuilder) -> Result { From b52e5d0ed8cc71ea513603bd2ea808e8465c7b4f Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Mon, 15 May 2023 13:07:04 +0200 Subject: [PATCH 13/15] fix: Dispatch values reloaded from the cache into the streams too. Signed-off-by: Benjamin Bouvier --- bindings/matrix-sdk-ffi/src/sliding_sync.rs | 4 +- .../src/sliding_sync/list/builder.rs | 52 ++++++++++++------- .../matrix-sdk/src/sliding_sync/list/mod.rs | 20 ++++++- 3 files changed, 54 insertions(+), 22 deletions(-) diff --git a/bindings/matrix-sdk-ffi/src/sliding_sync.rs b/bindings/matrix-sdk-ffi/src/sliding_sync.rs index 674c5152319..0e1a9d1e774 100644 --- a/bindings/matrix-sdk-ffi/src/sliding_sync.rs +++ b/bindings/matrix-sdk-ffi/src/sliding_sync.rs @@ -830,10 +830,10 @@ impl SlidingSyncBuilder { pub fn add_cached_list( self: Arc, - v: Arc, + list_builder: Arc, ) -> Result, ClientError> { let mut builder = unwrap_or_clone_arc(self); - let list_builder = unwrap_or_clone_arc(v); + let list_builder = unwrap_or_clone_arc(list_builder); builder.inner = RUNTIME .block_on(async move { builder.inner.add_cached_list(list_builder.inner).await })?; Ok(Arc::new(builder)) diff --git a/crates/matrix-sdk/src/sliding_sync/list/builder.rs b/crates/matrix-sdk/src/sliding_sync/list/builder.rs index babdfef44a5..f50c21b7e11 100644 --- a/crates/matrix-sdk/src/sliding_sync/list/builder.rs +++ b/crates/matrix-sdk/src/sliding_sync/list/builder.rs @@ -28,7 +28,7 @@ pub const FULL_SYNC_LIST_NAME: &str = "full-sync"; /// Data that might have been read from the cache. #[derive(Clone)] -struct SlidingSyncCachedData { +struct SlidingSyncListCachedData { /// Total number of rooms that is possible to interact with the given list. /// See also comment of [`SlidingSyncList::maximum_number_of_rooms`]. /// May be reloaded from the cache. @@ -57,7 +57,7 @@ pub struct SlidingSyncListBuilder { /// If set, temporary data that's been read from the cache, reloaded from a /// `FrozenSlidingSyncList`. - reloaded_cached_data: Option, + reloaded_cached_data: Option, once_built: Arc SlidingSyncList + Send + Sync>>, } @@ -214,7 +214,7 @@ impl SlidingSyncListBuilder { self.reloaded_cached_data.is_none(), "can't call `set_cached_and_reload` twice" ); - self.reloaded_cached_data = Some(SlidingSyncCachedData { + self.reloaded_cached_data = Some(SlidingSyncListCachedData { maximum_number_of_rooms: frozen_list.maximum_number_of_rooms, room_list: frozen_list.room_list, }); @@ -243,18 +243,6 @@ impl SlidingSyncListBuilder { SlidingSyncMode::Selective => SlidingSyncListRequestGenerator::new_selective(), }; - // Acknowledge data that's been reloaded from the cache, or use default values. - let (state, maximum_number_of_rooms, room_list) = - if let Some(cached_data) = self.reloaded_cached_data { - ( - SlidingSyncState::Preloaded, - cached_data.maximum_number_of_rooms, - cached_data.room_list, - ) - } else { - (SlidingSyncState::default(), Default::default(), Default::default()) - }; - let list = SlidingSyncList { inner: Arc::new(SlidingSyncListInner { // From the builder @@ -272,9 +260,9 @@ impl SlidingSyncListBuilder { // Values read from deserialization, or that are still equal to the default values // otherwise. - state: StdRwLock::new(Observable::new(state)), - maximum_number_of_rooms: StdRwLock::new(Observable::new(maximum_number_of_rooms)), - room_list: StdRwLock::new(ObservableVector::from(room_list)), + state: StdRwLock::new(Observable::new(Default::default())), + maximum_number_of_rooms: StdRwLock::new(Observable::new(None)), + room_list: StdRwLock::new(ObservableVector::from(Vector::new())), sliding_sync_internal_channel_sender, }), @@ -282,6 +270,32 @@ impl SlidingSyncListBuilder { let once_built = self.once_built; - once_built(list) + let list = once_built(list); + + // If we reloaded from the cache, update values in the list here. + // + // Note about ordering: because of the contract with the observables, the + // initial values, if filled, have to be observable in the `once_built` + // callback. That's why we're doing this here *after* constructing the + // list, and not a few lines above. + + if let Some(SlidingSyncListCachedData { maximum_number_of_rooms, room_list }) = + self.reloaded_cached_data + { + // Mark state as preloaded. + Observable::set(&mut list.inner.state.write().unwrap(), SlidingSyncState::Preloaded); + + // Reload values. + Observable::set( + &mut list.inner.maximum_number_of_rooms.write().unwrap(), + maximum_number_of_rooms, + ); + + let mut prev_room_list = list.inner.room_list.write().unwrap(); + assert!(prev_room_list.is_empty(), "room list was empty on creation above!"); + prev_room_list.append(room_list); + } + + list } } diff --git a/crates/matrix-sdk/src/sliding_sync/list/mod.rs b/crates/matrix-sdk/src/sliding_sync/list/mod.rs index c1bb68624a0..28f8be16694 100644 --- a/crates/matrix-sdk/src/sliding_sync/list/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/list/mod.rs @@ -118,7 +118,13 @@ impl SlidingSyncList { self.inner.state.read().unwrap().clone() } - /// Get a stream of state. + /// Get a stream of state updates. + /// + /// If this list has been reloaded from a cache, the initial value read from + /// the cache will be published. + /// + /// There's no guarantee of ordering between items emitted by this stream + /// and those emitted by other streams exposed on this structure. pub fn state_stream(&self) -> impl Stream { Observable::subscribe(&self.inner.state.read().unwrap()) } @@ -142,6 +148,12 @@ impl SlidingSyncList { } /// Get a stream of room list. + /// + /// If this list has been reloaded from a cache, the initial value read from + /// the cache will be published. + /// + /// There's no guarantee of ordering between items emitted by this stream + /// and those emitted by other streams exposed on this structure. pub fn room_list_stream(&self) -> impl Stream> { ObservableVector::subscribe(&self.inner.room_list.read().unwrap()) } @@ -153,6 +165,12 @@ impl SlidingSyncList { } /// Get a stream of rooms count. + /// + /// If this list has been reloaded from a cache, the initial value is + /// published too. + /// + /// There's no guarantee of ordering between items emitted by this stream + /// and those emitted by other streams exposed on this structure. pub fn maximum_number_of_rooms_stream(&self) -> impl Stream> { Observable::subscribe(&self.inner.maximum_number_of_rooms.read().unwrap()) } From 5c8ad02d8cb98fef882e9654674d8818bb388ff9 Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Mon, 15 May 2023 15:33:22 +0200 Subject: [PATCH 14/15] test: sliding sync list fields reloaded from the cache are observable in streams Signed-off-by: Benjamin Bouvier --- crates/matrix-sdk/src/sliding_sync/cache.rs | 27 ++++++++++++++++++- .../src/sliding_sync/list/builder.rs | 6 ++++- 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/crates/matrix-sdk/src/sliding_sync/cache.rs b/crates/matrix-sdk/src/sliding_sync/cache.rs index 49230276829..b5cf5fbbe95 100644 --- a/crates/matrix-sdk/src/sliding_sync/cache.rs +++ b/crates/matrix-sdk/src/sliding_sync/cache.rs @@ -202,7 +202,10 @@ pub(super) async fn restore_sliding_sync_state( #[cfg(test)] mod tests { + use std::sync::{Arc, RwLock}; + use futures::executor::block_on; + use futures_util::StreamExt; use url::Url; use super::*; @@ -307,11 +310,20 @@ mod tests { // Create a new `SlidingSync`, and it should be read from the cache. { + let max_number_of_room_stream = Arc::new(RwLock::new(None)); + let cloned_stream = max_number_of_room_stream.clone(); let sliding_sync = client .sliding_sync() .await .storage_key(Some("hello".to_owned())) - .add_cached_list(SlidingSyncList::builder("list_foo")) + .add_cached_list(SlidingSyncList::builder("list_foo").once_built(move |list| { + // In the `once_built()` handler, nothing has been read from the cache yet. + assert_eq!(list.maximum_number_of_rooms(), None); + + let mut stream = cloned_stream.write().unwrap(); + *stream = Some(list.maximum_number_of_rooms_stream()); + list + })) .await? .add_list(SlidingSyncList::builder("list_bar")) .build() @@ -330,6 +342,19 @@ mod tests { assert_eq!(list_bar.maximum_number_of_rooms(), None); } + // The maximum number of rooms reloaded from the cache should have been + // published. + { + let mut stream = max_number_of_room_stream + .write() + .unwrap() + .take() + .expect("stream must be set"); + let initial_max_number_of_rooms = + stream.next().await.expect("stream must have emitted something"); + assert_eq!(initial_max_number_of_rooms, Some(42)); + } + // Clean the cache. clean_storage(&client, "hello", &sliding_sync.inner.lists.read().unwrap()).await; } diff --git a/crates/matrix-sdk/src/sliding_sync/list/builder.rs b/crates/matrix-sdk/src/sliding_sync/list/builder.rs index f50c21b7e11..8ff49022962 100644 --- a/crates/matrix-sdk/src/sliding_sync/list/builder.rs +++ b/crates/matrix-sdk/src/sliding_sync/list/builder.rs @@ -104,7 +104,11 @@ impl SlidingSyncListBuilder { } } - /// foo + /// Runs a callback once the list has been built. + /// + /// If the list was cached, then the cached fields won't be available in + /// this callback. Use the streams to get published versions of the + /// cached fields, once they've been set. pub fn once_built(mut self, callback: C) -> Self where C: Fn(SlidingSyncList) -> SlidingSyncList + Send + Sync + 'static, From fd3c45672e970ca23b2395dd316f47832e1bf020 Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Mon, 15 May 2023 17:59:24 +0200 Subject: [PATCH 15/15] merge conflict oh well Signed-off-by: Benjamin Bouvier --- crates/matrix-sdk/src/sliding_sync/cache.rs | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/crates/matrix-sdk/src/sliding_sync/cache.rs b/crates/matrix-sdk/src/sliding_sync/cache.rs index 9b04aee0ec1..b5cf5fbbe95 100644 --- a/crates/matrix-sdk/src/sliding_sync/cache.rs +++ b/crates/matrix-sdk/src/sliding_sync/cache.rs @@ -355,19 +355,6 @@ mod tests { assert_eq!(initial_max_number_of_rooms, Some(42)); } - // The maximum number of rooms reloaded from the cache should have been - // published. - { - let mut stream = max_number_of_room_stream - .write() - .unwrap() - .take() - .expect("stream must be set"); - let initial_max_number_of_rooms = - stream.next().await.expect("stream must have emitted something"); - assert_eq!(initial_max_number_of_rooms, Some(42)); - } - // Clean the cache. clean_storage(&client, "hello", &sliding_sync.inner.lists.read().unwrap()).await; }