Commit

remove sync response cache

This cache can serve invalid responses, and has an extremely low hit
rate.

It serves invalid responses because it's only keyed off the `since`
parameter, but many of the other request parameters also affect the
response or its side effects. This will become worse once we implement
filtering, because there will be a wider space of parameters with
different responses. This problem is fixable, but not worth fixing
given the low hit rate.
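
As a concrete illustration (hypothetical types and key shape, not the
actual cache in this codebase): when the key ignores everything but
`since`, two requests that share a `since` but differ in, say, their
filter collide on one entry, and the second request is served a
response computed for the first.

use std::collections::HashMap;

// Hypothetical cache keyed only on (user, device, since).
type CacheKey = (String, String, Option<String>);

struct CachedResponse {
    next_batch: String,
    body: String,
}

fn main() {
    let mut cache: HashMap<CacheKey, CachedResponse> = HashMap::new();

    let key: CacheKey = (
        "@alice:example.org".to_owned(),
        "DEVICEID".to_owned(),
        Some("s100".to_owned()),
    );

    // First request: computed under filter A and cached.
    cache.insert(
        key.clone(),
        CachedResponse {
            next_batch: "s101".to_owned(),
            body: "events selected by filter A".to_owned(),
        },
    );

    // Second request: same `since`, different filter. The key still
    // matches, so the entry computed for filter A is returned even
    // though the correct response would differ.
    if let Some(stale) = cache.get(&key) {
        println!("cache hit, serving: {} (next_batch {})", stale.body, stale.next_batch);
    }
}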

The hit rate is low because a normal client always issues its next sync
request with `since` set to the `next_batch` value of the previous
response. The only time we expect to see multiple requests with the
same `since` is when the response is empty, but we don't cache empty
responses.
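
A rough sketch of that client loop (the `sync_once` helper is a
stand-in for a real `GET /_matrix/client/v3/sync` request, not an API
from this codebase): the token advances with every response, so a
cache keyed on `since` effectively never sees the same key twice.

struct SyncResponse {
    next_batch: String,
}

// Stand-in for one /sync round trip; it just fabricates the next token.
fn sync_once(since: Option<&str>, tick: u64) -> SyncResponse {
    let _ = since; // a real client sends this as the `since` query parameter
    SyncResponse {
        next_batch: format!("s{tick}"),
    }
}

fn main() {
    let mut since: Option<String> = None;
    for tick in 0..5 {
        let response = sync_once(since.as_deref(), tick);
        // Each iteration uses the fresh token from the previous response,
        // so this loop produces five distinct cache keys, not one.
        since = Some(response.next_batch);
        println!("next request will use since = {:?}", since);
    }
}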

This was confirmed experimentally by logging cache hits and misses over
15 minutes with a wide variety of clients. This test was run on
matrix.computer.surgery, which has only a few active users, but a
large volume of sync traffic from many rooms. Over the test period, we
had 3 hits and 5309 misses. All hits occurred in the first minute, so I
suspect they were related to clients recovering from an offline state.
The clients connected during the test were:

 - element web
 - schildichat web
 - iamb
 - gomuks
 - nheko
 - fractal
 - fluffychat web
 - fluffychat android
 - cinny web
 - element android
 - element X android

Fixes: girlbossceo#336
olivia-fl committed May 17, 2024
1 parent 99d98ef commit 4ab65a4
Showing 2 changed files with 8 additions and 109 deletions.
src/api/client_server/sync.rs (103 changes: 4 additions & 99 deletions)
@@ -25,10 +25,9 @@ use ruma::{
StateEventType, TimelineEventType,
},
serde::Raw,
uint, DeviceId, EventId, OwnedDeviceId, OwnedUserId, RoomId, UInt, UserId,
uint, DeviceId, EventId, OwnedUserId, RoomId, UInt, UserId,
};
use tokio::sync::watch::Sender;
use tracing::{debug, error, Instrument as _, Span};
use tracing::{error, Instrument as _, Span};

use crate::{
service::{pdu::EventHash, rooms::timeline::PduCount},
@@ -73,106 +72,13 @@ use crate::{
/// For left rooms:
/// - If the user left after `since`: `prev_batch` token, empty state (TODO:
/// subset of the state at the point of the leave)
///
/// - Sync is handled in an async task, multiple requests from the same device
/// with the same
/// `since` will be cached
pub(crate) async fn sync_events_route(
body: Ruma<sync_events::v3::Request>,
) -> Result<sync_events::v3::Response, RumaResponse<UiaaResponse>> {
let sender_user = body.sender_user.expect("user is authenticated");
let sender_device = body.sender_device.expect("user is authenticated");
let body = body.body;

let mut rx = match services()
.globals
.sync_receivers
.write()
.await
.entry((sender_user.clone(), sender_device.clone()))
{
Entry::Vacant(v) => {
let (tx, rx) = tokio::sync::watch::channel(None);

v.insert((body.since.clone(), rx.clone()));

tokio::spawn(sync_helper_wrapper(sender_user.clone(), sender_device.clone(), body, tx));

rx
},
Entry::Occupied(mut o) => {
if o.get().0 != body.since {
let (tx, rx) = tokio::sync::watch::channel(None);

o.insert((body.since.clone(), rx.clone()));

debug!("Sync started for {sender_user}");

tokio::spawn(sync_helper_wrapper(sender_user.clone(), sender_device.clone(), body, tx));

rx
} else {
o.get().1.clone()
}
},
};

let we_have_to_wait = rx.borrow().is_none();
if we_have_to_wait {
if let Err(e) = rx.changed().await {
error!("Error waiting for sync: {}", e);
}
}

let result = match rx
.borrow()
.as_ref()
.expect("When sync channel changes it's always set to some")
{
Ok(response) => Ok(response.clone()),
Err(error) => Err(error.to_response()),
};

result
}

async fn sync_helper_wrapper(
sender_user: OwnedUserId, sender_device: OwnedDeviceId, body: sync_events::v3::Request,
tx: Sender<Option<Result<sync_events::v3::Response>>>,
) {
let since = body.since.clone();

let r = sync_helper(sender_user.clone(), sender_device.clone(), body).await;

if let Ok((_, caching_allowed)) = r {
if !caching_allowed {
match services()
.globals
.sync_receivers
.write()
.await
.entry((sender_user, sender_device))
{
Entry::Occupied(o) => {
// Only remove if the device didn't start a different /sync already
if o.get().0 == since {
o.remove();
}
},
Entry::Vacant(_) => {},
}
}
}

_ = tx.send(Some(r.map(|(r, _)| r)));
}

async fn sync_helper(
sender_user: OwnedUserId,
sender_device: OwnedDeviceId,
body: sync_events::v3::Request,
// bool = caching allowed
) -> Result<(sync_events::v3::Response, bool), Error> {
// Presence update
if services().globals.allow_local_presence() {
services()
@@ -414,10 +320,9 @@ async fn sync_helper(
duration = Duration::from_secs(30);
}
_ = tokio::time::timeout(duration, watcher).await;
Ok((response, false))
} else {
Ok((response, since != next_batch)) // Only cache if we made progress
}

Ok(response)
}

#[tracing::instrument(skip_all, fields(user_id = %sender_user, room_id = %room_id))]
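Not all of the added lines are visible in the excerpt above. Under the
assumption that the route handler now simply awaits `sync_helper` and
converts its error with the same `to_response()` call the old code
used, the post-change handler is roughly the following sketch (a guess
at its shape, not the literal committed code):

pub(crate) async fn sync_events_route(
    body: Ruma<sync_events::v3::Request>,
) -> Result<sync_events::v3::Response, RumaResponse<UiaaResponse>> {
    let sender_user = body.sender_user.expect("user is authenticated");
    let sender_device = body.sender_device.expect("user is authenticated");
    let body = body.body;

    // No per-device watch channel or response cache any more: just run the
    // sync and hand back whatever it produces.
    sync_helper(sender_user, sender_device, body)
        .await
        .map_err(|e| e.to_response())
}
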
src/service/globals/mod.rs (14 changes: 4 additions & 10 deletions)
@@ -18,14 +18,14 @@ use ipaddress::IPAddress;
use regex::RegexSet;
use ruma::{
api::{
client::{discovery::discover_support::ContactRole, sync::sync_events},
client::discovery::discover_support::ContactRole,
federation::discovery::{ServerSigningKeys, VerifyKey},
},
serde::Base64,
DeviceId, OwnedDeviceId, OwnedEventId, OwnedRoomId, OwnedServerName, OwnedServerSigningKeyId, OwnedUserId,
RoomVersionId, ServerName, UserId,
DeviceId, OwnedEventId, OwnedRoomId, OwnedServerName, OwnedServerSigningKeyId, OwnedUserId, RoomVersionId,
ServerName, UserId,
};
use tokio::sync::{broadcast, watch::Receiver, Mutex, RwLock};
use tokio::sync::{broadcast, Mutex, RwLock};
use tracing::{error, info, trace};
use url::Url;

@@ -36,10 +36,6 @@ mod data;
mod resolver;

type RateLimitState = (Instant, u32); // Time if last failed try, number of failed tries
type SyncHandle = (
Option<String>, // since
Receiver<Option<Result<sync_events::v3::Response>>>, // rx
);

pub(crate) struct Service<'a> {
pub(crate) db: &'static dyn Data,
@@ -56,7 +52,6 @@ pub(crate) struct Service<'a> {
pub(crate) bad_event_ratelimiter: Arc<RwLock<HashMap<OwnedEventId, RateLimitState>>>,
pub(crate) bad_signature_ratelimiter: Arc<RwLock<HashMap<Vec<String>, RateLimitState>>>,
pub(crate) bad_query_ratelimiter: Arc<RwLock<HashMap<OwnedServerName, RateLimitState>>>,
pub(crate) sync_receivers: RwLock<HashMap<(OwnedUserId, OwnedDeviceId), SyncHandle>>,
pub(crate) roomid_mutex_insert: RwLock<HashMap<OwnedRoomId, Arc<Mutex<()>>>>,
pub(crate) roomid_mutex_state: RwLock<HashMap<OwnedRoomId, Arc<Mutex<()>>>>,
pub(crate) roomid_mutex_federation: RwLock<HashMap<OwnedRoomId, Arc<Mutex<()>>>>, // this lock will be held longer
@@ -163,7 +158,6 @@ impl Service<'_> {
roomid_mutex_federation: RwLock::new(HashMap::new()),
roomid_federationhandletime: RwLock::new(HashMap::new()),
stateres_mutex: Arc::new(Mutex::new(())),
sync_receivers: RwLock::new(HashMap::new()),
rotate: RotationHandler::new(),
started: SystemTime::now(),
shutdown: AtomicBool::new(false),
