Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wrap receiver in broadcast so that receiver can be called multiple times #2411

Merged
merged 10 commits into from
Dec 13, 2024
1 change: 1 addition & 0 deletions app/lib/features/cross_signing/widgets/cross_signing.dart
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ final _log = Logger('a3::cross_signing::widget');

// this widget has no elements
// it just pops up stage dialogs for verification
@immutable
class CrossSigning extends ConsumerStatefulWidget {
const CrossSigning({super.key});

Expand Down
2 changes: 1 addition & 1 deletion app/lib/features/main/app_shell.dart
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ class AppShellState extends ConsumerState<AppShell> {
controller: screenshotController,
child: Column(
children: [
const CrossSigning(),
CrossSigning(),
Expanded(
child: buildBody(context),
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import 'package:riverpod/riverpod.dart';
final _log = Logger('a3::settings::devices_notifier');

class AsyncDevicesNotifier extends AsyncNotifier<List<DeviceRecord>> {
Stream<DeviceEvent>? _listener;
StreamSubscription<DeviceEvent>? _poller;
late Stream<DeviceEvent> _listener;
late StreamSubscription<DeviceEvent> _poller;

Future<List<DeviceRecord>> _getSessions(SessionManager manager) async {
return (await manager.allSessions()).toList();
Expand All @@ -22,7 +22,7 @@ class AsyncDevicesNotifier extends AsyncNotifier<List<DeviceRecord>> {
final manager = client.sessionManager();

_listener = client.deviceEventRx();
_poller = _listener?.listen(
_poller = _listener.listen(
(data) async {
state = await AsyncValue.guard(() async => await _getSessions(manager));
},
Expand All @@ -33,7 +33,7 @@ class AsyncDevicesNotifier extends AsyncNotifier<List<DeviceRecord>> {
_log.info('stream ended');
},
);
ref.onDispose(() => _poller?.cancel());
ref.onDispose(() => _poller.cancel());

return await _getSessions(manager);
}
Expand Down
4 changes: 2 additions & 2 deletions native/acter/api.rsh
Original file line number Diff line number Diff line change
Expand Up @@ -2803,7 +2803,7 @@ object Client {
fn logout() -> Future<Result<bool>>;

/// Get the verification event receiver
fn verification_event_rx() -> Option<Stream<VerificationEvent>>;
fn verification_event_rx() -> Stream<VerificationEvent>;

/// Get session manager that returns all/verified/unverified/inactive session list
fn session_manager() -> SessionManager;
Expand All @@ -2819,7 +2819,7 @@ object Client {
fn install_sas_event_handler(flow_id: string) -> Future<Result<bool>>;

/// Return the event handler that new device was found or existing device was changed
fn device_event_rx() -> Option<Stream<DeviceEvent>>;
fn device_event_rx() -> Stream<DeviceEvent>;

/// Return the typing event receiver
fn subscribe_to_typing_event_stream(room_id: string) -> Stream<TypingEvent>;
Expand Down
25 changes: 13 additions & 12 deletions native/acter/src/api/device.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
use anyhow::{Context, Result};
use futures::{
channel::mpsc::{channel, Receiver, Sender},
pin_mut,
stream::StreamExt,
stream::{Stream, StreamExt},
};
use matrix_sdk::{executor::JoinHandle, Client as SdkClient};
use matrix_sdk_base::ruma::{OwnedDeviceId, OwnedUserId};
use std::{
marker::Unpin,
sync::Arc,
time::{Duration, SystemTime, UNIX_EPOCH},
};
use tokio::sync::Mutex;
use tokio::sync::broadcast::{channel, Receiver, Sender};
use tokio_stream::wrappers::BroadcastStream;
use tracing::{error, info};

use super::{client::Client, common::DeviceRecord, RUNTIME};

#[derive(Clone, Debug)]
#[derive(Clone, Debug, Default)]
pub struct DeviceEvent {
new_devices: Vec<OwnedDeviceId>,
changed_devices: Vec<OwnedDeviceId>,
Expand Down Expand Up @@ -50,7 +51,7 @@
#[derive(Clone, Debug)]
pub(crate) struct DeviceController {
event_tx: Sender<DeviceEvent>, // keep it resident in memory
event_rx: Arc<Mutex<Option<Receiver<DeviceEvent>>>>,
event_rx: Arc<Receiver<DeviceEvent>>,
listener: Arc<JoinHandle<()>>, // keep it resident in memory
}

Expand Down Expand Up @@ -89,7 +90,7 @@
}
if !new_devices.is_empty() || !changed_devices.is_empty() {
let evt = DeviceEvent::new(new_devices, changed_devices);
if let Err(e) = tx.try_send(evt) {
if let Err(e) = tx.send(evt) {
error!("Dropping device event: {}", e);
}
}
Expand All @@ -98,18 +99,18 @@

DeviceController {
event_tx,
event_rx: Arc::new(Mutex::new(Some(event_rx))),
event_rx: Arc::new(event_rx),
listener: Arc::new(listener),
}
}
}

impl Client {
pub fn device_event_rx(&self) -> Option<Receiver<DeviceEvent>> {
match self.device_controller.event_rx.try_lock() {
Ok(mut r) => r.take(),
Err(e) => None,
}
// this return value should be Unpin, because next() of this stream is called in interactive_verification_started_from_request
// this return value should be wrapped in Box::pin, to make unpin possible
pub fn device_event_rx(&self) -> impl Stream<Item = DeviceEvent> + Unpin {
let mut stream = BroadcastStream::new(self.device_controller.event_rx.resubscribe());
Box::pin(stream.filter_map(|o| async move { o.ok() }))

Check warning on line 113 in native/acter/src/api/device.rs

View check run for this annotation

Codecov / codecov/patch

native/acter/src/api/device.rs#L111-L113

Added lines #L111 - L113 were not covered by tests
}

pub async fn device_records(&self, verified: bool) -> Result<Vec<DeviceRecord>> {
Expand Down
Loading
Loading