Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixup on https://github.com/matrix-org/matrix-rust-sdk/pull/3995 #2

Open
wants to merge 14 commits into
base: recovering
Choose a base branch
from
31 changes: 17 additions & 14 deletions crates/matrix-sdk-ui/src/room_list_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ mod state;
use std::{sync::Arc, time::Duration};

use async_stream::stream;
use eyeball::{SharedObservable, Subscriber};
use eyeball::Subscriber;
use futures_util::{pin_mut, Stream, StreamExt};
use matrix_sdk::{
event_cache::EventCacheError, Client, Error as SlidingSyncError, SlidingSync, SlidingSyncList,
Expand Down Expand Up @@ -89,7 +89,7 @@ pub struct RoomListService {
/// The current state of the `RoomListService`.
///
/// `RoomListService` is a simple state-machine.
state: SharedObservable<State>,
state_machine: StateMachine,
}

impl RoomListService {
Expand Down Expand Up @@ -172,7 +172,7 @@ impl RoomListService {
// Eagerly subscribe the event cache to sync responses.
client.event_cache().subscribe()?;

Ok(Self { client, sliding_sync, state: SharedObservable::new(State::Init) })
Ok(Self { client, sliding_sync, state_machine: StateMachine::new() })
}

/// Start to sync the room list.
Expand Down Expand Up @@ -208,7 +208,7 @@ impl RoomListService {
debug!("Run a sync iteration");

// Calculate the next state, and run the associated actions.
let next_state = self.state.get().next(&self.sliding_sync).await?;
let next_state = self.state_machine.next(&self.sliding_sync).await?;

// Do the sync.
match sync.next().await {
Expand All @@ -217,7 +217,7 @@ impl RoomListService {
debug!(state = ?next_state, "New state");

// Update the state.
self.state.set(next_state);
self.state_machine.set(next_state);

yield Ok(());
}
Expand All @@ -227,7 +227,7 @@ impl RoomListService {
debug!(expected_state = ?next_state, "New state is an error");

let next_state = State::Error { from: Box::new(next_state) };
self.state.set(next_state);
self.state_machine.set(next_state);

yield Err(Error::SlidingSync(error));

Expand All @@ -239,7 +239,7 @@ impl RoomListService {
debug!(expected_state = ?next_state, "New state is a termination");

let next_state = State::Terminated { from: Box::new(next_state) };
self.state.set(next_state);
self.state_machine.set(next_state);

break;
}
Expand Down Expand Up @@ -286,8 +286,8 @@ impl RoomListService {
// when the session is forced to expire, the state remains `Terminated`, thus
// the actions aren't executed as expected. Consequently, let's update the
// state.
if let State::Terminated { from } = self.state.get() {
self.state.set(State::Error { from });
if let State::Terminated { from } = self.state_machine.get() {
self.state_machine.set(State::Error { from });
}
}

Expand Down Expand Up @@ -341,7 +341,7 @@ impl RoomListService {
// Update the `current_state`.
current_state = next_state;
} else {
// Something is broken with `self.state`. Let's stop this stream too.
// Something is broken with the state. Let's stop this stream too.
break;
}
}
Expand All @@ -355,7 +355,7 @@ impl RoomListService {

/// Get a subscriber to the state.
pub fn state(&self) -> Subscriber<State> {
self.state.subscribe()
self.state_machine.subscribe()
}

async fn list_for(&self, sliding_sync_list_name: &str) -> Result<RoomList, Error> {
Expand Down Expand Up @@ -396,7 +396,7 @@ impl RoomListService {
settings.required_state.push((StateEventType::RoomCreate, "".to_owned()));
}

let cancel_in_flight_request = match self.state.get() {
let cancel_in_flight_request = match self.state_machine.get() {
State::Init | State::Recovering | State::Error { .. } | State::Terminated { .. } => {
false
}
Expand Down Expand Up @@ -617,13 +617,16 @@ mod tests {
let _ = sync.next().await;

// State is `Terminated`, as expected!
assert_eq!(room_list.state.get(), State::Terminated { from: Box::new(State::Running) });
assert_eq!(
room_list.state_machine.get(),
State::Terminated { from: Box::new(State::Running) }
);

// Now, let's make the sliding sync session to expire.
room_list.expire_sync_session().await;

// State is `Error`, as a regular session expiration would generate!
assert_eq!(room_list.state.get(), State::Error { from: Box::new(State::Running) });
assert_eq!(room_list.state_machine.get(), State::Error { from: Box::new(State::Running) });

Ok(())
}
Expand Down
Loading