From 5b8c751a63a05c38aa0e0342b0876a07fb3be293 Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Thu, 12 Dec 2024 14:45:20 +0100 Subject: [PATCH] fix(event cache store): start RemoveChunk transactions in write mode This is unfortunate, but we're hitting a bad case of a read transaction upgrading to a write transaction, which can simply fail if there's any other write transaction already happening in the background. To work around this, we need to start the transaction in write mode, so we start with the update statements themselves. This means some work is duplicated (fetching a linked chunk), but this should be fine since there's an index on the pair (chunk id, room id). --- .../src/event_cache_store.rs | 95 +++++++++++++++---- 1 file changed, 76 insertions(+), 19 deletions(-) diff --git a/crates/matrix-sdk-sqlite/src/event_cache_store.rs b/crates/matrix-sdk-sqlite/src/event_cache_store.rs index 33baf630b41..648a6255249 100644 --- a/crates/matrix-sdk-sqlite/src/event_cache_store.rs +++ b/crates/matrix-sdk-sqlite/src/event_cache_store.rs @@ -441,22 +441,44 @@ impl EventCacheStore for SqliteEventCacheStore { trace!(%room_id, "removing chunk @ {chunk_id}"); - // Find chunk to delete. - let (previous, next): (Option, Option) = txn.query_row( - "SELECT previous, next FROM linked_chunks WHERE id = ? AND room_id = ?", - (chunk_id, &hashed_room_id), - |row| Ok((row.get(0)?, row.get(1)?)) - )?; + // Note: previously, we used a single select query to retrieve the + // previous/next values before doing two UPDATE. Unfortunately, this + // means that a read-only transaction may be upgraded to a write + // transaction, which fails if there's another write transaction + // happening elsewhere. + // + // To work-around this, we include the select statement in the update + // statement, so the transaction immediately starts in write mode. // Replace its previous' next to its own next. - if let Some(previous) = previous { - txn.execute("UPDATE linked_chunks SET next = ? 
WHERE id = ? AND room_id = ?", (next, previous, &hashed_room_id))?; - } + // + // The inner SELECT retrieves the current chunk's next, and it's stored + // as cur.next. + + txn.execute(r#" + UPDATE linked_chunks AS lc + SET next = cur.next + FROM + (SELECT id, next + FROM linked_chunks + WHERE id = ? AND room_id = ? + ) + AS cur + WHERE lc.next = cur.id AND lc.room_id = ? + "#, (chunk_id, &hashed_room_id, &hashed_room_id))?; // Replace its next' previous to its own previous. - if let Some(next) = next { - txn.execute("UPDATE linked_chunks SET previous = ? WHERE id = ? AND room_id = ?", (previous, next, &hashed_room_id))?; - } + txn.execute(r#" + UPDATE linked_chunks AS lc + SET previous = cur.previous + FROM + (SELECT id, previous + FROM linked_chunks + WHERE id = ? AND room_id = ? + ) + AS cur + WHERE lc.previous = cur.id AND lc.room_id = ? + "#, (chunk_id, &hashed_room_id, &hashed_room_id))?; // Now delete it, and let cascading delete corresponding entries in the // other data tables. @@ -779,7 +801,7 @@ mod tests { use tempfile::{tempdir, TempDir}; use super::SqliteEventCacheStore; - use crate::utils::SqliteAsyncConnExt; + use crate::{event_cache_store::keys, utils::SqliteAsyncConnExt}; static TMP_DIR: Lazy = Lazy::new(|| tempdir().unwrap()); static NUM: AtomicU32 = AtomicU32::new(0); @@ -971,11 +993,12 @@ mod tests { async fn test_linked_chunk_remove_chunk() { let store = get_event_cache_store().await.expect("creating cache store failed"); - let room_id = &DEFAULT_TEST_ROOM_ID; + let room_id = &*DEFAULT_TEST_ROOM_ID; + let room_id2 = room_id!("!a:b.c"); store .handle_linked_chunk_updates( - room_id, + &room_id, vec![ Update::NewGapChunk { previous: None, @@ -1001,7 +1024,36 @@ mod tests { .await .unwrap(); - let mut chunks = store.reload_linked_chunk(room_id).await.unwrap(); + // Add some updates to another room using the same IDs, to make sure the query + // only affects a single room. 
+ store + .handle_linked_chunk_updates( + room_id2, + vec![ + Update::NewGapChunk { + previous: None, + new: ChunkIdentifier::new(42), + next: None, + gap: Gap { prev_token: "raclette".to_owned() }, + }, + Update::NewGapChunk { + previous: Some(ChunkIdentifier::new(42)), + new: ChunkIdentifier::new(43), + next: None, + gap: Gap { prev_token: "fondue".to_owned() }, + }, + Update::NewGapChunk { + previous: Some(ChunkIdentifier::new(43)), + new: ChunkIdentifier::new(44), + next: None, + gap: Gap { prev_token: "tartiflette".to_owned() }, + }, + ], + ) + .await + .unwrap(); + + let mut chunks = store.reload_linked_chunk(&room_id).await.unwrap(); assert_eq!(chunks.len(), 2); @@ -1023,15 +1075,17 @@ mod tests { }); // Check that cascading worked. Yes, sqlite, I doubt you. + let store2 = store.clone(); let gaps = store .acquire() .await .unwrap() - .with_transaction(|txn| -> rusqlite::Result<_> { + .with_transaction(move |txn| -> rusqlite::Result<_> { let mut gaps = Vec::new(); + let hashed_room_id = store2.encode_key(keys::LINKED_CHUNKS, *room_id); for data in txn - .prepare("SELECT chunk_id FROM gaps ORDER BY chunk_id")? - .query_map((), |row| row.get::<_, u64>(0))? + .prepare("SELECT chunk_id FROM gaps WHERE room_id = ? ORDER BY chunk_id")? + .query_map((&hashed_room_id,), |row| row.get::<_, u64>(0))? { gaps.push(data?); } @@ -1041,6 +1095,9 @@ mod tests { .unwrap(); assert_eq!(gaps, vec![42, 44]); + + // Sanity-check: chunks from the second room haven't been touched. + assert_eq!(store.reload_linked_chunk(room_id2).await.unwrap().len(), 3); } #[async_test]