Skip to content

Commit

Permalink
wip filestream reader
Browse files Browse the repository at this point in the history
  • Loading branch information
lukasIO committed Dec 10, 2024
1 parent f95e507 commit 55e0ee1
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 2 deletions.
84 changes: 84 additions & 0 deletions livekit/src/room/data_streams/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
use std::{
pin::Pin,
task::{Context, Poll},
};

use livekit_runtime::Stream;
use tokio::sync::mpsc;

pub struct DataStreamChunk {
pub stream_id: String,
pub chunk_index: u64,
pub content: Vec<u8>,
pub complete: bool,
pub version: i32,
}

pub struct FileStreamInfo {
pub stream_id: String,
pub timestamp: i64,
pub topic: String,
pub mime_type: String,
pub total_length: Option<u64>,
pub total_chunks: Option<u64>,
}

pub struct FileStreamReader {
update_rx: mpsc::UnboundedReceiver<DataStreamChunk>,
pub info: FileStreamInfo,
is_closed: bool,
}

impl FileStreamReader {
pub fn new(info: FileStreamInfo) -> (Self, FileStreamUpdater) {
let (update_tx, update_rx) = mpsc::unbounded_channel();
(Self { update_rx, info, is_closed: false }, FileStreamUpdater { update_tx })
}

fn close(&mut self) {
self.is_closed = true;
self.update_rx.close();
}
}

impl Drop for FileStreamReader {
fn drop(&mut self) {
self.close();
}
}

impl Stream for FileStreamReader {
type Item = Vec<u8>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.is_closed {
return Poll::Ready(None); // Stream is closed‚, stop yielding updates
}
match self.update_rx.poll_recv(cx) {
Poll::Ready(Some(update)) => {
if update.complete {
Poll::Ready(None) // Close stream after receiving a complete update
} else {
Poll::Ready(Some(update.content)) // Continue with data updates
}
}
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}

/// Helper to send updates to the `FileStream`.
pub struct FileStreamUpdater {
update_tx: mpsc::UnboundedSender<DataStreamChunk>,
}

impl FileStreamUpdater {
/// Sends an update to the `FileStream`.
pub fn send_update(
&self,
data: DataStreamChunk,
) -> Result<(), mpsc::error::SendError<DataStreamChunk>> {
self.update_tx.send(data)
}
}
26 changes: 25 additions & 1 deletion livekit/src/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

use std::{collections::HashMap, fmt::Debug, sync::Arc, time::Duration};

use data_streams::{DataStreamChunk, FileStreamInfo, FileStreamUpdater};
use futures_util::StreamExt;
use libwebrtc::{
native::frame_cryptor::EncryptionState,
prelude::{
Expand Down Expand Up @@ -50,6 +52,7 @@ use crate::{
},
};

pub mod data_streams;
pub mod e2ee;
pub mod id;
pub mod options;
Expand Down Expand Up @@ -365,6 +368,7 @@ pub(crate) struct RoomSession {
remote_participants: RwLock<HashMap<ParticipantIdentity, RemoteParticipant>>,
e2ee_manager: E2eeManager,
room_task: AsyncMutex<Option<(JoinHandle<()>, oneshot::Sender<()>)>>,
file_streams: HashMap<String, FileStreamUpdater>,
}

impl Debug for RoomSession {
Expand Down Expand Up @@ -506,6 +510,7 @@ impl Room {
dispatcher: dispatcher.clone(),
e2ee_manager: e2ee_manager.clone(),
room_task: Default::default(),
file_streams: HashMap::new(),
});

e2ee_manager.on_state_changed({
Expand Down Expand Up @@ -1258,6 +1263,18 @@ impl RoomSession {
total_length: Option<u64>,
total_chunks: Option<u64>,
) {
let (mut stream_reader, updater) = data_streams::FileStreamReader::new(FileStreamInfo {
stream_id,
timestamp,
topic,
mime_type,
total_length,
total_chunks,
});

self.file_streams.insert(stream_reader.info.stream_id.clone(), updater);

let _ = self.dispatcher.dispatch(RoomEvent::FileStreamReceived { stream_reader });

// create and store readable stream
// emit event with readable stream and info
Expand All @@ -1271,7 +1288,14 @@ impl RoomSession {
complete: bool,
version: i32,
) {
// update readable stream
let file_updater = self.file_streams.get(&stream_id).unwrap();
let _ = file_updater.send_update(DataStreamChunk {
stream_id,
chunk_index,
content,
complete,
version,
});
}

/// Create a new participant
Expand Down
1 change: 0 additions & 1 deletion livekit/src/rtc_engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use tokio::sync::{
mpsc, oneshot, Mutex as AsyncMutex, Notify, RwLock as AsyncRwLock,
RwLockReadGuard as AsyncRwLockReadGuard,
};
use tokio_stream::StreamExt;

pub use self::rtc_session::SessionStats;
use crate::prelude::ParticipantIdentity;
Expand Down

0 comments on commit 55e0ee1

Please sign in to comment.