Skip to content

Commit

Permalink
Add created_at time persisted send_queue system
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniel Salinas authored and Daniel Salinas committed Dec 18, 2024
1 parent bb57311 commit d47e45a
Show file tree
Hide file tree
Showing 16 changed files with 325 additions and 66 deletions.
2 changes: 2 additions & 0 deletions bindings/matrix-sdk-ffi/src/timeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1084,6 +1084,7 @@ pub struct EventTimelineItem {
timestamp: u64,
reactions: Vec<Reaction>,
local_send_state: Option<EventSendState>,
local_created_at: Option<u64>,
read_receipts: HashMap<String, Receipt>,
origin: Option<EventItemOrigin>,
can_be_replied_to: bool,
Expand Down Expand Up @@ -1121,6 +1122,7 @@ impl From<matrix_sdk_ui::timeline::EventTimelineItem> for EventTimelineItem {
timestamp: item.timestamp().0.into(),
reactions,
local_send_state: item.send_state().map(|s| s.into()),
local_created_at: item.local_created_at().map(|t| t.0.into()),
read_receipts,
origin: item.origin(),
can_be_replied_to: item.can_be_replied_to(),
Expand Down
107 changes: 96 additions & 11 deletions crates/matrix-sdk-base/src/store/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ use ruma::{
},
owned_event_id, owned_mxc_uri, room_id,
serde::Raw,
uint, user_id, EventId, OwnedEventId, OwnedUserId, RoomId, TransactionId, UserId,
uint, user_id, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedUserId, RoomId,
TransactionId, UserId,
};
use serde_json::{json, value::Value as JsonValue};

Expand Down Expand Up @@ -980,13 +981,21 @@ impl StateStoreIntegrationTests for DynStateStore {
let ev =
SerializableEventContent::new(&RoomMessageEventContent::text_plain("sup").into())
.unwrap();
self.save_send_queue_request(room_id, txn.clone(), ev.into(), 0).await?;
self.save_send_queue_request(
room_id,
txn.clone(),
MilliSecondsSinceUnixEpoch::now(),
ev.into(),
0,
)
.await?;

// Add a single dependent queue request.
self.save_dependent_queued_request(
room_id,
&txn,
ChildTransactionId::new(),
MilliSecondsSinceUnixEpoch::now(),
DependentQueuedRequestKind::RedactEvent,
)
.await?;
Expand Down Expand Up @@ -1242,7 +1251,15 @@ impl StateStoreIntegrationTests for DynStateStore {
let event0 =
SerializableEventContent::new(&RoomMessageEventContent::text_plain("msg0").into())
.unwrap();
self.save_send_queue_request(room_id, txn0.clone(), event0.into(), 0).await.unwrap();
self.save_send_queue_request(
room_id,
txn0.clone(),
MilliSecondsSinceUnixEpoch::now(),
event0.into(),
0,
)
.await
.unwrap();

// Reading it will work.
let pending = self.load_send_queue_requests(room_id).await.unwrap();
Expand All @@ -1266,7 +1283,15 @@ impl StateStoreIntegrationTests for DynStateStore {
)
.unwrap();

self.save_send_queue_request(room_id, txn, event.into(), 0).await.unwrap();
self.save_send_queue_request(
room_id,
txn,
MilliSecondsSinceUnixEpoch::now(),
event.into(),
0,
)
.await
.unwrap();
}

// Reading all the events should work.
Expand Down Expand Up @@ -1364,7 +1389,15 @@ impl StateStoreIntegrationTests for DynStateStore {
let event =
SerializableEventContent::new(&RoomMessageEventContent::text_plain("room2").into())
.unwrap();
self.save_send_queue_request(room_id2, txn.clone(), event.into(), 0).await.unwrap();
self.save_send_queue_request(
room_id2,
txn.clone(),
MilliSecondsSinceUnixEpoch::now(),
event.into(),
0,
)
.await
.unwrap();
}

// Add and remove one event for room3.
Expand All @@ -1374,7 +1407,15 @@ impl StateStoreIntegrationTests for DynStateStore {
let event =
SerializableEventContent::new(&RoomMessageEventContent::text_plain("room3").into())
.unwrap();
self.save_send_queue_request(room_id3, txn.clone(), event.into(), 0).await.unwrap();
self.save_send_queue_request(
room_id3,
txn.clone(),
MilliSecondsSinceUnixEpoch::now(),
event.into(),
0,
)
.await
.unwrap();

self.remove_send_queue_request(room_id3, &txn).await.unwrap();
}
Expand All @@ -1399,21 +1440,45 @@ impl StateStoreIntegrationTests for DynStateStore {
let ev0 =
SerializableEventContent::new(&RoomMessageEventContent::text_plain("low0").into())
.unwrap();
self.save_send_queue_request(room_id, low0_txn.clone(), ev0.into(), 2).await.unwrap();
self.save_send_queue_request(
room_id,
low0_txn.clone(),
MilliSecondsSinceUnixEpoch::now(),
ev0.into(),
2,
)
.await
.unwrap();

// Saving one request with higher priority should work.
let high_txn = TransactionId::new();
let ev1 =
SerializableEventContent::new(&RoomMessageEventContent::text_plain("high").into())
.unwrap();
self.save_send_queue_request(room_id, high_txn.clone(), ev1.into(), 10).await.unwrap();
self.save_send_queue_request(
room_id,
high_txn.clone(),
MilliSecondsSinceUnixEpoch::now(),
ev1.into(),
10,
)
.await
.unwrap();

// Saving another request with the low priority should work.
let low1_txn = TransactionId::new();
let ev2 =
SerializableEventContent::new(&RoomMessageEventContent::text_plain("low1").into())
.unwrap();
self.save_send_queue_request(room_id, low1_txn.clone(), ev2.into(), 2).await.unwrap();
self.save_send_queue_request(
room_id,
low1_txn.clone(),
MilliSecondsSinceUnixEpoch::now(),
ev2.into(),
2,
)
.await
.unwrap();

// The requests should be ordered from higher priority to lower, and when equal,
// should use the insertion order instead.
Expand Down Expand Up @@ -1453,7 +1518,15 @@ impl StateStoreIntegrationTests for DynStateStore {
let event0 =
SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey").into())
.unwrap();
self.save_send_queue_request(room_id, txn0.clone(), event0.into(), 0).await.unwrap();
self.save_send_queue_request(
room_id,
txn0.clone(),
MilliSecondsSinceUnixEpoch::now(),
event0.into(),
0,
)
.await
.unwrap();

// No dependents, to start with.
assert!(self.load_dependent_queued_requests(room_id).await.unwrap().is_empty());
Expand All @@ -1464,6 +1537,7 @@ impl StateStoreIntegrationTests for DynStateStore {
room_id,
&txn0,
child_txn.clone(),
MilliSecondsSinceUnixEpoch::now(),
DependentQueuedRequestKind::RedactEvent,
)
.await
Expand Down Expand Up @@ -1515,12 +1589,21 @@ impl StateStoreIntegrationTests for DynStateStore {
let event1 =
SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey2").into())
.unwrap();
self.save_send_queue_request(room_id, txn1.clone(), event1.into(), 0).await.unwrap();
self.save_send_queue_request(
room_id,
txn1.clone(),
MilliSecondsSinceUnixEpoch::now(),
event1.into(),
0,
)
.await
.unwrap();

self.save_dependent_queued_request(
room_id,
&txn0,
ChildTransactionId::new(),
MilliSecondsSinceUnixEpoch::now(),
DependentQueuedRequestKind::RedactEvent,
)
.await
Expand All @@ -1531,6 +1614,7 @@ impl StateStoreIntegrationTests for DynStateStore {
room_id,
&txn1,
ChildTransactionId::new(),
MilliSecondsSinceUnixEpoch::now(),
DependentQueuedRequestKind::EditEvent {
new_content: SerializableEventContent::new(
&RoomMessageEventContent::text_plain("edit").into(),
Expand Down Expand Up @@ -1563,6 +1647,7 @@ impl StateStoreIntegrationTests for DynStateStore {
room_id,
&txn,
child_txn.clone(),
MilliSecondsSinceUnixEpoch::now(),
DependentQueuedRequestKind::RedactEvent,
)
.await
Expand Down
23 changes: 14 additions & 9 deletions crates/matrix-sdk-base/src/store/memory_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ use ruma::{
},
serde::Raw,
time::Instant,
CanonicalJsonObject, EventId, OwnedEventId, OwnedMxcUri, OwnedRoomId, OwnedTransactionId,
OwnedUserId, RoomId, RoomVersionId, TransactionId, UserId,
CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedMxcUri,
OwnedRoomId, OwnedTransactionId, OwnedUserId, RoomId, RoomVersionId, TransactionId, UserId,
};
use tracing::{debug, instrument, warn};

Expand Down Expand Up @@ -750,16 +750,19 @@ impl StateStore for MemoryStore {
&self,
room_id: &RoomId,
transaction_id: OwnedTransactionId,
created_at: MilliSecondsSinceUnixEpoch,
kind: QueuedRequestKind,
priority: usize,
) -> Result<(), Self::Error> {
self.inner
.write()
.unwrap()
.send_queue_events
.entry(room_id.to_owned())
.or_default()
.push(QueuedRequest { kind, transaction_id, error: None, priority });
self.inner.write().unwrap().send_queue_events.entry(room_id.to_owned()).or_default().push(
QueuedRequest {
kind,
transaction_id,
error: None,
priority,
created_at: Some(created_at),
},
);
Ok(())
}

Expand Down Expand Up @@ -858,6 +861,7 @@ impl StateStore for MemoryStore {
room: &RoomId,
parent_transaction_id: &TransactionId,
own_transaction_id: ChildTransactionId,
created_at: MilliSecondsSinceUnixEpoch,
content: DependentQueuedRequestKind,
) -> Result<(), Self::Error> {
self.inner
Expand All @@ -871,6 +875,7 @@ impl StateStore for MemoryStore {
parent_transaction_id: parent_transaction_id.to_owned(),
own_transaction_id,
parent_key: None,
created_at: Some(created_at),
});
Ok(())
}
Expand Down
10 changes: 9 additions & 1 deletion crates/matrix-sdk-base/src/store/send_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ use ruma::{
AnyMessageLikeEventContent, EventContent as _, RawExt as _,
},
serde::Raw,
OwnedDeviceId, OwnedEventId, OwnedTransactionId, OwnedUserId, TransactionId, UInt,
MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedEventId, OwnedTransactionId, OwnedUserId,
TransactionId, UInt,
};
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -131,6 +132,9 @@ pub struct QueuedRequest {
/// The bigger the value, the higher the priority at which this request
/// should be handled.
pub priority: usize,

/// The time that the request was original attempted.
pub created_at: Option<MilliSecondsSinceUnixEpoch>,
}

impl QueuedRequest {
Expand Down Expand Up @@ -371,6 +375,10 @@ pub struct DependentQueuedRequest {
/// If the parent request has been sent, the parent's request identifier
/// returned by the server once the local echo has been sent out.
pub parent_key: Option<SentRequestKey>,

/// The time that the request was original attempted.
#[serde(skip_serializing_if = "Option::is_none")]
pub created_at: Option<MilliSecondsSinceUnixEpoch>,
}

impl DependentQueuedRequest {
Expand Down
12 changes: 8 additions & 4 deletions crates/matrix-sdk-base/src/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ use ruma::{
},
serde::Raw,
time::SystemTime,
EventId, OwnedEventId, OwnedMxcUri, OwnedRoomId, OwnedTransactionId, OwnedUserId, RoomId,
TransactionId, UserId,
EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedMxcUri, OwnedRoomId,
OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UserId,
};
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -359,6 +359,7 @@ pub trait StateStore: AsyncTraitDeps {
&self,
room_id: &RoomId,
transaction_id: OwnedTransactionId,
created_at: MilliSecondsSinceUnixEpoch,
request: QueuedRequestKind,
priority: usize,
) -> Result<(), Self::Error>;
Expand Down Expand Up @@ -421,6 +422,7 @@ pub trait StateStore: AsyncTraitDeps {
room_id: &RoomId,
parent_txn_id: &TransactionId,
own_txn_id: ChildTransactionId,
created_at: MilliSecondsSinceUnixEpoch,
content: DependentQueuedRequestKind,
) -> Result<(), Self::Error>;

Expand Down Expand Up @@ -657,11 +659,12 @@ impl<T: StateStore> StateStore for EraseStateStoreError<T> {
&self,
room_id: &RoomId,
transaction_id: OwnedTransactionId,
created_at: MilliSecondsSinceUnixEpoch,
content: QueuedRequestKind,
priority: usize,
) -> Result<(), Self::Error> {
self.0
.save_send_queue_request(room_id, transaction_id, content, priority)
.save_send_queue_request(room_id, transaction_id, created_at, content, priority)
.await
.map_err(Into::into)
}
Expand Down Expand Up @@ -711,10 +714,11 @@ impl<T: StateStore> StateStore for EraseStateStoreError<T> {
room_id: &RoomId,
parent_txn_id: &TransactionId,
own_txn_id: ChildTransactionId,
created_at: MilliSecondsSinceUnixEpoch,
content: DependentQueuedRequestKind,
) -> Result<(), Self::Error> {
self.0
.save_dependent_queued_request(room_id, parent_txn_id, own_txn_id, content)
.save_dependent_queued_request(room_id, parent_txn_id, own_txn_id, created_at, content)
.await
.map_err(Into::into)
}
Expand Down
Loading

0 comments on commit d47e45a

Please sign in to comment.