Skip to content

Commit

Permalink
fix(event cache): don't touch the linked chunk if an operation wouldn't cause meaningful changes
Browse files Browse the repository at this point in the history

See comment on top of `deduplicated_all_new_events`.
  • Loading branch information
bnjbvr committed Dec 19, 2024
1 parent fe9354a commit bc8c4f5
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 62 deletions.
4 changes: 2 additions & 2 deletions crates/matrix-sdk/src/event_cache/pagination.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ impl RoomPagination {
let first_event_pos = room_events.events().next().map(|(item_pos, _)| item_pos);

// First, insert events.
let (add_event_report, insert_new_gap_pos) = if let Some(gap_id) = prev_gap_id {
let (added_unique_events, insert_new_gap_pos) = if let Some(gap_id) = prev_gap_id {
// There is a prior gap, let's replace it by new events!
trace!("replaced gap with new events from backpagination");
room_events
Expand Down Expand Up @@ -213,7 +213,7 @@ impl RoomPagination {
// We only do this when at least one new, non-duplicated event, has been added
// to the chunk. Otherwise it means we've back-paginated all the
// known events.
if !add_event_report.deduplicated_all_new_events() {
if added_unique_events {
if let Some(new_gap) = new_gap {
if let Some(new_pos) = insert_new_gap_pos {
room_events
Expand Down
119 changes: 64 additions & 55 deletions crates/matrix-sdk/src/event_cache/room/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,17 +95,18 @@ impl RoomEvents {
/// Push events after all events or gaps.
///
/// The last event in `events` is the most recent one.
pub fn push_events<I>(&mut self, events: I) -> AddEventReport
///
/// Returns true if the linked chunk was modified, false otherwise.
pub fn push_events<I>(&mut self, events: I) -> bool
where
I: IntoIterator<Item = Event>,
{
let (unique_events, duplicated_event_ids) =
self.filter_duplicated_events(events.into_iter());

let report = AddEventReport {
num_new_unique: unique_events.len(),
num_duplicated: duplicated_event_ids.len(),
};
if deduplicated_all_new_events(unique_events.len(), duplicated_event_ids.len()) {
return false;
}

// Remove the _old_ duplicated events!
//
Expand All @@ -116,7 +117,7 @@ impl RoomEvents {
// Push new `events`.
self.chunks.push_items_back(unique_events);

report
true
}

/// Push a gap after all events or gaps.
Expand All @@ -125,21 +126,18 @@ impl RoomEvents {
}

/// Insert events at a specified position.
pub fn insert_events_at<I>(
&mut self,
events: I,
mut position: Position,
) -> Result<AddEventReport, Error>
///
/// Returns true if the linked chunk was modified.
pub fn insert_events_at<I>(&mut self, events: I, mut position: Position) -> Result<bool, Error>
where
I: IntoIterator<Item = Event>,
{
let (unique_events, duplicated_event_ids) =
self.filter_duplicated_events(events.into_iter());

let report = AddEventReport {
num_new_unique: unique_events.len(),
num_duplicated: duplicated_event_ids.len(),
};
if deduplicated_all_new_events(unique_events.len(), duplicated_event_ids.len()) {
return Ok(false);
}

// Remove the _old_ duplicated events!
//
Expand All @@ -150,7 +148,7 @@ impl RoomEvents {

self.chunks.insert_items_at(unique_events, position)?;

Ok(report)
Ok(true)
}

/// Insert a gap at a specified position.
Expand All @@ -163,23 +161,25 @@ impl RoomEvents {
/// Because the `gap_identifier` can represent non-gap chunk, this method
/// returns a `Result`.
///
/// This method returns a reference to the (first if many) newly created
/// `Chunk` that contains the `items`.
/// This method returns:
/// - a boolean indicating if we updated the linked chunk,
/// - a reference to the (first if many) newly created `Chunk` that contains
/// the `items`.
pub fn replace_gap_at<I>(
&mut self,
events: I,
gap_identifier: ChunkIdentifier,
) -> Result<(AddEventReport, Option<Position>), Error>
) -> Result<(bool, Option<Position>), Error>
where
I: IntoIterator<Item = Event>,
{
let (unique_events, duplicated_event_ids) =
self.filter_duplicated_events(events.into_iter());

let report = AddEventReport {
num_new_unique: unique_events.len(),
num_duplicated: duplicated_event_ids.len(),
};
if deduplicated_all_new_events(unique_events.len(), duplicated_event_ids.len()) {
let pos = self.chunks.remove_gap_at(gap_identifier)?;
return Ok((false, pos));
}

// Remove the _old_ duplicated events!
//
Expand All @@ -196,7 +196,8 @@ impl RoomEvents {
// Replace the gap by new events.
Some(self.chunks.replace_gap_at(unique_events, gap_identifier)?.first_position())
};
Ok((report, next_pos))

Ok((true, next_pos))
}

/// Search for a chunk, and return its identifier.
Expand Down Expand Up @@ -308,6 +309,29 @@ impl RoomEvents {
}
}

/// Did an attempted insertion of new events into the linked chunk turn out to
/// be a complete no-op, because every single event (and there was at least
/// one) was already known?
///
/// Knowing this tells us whether a previous-batch token (gap) received from a
/// server-side request (sync or back-pagination) is worth storing at all.
///
/// Back-pagination responses may legitimately carry a previous-batch token
/// while containing zero events, so an empty batch must *not* count as "all
/// duplicated": a later back-pagination could still return events we have
/// never seen. Hence the requirement that at least one event was provided.
///
/// When every provided event is already known, storing yet another
/// previous-batch token — or touching the linked chunk at all — would be
/// wasteful: we would repeat back-paginations over events we already have, and
/// possibly misplace them. Nor would we miss anything: the already-known
/// events carry their own previous-batch token (possibly consumed already).
fn deduplicated_all_new_events(num_new_unique: usize, num_duplicated: usize) -> bool {
    // An empty batch never counts; otherwise "all duplicated" means the two
    // counts coincide.
    num_new_unique != 0 && num_new_unique == num_duplicated
}

// Private implementations, implementation specific.
impl RoomEvents {
/// Remove some events from `Self::chunks`.
Expand Down Expand Up @@ -422,20 +446,6 @@ impl RoomEvents {
}
}

/// Outcome of adding a batch of events to the linked chunk, as far as
/// deduplication is concerned.
pub(in crate::event_cache) struct AddEventReport {
    /// Number of new unique events that have been added.
    num_new_unique: usize,
    /// Number of events which have been deduplicated.
    num_duplicated: usize,
}

impl AddEventReport {
    /// Were all the events (at least one) we added already known?
    pub fn deduplicated_all_new_events(&self) -> bool {
        // An empty batch never counts; otherwise "all known" means the two
        // counts coincide.
        match self.num_new_unique {
            0 => false,
            new => new == self.num_duplicated,
        }
    }
}

#[cfg(test)]
mod tests {
use assert_matches::assert_matches;
Expand Down Expand Up @@ -505,28 +515,30 @@ mod tests {
fn test_push_events_with_duplicates() {
let (event_id_0, event_0) = new_event("$ev0");
let (event_id_1, event_1) = new_event("$ev1");
let (event_id_2, event_2) = new_event("$ev1");

let mut room_events = RoomEvents::new();

room_events.push_events([event_0.clone(), event_1]);
room_events.push_events([event_2.clone()]);

assert_events_eq!(
room_events.events(),
[
(event_id_0 at (0, 0)),
(event_id_1 at (0, 1)),
(event_id_2 at (0, 0)),
]
);

// Everything is alright. Now let's push a duplicated event.
room_events.push_events([event_0]);
// Everything is alright. Now let's push a duplicated event by simulating a
// wider sync.
room_events.push_events([event_0, event_1, event_2]);

assert_events_eq!(
room_events.events(),
[
// The first `event_id_0` has been removed.
(event_id_1 at (0, 0)),
(event_id_0 at (0, 1)),
// The first `event_id_2` has been removed.
(event_id_0 at (0, 0)),
(event_id_1 at (0, 1)),
(event_id_2 at (0, 2)),
]
);
}
Expand All @@ -552,12 +564,11 @@ mod tests {
// Everything is alright. Now let's push a duplicated event.
room_events.push_events([event_0]);

// The event has been removed, then the chunk was empty, so removed, and a new
// chunk has been created with identifier 3.
// Nothing has changed in the linked chunk.
assert_events_eq!(
room_events.events(),
[
(event_id_0 at (3, 0)),
(event_id_0 at (2, 0)),
]
);
}
Expand Down Expand Up @@ -852,10 +863,9 @@ mod tests {
.unwrap();

// The next insert position is the next chunk's start.
let (report, pos) = room_events.replace_gap_at([], first_gap_id).unwrap();
let (touched_linked_chunk, pos) = room_events.replace_gap_at([], first_gap_id).unwrap();
assert_eq!(pos, Some(Position::new(ChunkIdentifier::new(2), 0)));
assert_eq!(report.num_new_unique, 0);
assert_eq!(report.num_duplicated, 0);
assert!(touched_linked_chunk);

// Remove the second gap.
let second_gap_id = room_events
Expand All @@ -864,10 +874,9 @@ mod tests {
.unwrap();

// No next insert position.
let (report, pos) = room_events.replace_gap_at([], second_gap_id).unwrap();
let (touched_linked_chunk, pos) = room_events.replace_gap_at([], second_gap_id).unwrap();
assert!(pos.is_none());
assert_eq!(report.num_new_unique, 0);
assert_eq!(report.num_duplicated, 0);
assert!(touched_linked_chunk);
}

#[test]
Expand Down
10 changes: 5 additions & 5 deletions crates/matrix-sdk/src/event_cache/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -540,9 +540,9 @@ impl RoomEventCacheInner {
room_events.push_gap(Gap { prev_token: prev_token.clone() });
}

let add_event_report = room_events.push_events(sync_timeline_events.clone());
let added_unique_events = room_events.push_events(sync_timeline_events.clone());

if add_event_report.deduplicated_all_new_events() {
if !added_unique_events {
debug!(
"not storing previous batch token, because we deduplicated all new sync events"
);
Expand Down Expand Up @@ -1524,7 +1524,7 @@ mod tests {
assert_eq!(items[1].event_id().unwrap(), event_id2);

// A new update with one of these events leads to deduplication.
let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev1] };
let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev2] };
room_event_cache
.inner
.handle_joined_room_update(true, JoinedRoomUpdate { timeline, ..Default::default() })
Expand All @@ -1537,8 +1537,8 @@ mod tests {
// element anymore), and it's added to the back of the list.
let (items, _stream) = room_event_cache.subscribe().await.unwrap();
assert_eq!(items.len(), 2);
assert_eq!(items[0].event_id().unwrap(), event_id2);
assert_eq!(items[1].event_id().unwrap(), event_id1);
assert_eq!(items[0].event_id().unwrap(), event_id1);
assert_eq!(items[1].event_id().unwrap(), event_id2);
}

#[cfg(not(target_arch = "wasm32"))] // This uses the cross-process lock, so needs time support.
Expand Down

0 comments on commit bc8c4f5

Please sign in to comment.