diff --git a/src/api/client_server/sync.rs b/src/api/client_server/sync.rs
index 647ce905d..25dbe67d1 100644
--- a/src/api/client_server/sync.rs
+++ b/src/api/client_server/sync.rs
@@ -25,10 +25,9 @@ use ruma::{
 		StateEventType, TimelineEventType,
 	},
 	serde::Raw,
-	uint, DeviceId, EventId, OwnedDeviceId, OwnedUserId, RoomId, UInt, UserId,
+	uint, DeviceId, EventId, OwnedUserId, RoomId, UInt, UserId,
 };
-use tokio::sync::watch::Sender;
-use tracing::{debug, error, Instrument as _, Span};
+use tracing::{error, Instrument as _, Span};
 
 use crate::{
 	service::{pdu::EventHash, rooms::timeline::PduCount},
@@ -73,10 +72,6 @@ use crate::{
 /// For left rooms:
 /// - If the user left after `since`: `prev_batch` token, empty state (TODO:
 ///   subset of the state at the point of the leave)
-///
-/// - Sync is handled in an async task, multiple requests from the same device
-///   with the same
-///   `since` will be cached
 pub(crate) async fn sync_events_route(
 	body: Ruma<sync_events::v3::Request>,
 ) -> Result<sync_events::v3::Response, RumaResponse<UiaaResponse>> {
@@ -84,95 +79,6 @@ pub(crate) async fn sync_events_route(
 	let sender_device = body.sender_device.expect("user is authenticated");
 	let body = body.body;
 
-	let mut rx = match services()
-		.globals
-		.sync_receivers
-		.write()
-		.await
-		.entry((sender_user.clone(), sender_device.clone()))
-	{
-		Entry::Vacant(v) => {
-			let (tx, rx) = tokio::sync::watch::channel(None);
-
-			v.insert((body.since.clone(), rx.clone()));
-
-			tokio::spawn(sync_helper_wrapper(sender_user.clone(), sender_device.clone(), body, tx));
-
-			rx
-		},
-		Entry::Occupied(mut o) => {
-			if o.get().0 != body.since {
-				let (tx, rx) = tokio::sync::watch::channel(None);
-
-				o.insert((body.since.clone(), rx.clone()));
-
-				debug!("Sync started for {sender_user}");
-
-				tokio::spawn(sync_helper_wrapper(sender_user.clone(), sender_device.clone(), body, tx));
-
-				rx
-			} else {
-				o.get().1.clone()
-			}
-		},
-	};
-
-	let we_have_to_wait = rx.borrow().is_none();
-	if we_have_to_wait {
-		if let Err(e) = rx.changed().await {
-			error!("Error waiting for sync: {}", e);
-		}
-	}
-
-	let result = match rx
-		.borrow()
-		.as_ref()
-		.expect("When sync channel changes it's always set to some")
-	{
-		Ok(response) => Ok(response.clone()),
-		Err(error) => Err(error.to_response()),
-	};
-
-	result
-}
-
-async fn sync_helper_wrapper(
-	sender_user: OwnedUserId, sender_device: OwnedDeviceId, body: sync_events::v3::Request,
-	tx: Sender<Option<Result<sync_events::v3::Response>>>,
-) {
-	let since = body.since.clone();
-
-	let r = sync_helper(sender_user.clone(), sender_device.clone(), body).await;
-
-	if let Ok((_, caching_allowed)) = r {
-		if !caching_allowed {
-			match services()
-				.globals
-				.sync_receivers
-				.write()
-				.await
-				.entry((sender_user, sender_device))
-			{
-				Entry::Occupied(o) => {
-					// Only remove if the device didn't start a different /sync already
-					if o.get().0 == since {
-						o.remove();
-					}
-				},
-				Entry::Vacant(_) => {},
-			}
-		}
-	}
-
-	_ = tx.send(Some(r.map(|(r, _)| r)));
-}
-
-async fn sync_helper(
-	sender_user: OwnedUserId,
-	sender_device: OwnedDeviceId,
-	body: sync_events::v3::Request,
-	// bool = caching allowed
-) -> Result<(sync_events::v3::Response, bool), Error> {
 	// Presence update
 	if services().globals.allow_local_presence() {
 		services()
@@ -414,10 +320,9 @@ async fn sync_helper(
 			duration = Duration::from_secs(30);
 		}
 		_ = tokio::time::timeout(duration, watcher).await;
-		Ok((response, false))
-	} else {
-		Ok((response, since != next_batch)) // Only cache if we made progress
 	}
+
+	Ok(response)
 }
 
 #[tracing::instrument(skip_all, fields(user_id = %sender_user, room_id = %room_id))]
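The machinery deleted above deduplicated concurrent `/sync` requests: the first request for a given `(user, device)` pair spawned the work and published its result through a `tokio::sync::watch` channel, and any later request arriving with the same `since` token subscribed to that channel instead of recomputing. The sketch below is a minimal, self-contained illustration of that pattern, not conduwuit's actual API: `SyncCache`, `deduped_sync`, and `expensive_sync` are hypothetical names, a plain `String` key stands in for `(OwnedUserId, OwnedDeviceId)`, and the `since` comparison and `caching_allowed` flag from the removed code are omitted for brevity.

```rust
// Hypothetical sketch of the removed dedup pattern.
// Requires tokio = { version = "1", features = ["full"] }.
use std::{
	collections::{hash_map::Entry, HashMap},
	sync::Arc,
	time::Duration,
};

use tokio::sync::{watch, RwLock};

// Shared map of in-flight computations, keyed here by a plain String.
type SyncCache = Arc<RwLock<HashMap<String, watch::Receiver<Option<String>>>>>;

// Stand-in for the real sync work; sleeps to simulate latency.
async fn expensive_sync(key: &str) -> String {
	tokio::time::sleep(Duration::from_millis(100)).await;
	format!("response for {key}")
}

async fn deduped_sync(cache: SyncCache, key: String) -> String {
	let mut rx = match cache.write().await.entry(key.clone()) {
		// A task is already computing this key: share its channel.
		Entry::Occupied(o) => o.get().clone(),
		// First request: spawn the work and publish a receiver for others.
		Entry::Vacant(v) => {
			let (tx, rx) = watch::channel(None);
			v.insert(rx.clone());
			let bg_cache = Arc::clone(&cache);
			tokio::spawn(async move {
				let response = expensive_sync(&key).await;
				let _ = tx.send(Some(response));
				// Evict so the next request recomputes instead of
				// seeing a stale response.
				bg_cache.write().await.remove(&key);
			});
			rx
		},
	};
	// Wait until the producer publishes Some(response).
	while rx.borrow().is_none() {
		if rx.changed().await.is_err() {
			break; // sender dropped without publishing
		}
	}
	rx.borrow().clone().unwrap_or_default()
}

#[tokio::main]
async fn main() {
	let cache = SyncCache::default();
	// Concurrent requests with the same key share one expensive_sync run.
	let (a, b) = tokio::join!(
		deduped_sync(Arc::clone(&cache), "alice/DEVICEID".into()),
		deduped_sync(Arc::clone(&cache), "alice/DEVICEID".into()),
	);
	assert_eq!(a, b);
	println!("{a}");
}
```

What the diff observably trades away and gains: the cache saved work when a client retried an identical `since`, but it cost a per-device map in global state, lock traffic on every request, and the eviction logic in `sync_helper_wrapper`; removing it lets the route body run inline and return the response directly, without the `(response, caching_allowed)` tuple.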
diff --git a/src/service/globals/mod.rs b/src/service/globals/mod.rs
index 402934ef8..4484b8164 100644
--- a/src/service/globals/mod.rs
+++ b/src/service/globals/mod.rs
@@ -18,14 +18,14 @@ use ipaddress::IPAddress;
 use regex::RegexSet;
 use ruma::{
 	api::{
-		client::{discovery::discover_support::ContactRole, sync::sync_events},
+		client::discovery::discover_support::ContactRole,
 		federation::discovery::{ServerSigningKeys, VerifyKey},
 	},
 	serde::Base64,
-	DeviceId, OwnedDeviceId, OwnedEventId, OwnedRoomId, OwnedServerName, OwnedServerSigningKeyId, OwnedUserId,
-	RoomVersionId, ServerName, UserId,
+	DeviceId, OwnedEventId, OwnedRoomId, OwnedServerName, OwnedServerSigningKeyId, OwnedUserId, RoomVersionId,
+	ServerName, UserId,
 };
-use tokio::sync::{broadcast, watch::Receiver, Mutex, RwLock};
+use tokio::sync::{broadcast, Mutex, RwLock};
 use tracing::{error, info, trace};
 use url::Url;
 
@@ -36,10 +36,6 @@ mod data;
 mod resolver;
 
 type RateLimitState = (Instant, u32); // Time if last failed try, number of failed tries
-type SyncHandle = (
-	Option<String>, // since
-	Receiver<Option<Result<sync_events::v3::Response>>>, // rx
-);
 
 pub(crate) struct Service<'a> {
 	pub(crate) db: &'static dyn Data,
@@ -56,7 +52,6 @@ pub(crate) struct Service<'a> {
 	pub(crate) bad_event_ratelimiter: Arc<RwLock<HashMap<OwnedEventId, RateLimitState>>>,
 	pub(crate) bad_signature_ratelimiter: Arc<RwLock<HashMap<Vec<String>, RateLimitState>>>,
 	pub(crate) bad_query_ratelimiter: Arc<RwLock<HashMap<OwnedServerName, RateLimitState>>>,
-	pub(crate) sync_receivers: RwLock<HashMap<(OwnedUserId, OwnedDeviceId), SyncHandle>>,
 	pub(crate) roomid_mutex_insert: RwLock<HashMap<OwnedRoomId, Arc<Mutex<()>>>>,
 	pub(crate) roomid_mutex_state: RwLock<HashMap<OwnedRoomId, Arc<Mutex<()>>>>,
 	pub(crate) roomid_mutex_federation: RwLock<HashMap<OwnedRoomId, Arc<Mutex<()>>>>, // this lock will be held longer
@@ -163,7 +158,6 @@ impl Service<'_> {
 			roomid_mutex_federation: RwLock::new(HashMap::new()),
 			roomid_federationhandletime: RwLock::new(HashMap::new()),
 			stateres_mutex: Arc::new(Mutex::new(())),
-			sync_receivers: RwLock::new(HashMap::new()),
 			rotate: RotationHandler::new(),
 			started: SystemTime::now(),
 			shutdown: AtomicBool::new(false),
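With `SyncHandle` and `sync_receivers` gone from `Service`, no per-device sync state survives between requests; the only coordination left is the long-poll retained at the end of the sync.rs hunk, where the handler awaits a watcher future for at most the client-requested timeout, capped at 30 seconds, and then builds the response either way. A minimal sketch of that mechanic, with `tokio::sync::Notify` standing in for the watcher that `services()` provides and `long_poll` as a hypothetical name:

```rust
use std::time::Duration;

use tokio::sync::Notify;

// Hypothetical helper; `Notify` stands in for the watcher future the
// real handler awaits.
async fn long_poll(wakeup: &Notify, client_timeout: Option<Duration>) {
	if let Some(requested) = client_timeout {
		// Cap the client-requested wait at 30 seconds, as the handler does.
		let duration = requested.min(Duration::from_secs(30));
		// Ok(_) means the watcher fired (new events may exist); Err(Elapsed)
		// means the timeout hit. Either way we fall through and respond.
		let _ = tokio::time::timeout(duration, wakeup.notified()).await;
	}
}

#[tokio::main]
async fn main() {
	let wakeup = Notify::new();
	// Nothing notifies us here, so this returns after roughly 50 ms.
	long_poll(&wakeup, Some(Duration::from_millis(50))).await;
	println!("build and send the (possibly empty) sync response");
}
```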