Skip to content

Commit

Permalink
send queue: wake up the sending task after editing an event
Browse files Browse the repository at this point in the history
It could be that the last event in a room's send queue has been marked
as wedged. In that case, the task will sleep until it's notified again.
If the event is being edited, then nothing would wake up the task; a
manual wakeup might be required in that case.

The new integration test shows the issue; the last `assert_update` would
fail with a timeout before this patch.
  • Loading branch information
bnjbvr committed Jul 1, 2024
1 parent a34e196 commit 00f3bb5
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 0 deletions.
3 changes: 3 additions & 0 deletions crates/matrix-sdk/src/send_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -899,6 +899,9 @@ impl SendHandle {
if self.room.inner.queue.replace(&self.transaction_id, serializable.clone()).await? {
trace!("successful edit");

// Wake up the queue, in case the room was asleep before the edit.
self.room.inner.notifier.notify_one();

// Propagate a replaced update too.
let _ = self.room.inner.updates.send(RoomSendQueueUpdate::ReplacedLocalEvent {
transaction_id: self.transaction_id.clone(),
Expand Down
67 changes: 67 additions & 0 deletions crates/matrix-sdk/tests/integration/send_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -885,6 +885,73 @@ async fn test_edit() {
assert!(watch.is_empty());
}

#[async_test]
async fn test_edit_wakes_the_sending_task() {
let (client, server) = logged_in_client_with_server().await;

// Mark the room as joined.
let room_id = room_id!("!a:b.c");

let room = mock_sync_with_new_room(
|builder| {
builder.add_joined_room(JoinedRoomBuilder::new(room_id));
},
&client,
&server,
room_id,
)
.await;

let q = room.send_queue();

let (local_echoes, mut watch) = q.subscribe().await.unwrap();

assert!(local_echoes.is_empty());
assert!(watch.is_empty());

mock_encryption_state(&server, false).await;

let send_mock_scope = Mock::given(method("PUT"))
.and(path_regex(r"^/_matrix/client/r0/rooms/.*/send/.*"))
.and(header("authorization", "Bearer 1234"))
.respond_with(ResponseTemplate::new(413).set_body_json(json!({
// From https://spec.matrix.org/v1.10/client-server-api/#standard-error-response
"errcode": "M_TOO_LARGE",
})))
.expect(1)
.mount_as_scoped(&server)
.await;

let handle =
q.send(RoomMessageEventContent::text_plain("welcome to my ted talk").into()).await.unwrap();

// Receiving an update for the local echo.
let (txn, _) = assert_update!(watch => local echo { body = "welcome to my ted talk" });
assert!(watch.is_empty());

// Let the background task start now.
tokio::task::yield_now().await;

assert_update!(watch => error { recoverable = false, txn = txn });
assert!(watch.is_empty());

// Now edit the event's content (imagine we make it "shorter").
drop(send_mock_scope);
mock_send_event(event_id!("$1")).mount(&server).await;

let edited = handle
.edit(RoomMessageEventContent::text_plain("here's the summary of my ted talk").into())
.await
.unwrap();
assert!(edited);

// Let the server process the message.
assert_update!(watch => edit { body = "here's the summary of my ted talk", txn = txn });
assert_update!(watch => sent { txn = txn, });

assert!(watch.is_empty());
}

#[async_test]
async fn test_abort_after_disable() {
let (client, server) = logged_in_client_with_server().await;
Expand Down

0 comments on commit 00f3bb5

Please sign in to comment.