Skip to content

Commit

Permalink
Merge branch 'main' into reload-list-from-cache2
Browse files Browse the repository at this point in the history
Signed-off-by: Benjamin Bouvier <[email protected]>
  • Loading branch information
bnjbvr committed May 15, 2023
2 parents 5c8ad02 + c404e37 commit ea6c962
Show file tree
Hide file tree
Showing 11 changed files with 228 additions and 27 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion benchmarks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ publish = false

[dependencies]
criterion = { version = "0.4.0", features = ["async", "async_tokio", "html_reports"] }
matrix-sdk-base = { path = "../crates/matrix-sdk-base" }
matrix-sdk-crypto = { path = "../crates/matrix-sdk-crypto", version = "0.6.0"}
matrix-sdk-sqlite = { path = "../crates/matrix-sdk-sqlite", version = "0.1.0", default-features = false, features = ["crypto-store"] }
matrix-sdk-sled = { path = "../crates/matrix-sdk-sled", version = "0.2.0", default-features = false, features = ["crypto-store"] }
matrix-sdk-sled = { path = "../crates/matrix-sdk-sled", version = "0.2.0", features = ["crypto-store"] }
matrix-sdk-test = { path = "../testing/matrix-sdk-test", version = "0.6.0"}
matrix-sdk = { path = "../crates/matrix-sdk" }
ruma = { workspace = true }
serde_json = { workspace = true }
tempfile = "3.3.0"
Expand All @@ -24,3 +26,7 @@ pprof = { version = "0.11.0", features = ["flamegraph", "criterion"] }
[[bench]]
name = "crypto_bench"
harness = false

[[bench]]
name = "store_bench"
harness = false
149 changes: 149 additions & 0 deletions benchmarks/benches/store_bench.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
use criterion::*;
use matrix_sdk::{config::StoreConfig, Client, RoomInfo, RoomState, Session, StateChanges};
use matrix_sdk_base::{store::MemoryStore, StateStore as _};
use matrix_sdk_sled::SledStateStore;
use matrix_sdk_sqlite::SqliteStateStore;
use ruma::{device_id, user_id, RoomId};
use tokio::runtime::Builder;

fn criterion() -> Criterion {
#[cfg(target_os = "linux")]
let criterion = Criterion::default().with_profiler(pprof::criterion::PProfProfiler::new(
100,
pprof::criterion::Output::Flamegraph(None),
));

#[cfg(not(target_os = "linux"))]
let criterion = Criterion::default();

criterion
}

/// Number of joined rooms in the benchmark.
const NUM_JOINED_ROOMS: usize = 10000;

/// Number of stripped rooms in the benchmark.
const NUM_STRIPPED_JOINED_ROOMS: usize = 10000;

pub fn restore_session(c: &mut Criterion) {
let runtime = Builder::new_multi_thread().build().expect("Can't create runtime");

// Create a fake list of changes, and a session to recover from.
let mut changes = StateChanges::default();

for i in 0..NUM_JOINED_ROOMS {
let room_id = RoomId::parse(format!("!room{i}:example.com")).unwrap().to_owned();
changes.add_room(RoomInfo::new(&room_id, RoomState::Joined));
}

for i in 0..NUM_STRIPPED_JOINED_ROOMS {
let room_id = RoomId::parse(format!("!strippedroom{i}:example.com")).unwrap().to_owned();
changes.add_stripped_room(RoomInfo::new(&room_id, RoomState::Joined));
}

let session = Session {
access_token: "OHEY".to_owned(),
refresh_token: None,
user_id: user_id!("@somebody:example.com").to_owned(),
device_id: device_id!("DEVICE_ID").to_owned(),
};

// Start the benchmark.

let mut group = c.benchmark_group("Client reload");
group.throughput(Throughput::Elements(100));

const NAME: &str = "restore a session";

// Memory
let mem_store = MemoryStore::new();
runtime.block_on(mem_store.save_changes(&changes)).expect("initial filling of mem failed");

group.bench_with_input(BenchmarkId::new("memory store", NAME), &mem_store, |b, store| {
b.to_async(&runtime).iter(|| async {
let client = Client::builder()
.homeserver_url("https://matrix.example.com")
.store_config(StoreConfig::new().state_store(store.clone()))
.build()
.await
.expect("Can't build client");
client.restore_session(session.clone()).await.expect("couldn't restore session");
})
});

for encryption_password in [None, Some("hunter2")] {
let encrypted_suffix = if encryption_password.is_some() { "encrypted" } else { "clear" };

// Sled
let sled_path = tempfile::tempdir().unwrap().path().to_path_buf();
let mut sled_store_builder = SledStateStore::builder().path(sled_path);
if let Some(password) = encryption_password {
sled_store_builder = sled_store_builder.passphrase(password.to_owned());
}
let sled_store = sled_store_builder.build().expect("Can't create sled store");
runtime
.block_on(sled_store.save_changes(&changes))
.expect("initial filling of sled failed");

group.bench_with_input(
BenchmarkId::new(format!("sled store {encrypted_suffix}"), NAME),
&sled_store,
|b, store| {
b.to_async(&runtime).iter(|| async {
let client = Client::builder()
.homeserver_url("https://matrix.example.com")
.store_config(StoreConfig::new().state_store(store.clone()))
.build()
.await
.expect("Can't build client");
client
.restore_session(session.clone())
.await
.expect("couldn't restore session");
})
},
);

// Sqlite
let sqlite_dir = tempfile::tempdir().unwrap();
let sqlite_store = runtime
.block_on(SqliteStateStore::open(sqlite_dir.path(), encryption_password))
.unwrap();
runtime
.block_on(sqlite_store.save_changes(&changes))
.expect("initial filling of sqlite failed");

group.bench_with_input(
BenchmarkId::new(format!("sqlite store {encrypted_suffix}"), NAME),
&sqlite_store,
|b, store| {
b.to_async(&runtime).iter(|| async {
let client = Client::builder()
.homeserver_url("https://matrix.example.com")
.store_config(StoreConfig::new().state_store(store.clone()))
.build()
.await
.expect("Can't build client");
client
.restore_session(session.clone())
.await
.expect("couldn't restore session");
})
},
);

{
let _guard = runtime.enter();
drop(sqlite_store);
}
}

group.finish()
}

criterion_group! {
name = benches;
config = criterion();
targets = restore_session
}
criterion_main!(benches);
1 change: 1 addition & 0 deletions bindings/matrix-sdk-crypto-js/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

- Extend `OlmDevice.markRequestAsSent` to accept responses to
`SigningKeysUploadRequest`s.
- Fix the body of `SignatureUploadRequest`s to match the spec.

# v0.1.0-alpha.8

Expand Down
12 changes: 6 additions & 6 deletions bindings/matrix-sdk-crypto-js/src/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,19 +194,19 @@ pub struct SignatureUploadRequest {
#[wasm_bindgen(readonly)]
pub id: Option<JsString>,

/// A JSON-encoded string containing the rest of the payload: `signed_keys`.
/// A JSON-encoded string containing the payload of the request
///
/// It represents the body of the HTTP request.
#[wasm_bindgen(readonly)]
pub body: JsString,
#[wasm_bindgen(readonly, js_name = "body")]
pub signed_keys: JsString,
}

#[wasm_bindgen]
impl SignatureUploadRequest {
/// Create a new `SignatureUploadRequest`.
#[wasm_bindgen(constructor)]
pub fn new(id: JsString, body: JsString) -> SignatureUploadRequest {
Self { id: Some(id), body }
pub fn new(id: JsString, signed_keys: JsString) -> SignatureUploadRequest {
Self { id: Some(id), signed_keys }
}

/// Get its request type.
Expand Down Expand Up @@ -417,7 +417,7 @@ request!(KeysUploadRequest from OriginalKeysUploadRequest groups device_keys, on
request!(KeysQueryRequest from OriginalKeysQueryRequest groups timeout, device_keys, token);
request!(KeysClaimRequest from OriginalKeysClaimRequest groups timeout, one_time_keys);
request!(ToDeviceRequest from OriginalToDeviceRequest extracts event_type: string, txn_id: string and groups messages);
request!(SignatureUploadRequest from OriginalSignatureUploadRequest groups signed_keys);
request!(SignatureUploadRequest from OriginalSignatureUploadRequest extracts signed_keys: json);
request!(RoomMessageRequest from OriginalRoomMessageRequest extracts room_id: string, txn_id: string, event_type: event_type, content: json);
request!(KeysBackupRequest from OriginalKeysBackupRequest groups rooms);

Expand Down
18 changes: 14 additions & 4 deletions bindings/matrix-sdk-ffi/src/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,12 +187,22 @@ impl Room {
})
}

pub fn member(&self, user_id: String) -> Result<Arc<RoomMember>, ClientError> {
let room = self.room.clone();
let user_id = user_id;
RUNTIME.block_on(async move {
let user_id = UserId::parse(&*user_id).context("Invalid user id.")?;
let member = room.get_member(&user_id).await?.context("No user found")?;
Ok(Arc::new(RoomMember::new(member)))
})
}

pub fn member_avatar_url(&self, user_id: String) -> Result<Option<String>, ClientError> {
let room = self.room.clone();
let user_id = user_id;
RUNTIME.block_on(async move {
let user_id = <&UserId>::try_from(&*user_id).context("Invalid user id.")?;
let member = room.get_member(user_id).await?.context("No user found")?;
let user_id = UserId::parse(&*user_id).context("Invalid user id.")?;
let member = room.get_member(&user_id).await?.context("No user found")?;
let avatar_url_string = member.avatar_url().map(|m| m.to_string());
Ok(avatar_url_string)
})
Expand All @@ -202,8 +212,8 @@ impl Room {
let room = self.room.clone();
let user_id = user_id;
RUNTIME.block_on(async move {
let user_id = <&UserId>::try_from(&*user_id).context("Invalid user id.")?;
let member = room.get_member(user_id).await?.context("No user found")?;
let user_id = UserId::parse(&*user_id).context("Invalid user id.")?;
let member = room.get_member(&user_id).await?.context("No user found")?;
let avatar_url_string = member.display_name().map(|m| m.to_owned());
Ok(avatar_url_string)
})
Expand Down
20 changes: 17 additions & 3 deletions crates/matrix-sdk/src/room/common.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use std::{borrow::Borrow, collections::BTreeMap, ops::Deref, sync::Arc};
use std::{borrow::Borrow, collections::BTreeMap, fmt, ops::Deref, sync::Arc};

use matrix_sdk_base::{
deserialized_responses::{MembersResponse, TimelineEvent},
store::StateStoreExt,
RoomMemberships, StateChanges,
};
use matrix_sdk_common::debug::DebugStructExt;
#[cfg(feature = "e2e-encryption")]
use ruma::events::{
room::encrypted::OriginalSyncRoomEncryptedEvent, AnySyncMessageLikeEvent, AnySyncTimelineEvent,
Expand Down Expand Up @@ -46,7 +47,7 @@ use ruma::{
};
use serde::de::DeserializeOwned;
use tokio::sync::Mutex;
use tracing::debug;
use tracing::{debug, instrument};

#[cfg(feature = "experimental-timeline")]
use super::timeline::Timeline;
Expand Down Expand Up @@ -201,6 +202,7 @@ impl Common {
/// assert!(room.messages(options).await.is_ok());
/// # });
/// ```
#[instrument(skip_all, fields(room_id = ?self.inner.room_id(), ?options))]
pub async fn messages(&self, options: MessagesOptions) -> Result<Messages> {
let room_id = self.inner.room_id();
let request = options.into_request(room_id);
Expand Down Expand Up @@ -1076,7 +1078,6 @@ impl Common {
/// See that method and
/// <https://spec.matrix.org/v1.3/client-server-api/#get_matrixclientv3roomsroomidmessages>
/// for details.
#[derive(Debug)]
#[non_exhaustive]
pub struct MessagesOptions {
/// The token to start returning events from.
Expand Down Expand Up @@ -1152,3 +1153,16 @@ impl MessagesOptions {
})
}
}

impl fmt::Debug for MessagesOptions {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let Self { from, to, dir, limit, filter } = self;

let mut s = f.debug_struct("MessagesOptions");
s.maybe_field("from", from).maybe_field("to", to).field("dir", dir).field("limit", limit);
if !filter.is_empty() {
s.field("filter", filter);
}
s.finish()
}
}
6 changes: 3 additions & 3 deletions crates/matrix-sdk/src/room/timeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ impl Timeline {
}

/// Add more events to the start of the timeline.
#[instrument(skip_all, fields(initial_pagination_size, room_id = ?self.room().room_id()))]
pub async fn paginate_backwards(&self, mut opts: PaginationOptions<'_>) -> Result<()> {
#[instrument(skip_all, fields(room_id = ?self.room().room_id(), ?options))]
pub async fn paginate_backwards(&self, mut options: PaginationOptions<'_>) -> Result<()> {
let mut start_lock = self.start_token.lock().await;
if start_lock.is_none()
&& self.inner.items().await.front().map_or(false, |item| item.is_timeline_start())
Expand All @@ -124,7 +124,7 @@ impl Timeline {
let mut from = start_lock.clone();
let mut outcome = PaginationOutcome::new();

while let Some(limit) = opts.next_event_limit(outcome) {
while let Some(limit) = options.next_event_limit(outcome) {
let messages = self
.room()
.messages(assign!(MessagesOptions::backward(), {
Expand Down
13 changes: 13 additions & 0 deletions crates/matrix-sdk/src/sliding_sync/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,19 @@ mod tests {
assert_eq!(initial_max_number_of_rooms, Some(42));
}

// The maximum number of rooms reloaded from the cache should have been
// published.
{
let mut stream = max_number_of_room_stream
.write()
.unwrap()
.take()
.expect("stream must be set");
let initial_max_number_of_rooms =
stream.next().await.expect("stream must have emitted something");
assert_eq!(initial_max_number_of_rooms, Some(42));
}

// Clean the cache.
clean_storage(&client, "hello", &sliding_sync.inner.lists.read().unwrap()).await;
}
Expand Down
14 changes: 10 additions & 4 deletions crates/matrix-sdk/src/sliding_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ impl SlidingSync {

// Spawn a new future to ensure that the code inside this future cannot be
// cancelled if this method is cancelled.
spawn(async move {
let fut = async move {
debug!("Sliding Sync response handling starts");

// In case the task running this future is detached, we must
Expand All @@ -529,14 +529,19 @@ impl SlidingSync {

match &response.txn_id {
None => {
error!(stream_id, "Sliding Sync has received an unexpected response: `txn_id` must match `stream_id`; it's missing");
error!(
stream_id,
"Sliding Sync has received an unexpected response: \
`txn_id` must match `stream_id`; it's missing"
);
}

Some(txn_id) if txn_id != &stream_id => {
error!(
stream_id,
txn_id,
"Sliding Sync has received an unexpected response: `txn_id` must match `stream_id`; they differ"
"Sliding Sync has received an unexpected response: \
`txn_id` must match `stream_id`; they differ"
);
}

Expand All @@ -563,7 +568,8 @@ impl SlidingSync {
debug!("Sliding Sync response has been fully handled");

Ok(Some(updates))
}).await.unwrap()
};
spawn(fut.instrument(Span::current())).await.unwrap()
}

/// Create a _new_ Sliding Sync stream.
Expand Down
Loading

0 comments on commit ea6c962

Please sign in to comment.