Skip to content

Commit

Permalink
send queue: wake up the sending task after editing an event
Browse files Browse the repository at this point in the history
It could be that the last event in a room's send queue has been marked
as wedged. In that case, the task will sleep until it's notified again.
If the event is being edited, then nothing would wake up the task; a
manual wakeup might be required in that case.

The new integration test shows the issue; the last `assert_update` would
fail with a timeout before this patch.
  • Loading branch information
bnjbvr committed Jul 1, 2024
1 parent a34e196 commit 08ec617
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 0 deletions.
3 changes: 3 additions & 0 deletions crates/matrix-sdk/src/send_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -899,6 +899,9 @@ impl SendHandle {
if self.room.inner.queue.replace(&self.transaction_id, serializable.clone()).await? {
trace!("successful edit");

// Wake up the queue, in case the room was asleep before the edit.
self.room.inner.notifier.notify_one();

// Propagate a replaced update too.
let _ = self.room.inner.updates.send(RoomSendQueueUpdate::ReplacedLocalEvent {
transaction_id: self.transaction_id.clone(),
Expand Down
67 changes: 67 additions & 0 deletions crates/matrix-sdk/tests/integration/send_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -885,6 +885,73 @@ async fn test_edit() {
assert!(watch.is_empty());
}

#[async_test]
async fn test_edit_wakes_the_sending_task() {
let (client, server) = logged_in_client_with_server().await;

// Mark the room as joined.
let room_id = room_id!("!a:b.c");

let room = mock_sync_with_new_room(
|builder| {
builder.add_joined_room(JoinedRoomBuilder::new(room_id));
},
&client,
&server,
room_id,
)
.await;

let q = room.send_queue();

let (local_echoes, mut watch) = q.subscribe().await.unwrap();

assert!(local_echoes.is_empty());
assert!(watch.is_empty());

mock_encryption_state(&server, false).await;

let send_mock_scope = Mock::given(method("PUT"))
.and(path_regex(r"^/_matrix/client/r0/rooms/.*/send/.*"))
.and(header("authorization", "Bearer 1234"))
.respond_with(ResponseTemplate::new(413).set_body_json(json!({
// From https://spec.matrix.org/v1.10/client-server-api/#standard-error-response
"errcode": "M_TOO_LARGE",
})))
.expect(1)
.mount_as_scoped(&server)
.await;

let handle =
q.send(RoomMessageEventContent::text_plain("welcome to my ted talk").into()).await.unwrap();

// Receiving updates for local echoes.
let (txn, _) = assert_update!(watch => local echo { body = "welcome to my ted talk" });
assert!(watch.is_empty());

// Let the background task start now.
tokio::task::yield_now().await;

assert_update!(watch => error { recoverable = false, txn = txn });
assert!(watch.is_empty());

// Now edit the task (imagine we make it "shorter").
drop(send_mock_scope);
mock_send_event(event_id!("$1")).mount(&server).await;

let edited = handle
.edit(RoomMessageEventContent::text_plain("here's the summary of my ted talk").into())
.await
.unwrap();
assert!(edited);

// Now the server will process the messages in order.
assert_update!(watch => edit { body = "here's the summary of my ted talk", txn = txn });
assert_update!(watch => sent { txn = txn, });

assert!(watch.is_empty());
}

#[async_test]
async fn test_abort_after_disable() {
let (client, server) = logged_in_client_with_server().await;
Expand Down

0 comments on commit 08ec617

Please sign in to comment.