Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DataConsumer: Add addSubchannel() and removeSubchannel() #1263

Merged
merged 3 commits into from
Dec 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@

### NEXT

* liburing: avoid extra memcpy on RTP ([PR #1258](https://github.com/versatica/mediasoup/pull/1258)).
* libsrtp: use our own fork ([PR #1260](https://github.com/versatica/mediasoup/pull/1260)).
* liburing: Avoid extra memcpy on RTP ([PR #1258](https://github.com/versatica/mediasoup/pull/1258)).
* libsrtp: Use our own fork ([PR #1260](https://github.com/versatica/mediasoup/pull/1260)).
* Fix Rust `DataConsumer` ([PR #1262](https://github.com/versatica/mediasoup/pull/1262)).
* `DataConsumer`: Add `addSubchannel()` and `removeSubchannel()` methods ([PR #1263](https://github.com/versatica/mediasoup/pull/1263)).


### 3.13.10
Expand Down
55 changes: 55 additions & 0 deletions node/src/DataConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,61 @@ export class DataConsumer<DataConsumerAppData extends AppData = AppData>
this.#subchannels = utils.parseVector(data, 'subchannels');
}

/**
* Add a subchannel.
*/
async addSubchannel(subchannel: number): Promise<void>
{
logger.debug('addSubchannel()');

/* Build Request. */
const requestOffset =
FbsDataConsumer.AddSubchannelRequest.createAddSubchannelRequest(
this.#channel.bufferBuilder, subchannel);

const response = await this.#channel.request(
FbsRequest.Method.DATACONSUMER_ADD_SUBCHANNEL,
FbsRequest.Body.DataConsumer_AddSubchannelRequest,
requestOffset,
this.#internal.dataConsumerId
);

/* Decode Response. */
const data = new FbsDataConsumer.AddSubchannelResponse();

response.body(data);

// Update subchannels.
this.#subchannels = utils.parseVector(data, 'subchannels');
}

/**
* Remove a subchannel.
*/
async removeSubchannel(subchannel: number): Promise<void>
{
logger.debug('removeSubchannel()');

/* Build Request. */
const requestOffset = FbsDataConsumer.RemoveSubchannelRequest.
createRemoveSubchannelRequest(this.#channel.bufferBuilder, subchannel);

const response = await this.#channel.request(
FbsRequest.Method.DATACONSUMER_REMOVE_SUBCHANNEL,
FbsRequest.Body.DataConsumer_RemoveSubchannelRequest,
requestOffset,
this.#internal.dataConsumerId
);

/* Decode Response. */
const data = new FbsDataConsumer.RemoveSubchannelResponse();

response.body(data);

// Update subchannels.
this.#subchannels = utils.parseVector(data, 'subchannels');
}

private handleWorkerNotifications(): void
{
this.#channel.on(this.#internal.dataConsumerId, (event: Event, data?: Notification) =>
Expand Down
24 changes: 24 additions & 0 deletions node/src/tests/test-DataConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,30 @@ test('dataConsumer.setSubchannels() succeeds', async () =>
.toEqual([ 0, 998, 999 ]);
}, 2000);

test('dataConsumer.addSubchannel() and .removeSubchannel() succeed', async () =>
{
await dataConsumer1.setSubchannels([ ]);
expect(dataConsumer1.subchannels).toEqual([ ]);

await dataConsumer1.addSubchannel(5);
expect(dataConsumer1.subchannels.sort((a, b) => a - b)).toEqual([ 5 ]);

await dataConsumer1.addSubchannel(10);
expect(dataConsumer1.subchannels.sort((a, b) => a - b)).toEqual([ 5, 10 ]);

await dataConsumer1.addSubchannel(5);
expect(dataConsumer1.subchannels.sort((a, b) => a - b)).toEqual([ 5, 10 ]);

await dataConsumer1.removeSubchannel(666);
expect(dataConsumer1.subchannels.sort((a, b) => a - b)).toEqual([ 5, 10 ]);

await dataConsumer1.removeSubchannel(5);
expect(dataConsumer1.subchannels.sort((a, b) => a - b)).toEqual([ 10 ]);

await dataConsumer1.setSubchannels([ ]);
expect(dataConsumer1.subchannels).toEqual([ ]);
}, 2000);

test('transport.consumeData() on a DirectTransport succeeds', async () =>
{
const onObserverNewDataConsumer = jest.fn();
Expand Down
100 changes: 100 additions & 0 deletions rust/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3260,6 +3260,106 @@ impl Request for DataConsumerSetSubchannelsRequest {
}
}

#[derive(Debug, Clone, Serialize)]
pub(crate) struct DataConsumerAddSubchannelRequest {
pub(crate) subchannel: u16,
}

#[derive(Debug, Clone, Serialize)]
pub(crate) struct DataConsumerAddSubchannelResponse {
pub(crate) subchannels: Vec<u16>,
}

impl Request for DataConsumerAddSubchannelRequest {
const METHOD: request::Method = request::Method::DataconsumerAddSubchannel;
type HandlerId = DataConsumerId;
type Response = DataConsumerAddSubchannelResponse;

fn into_bytes(self, id: u32, handler_id: Self::HandlerId) -> Vec<u8> {
let mut builder = Builder::new();

let data = data_consumer::AddSubchannelRequest::create(&mut builder, self.subchannel);
let request_body =
request::Body::create_data_consumer_add_subchannel_request(&mut builder, data);

let request = request::Request::create(
&mut builder,
id,
Self::METHOD,
handler_id.to_string(),
Some(request_body),
);
let message_body = message::Body::create_request(&mut builder, request);
let message = message::Message::create(&mut builder, message_body);

builder.finish(message, None).to_vec()
}

fn convert_response(
response: Option<response::BodyRef<'_>>,
) -> Result<Self::Response, Box<dyn Error>> {
let Some(response::BodyRef::DataConsumerAddSubchannelResponse(data)) = response else {
panic!("Wrong message from worker: {response:?}");
};

let data = data_consumer::AddSubchannelResponse::try_from(data)?;

Ok(DataConsumerAddSubchannelResponse {
subchannels: data.subchannels,
})
}
}

#[derive(Debug, Clone, Serialize)]
pub(crate) struct DataConsumerRemoveSubchannelRequest {
pub(crate) subchannel: u16,
}

#[derive(Debug, Clone, Serialize)]
pub(crate) struct DataConsumerRemoveSubchannelResponse {
pub(crate) subchannels: Vec<u16>,
}

impl Request for DataConsumerRemoveSubchannelRequest {
const METHOD: request::Method = request::Method::DataconsumerRemoveSubchannel;
type HandlerId = DataConsumerId;
type Response = DataConsumerRemoveSubchannelResponse;

fn into_bytes(self, id: u32, handler_id: Self::HandlerId) -> Vec<u8> {
let mut builder = Builder::new();

let data = data_consumer::RemoveSubchannelRequest::create(&mut builder, self.subchannel);
let request_body =
request::Body::create_data_consumer_remove_subchannel_request(&mut builder, data);

let request = request::Request::create(
&mut builder,
id,
Self::METHOD,
handler_id.to_string(),
Some(request_body),
);
let message_body = message::Body::create_request(&mut builder, request);
let message = message::Message::create(&mut builder, message_body);

builder.finish(message, None).to_vec()
}

fn convert_response(
response: Option<response::BodyRef<'_>>,
) -> Result<Self::Response, Box<dyn Error>> {
let Some(response::BodyRef::DataConsumerRemoveSubchannelResponse(data)) = response else {
panic!("Wrong message from worker: {response:?}");
};

let data = data_consumer::RemoveSubchannelResponse::try_from(data)?;

Ok(DataConsumerRemoveSubchannelResponse {
subchannels: data.subchannels,
})
}
}

#[derive(Debug)]
pub(crate) struct RtpObserverCloseRequest {
pub(crate) rtp_observer_id: RtpObserverId,
Expand Down
37 changes: 33 additions & 4 deletions rust/src/router/data_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ mod tests;
use crate::data_producer::{DataProducer, DataProducerId, WeakDataProducer};
use crate::data_structures::{AppData, WebRtcMessage};
use crate::messages::{
DataConsumerCloseRequest, DataConsumerDumpRequest, DataConsumerGetBufferedAmountRequest,
DataConsumerGetStatsRequest, DataConsumerPauseRequest, DataConsumerResumeRequest,
DataConsumerSendRequest, DataConsumerSetBufferedAmountLowThresholdRequest,
DataConsumerSetSubchannelsRequest,
DataConsumerAddSubchannelRequest, DataConsumerCloseRequest, DataConsumerDumpRequest,
DataConsumerGetBufferedAmountRequest, DataConsumerGetStatsRequest, DataConsumerPauseRequest,
DataConsumerRemoveSubchannelRequest, DataConsumerResumeRequest, DataConsumerSendRequest,
DataConsumerSetBufferedAmountLowThresholdRequest, DataConsumerSetSubchannelsRequest,
};
use crate::sctp_parameters::SctpStreamParameters;
use crate::transport::Transport;
Expand Down Expand Up @@ -803,6 +803,35 @@ impl DataConsumer {
Ok(())
}

/// Adds a subchannel to the worker DataConsumer.
pub async fn add_subchannel(&self, subchannel: u16) -> Result<(), RequestError> {
let response = self
.inner()
.channel
.request(self.id(), DataConsumerAddSubchannelRequest { subchannel })
.await?;

*self.inner().subchannels.lock() = response.subchannels;

Ok(())
}

/// Removes a subchannel to the worker DataConsumer.
pub async fn remove_subchannel(&self, subchannel: u16) -> Result<(), RequestError> {
let response = self
.inner()
.channel
.request(
self.id(),
DataConsumerRemoveSubchannelRequest { subchannel },
)
.await?;

*self.inner().subchannels.lock() = response.subchannels;

Ok(())
}

/// Callback is called when a message has been received from the corresponding data producer.
///
/// # Notes on usage
Expand Down
86 changes: 86 additions & 0 deletions rust/tests/integration/data_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,92 @@ fn set_subchannels() {
});
}

#[test]
fn add_and_remove_subchannel() {
future::block_on(async move {
let (_worker, _router, transport1, data_producer) = init().await;

let data_consumer = transport1
.consume_data(DataConsumerOptions::new_sctp_unordered_with_life_time(
data_producer.id(),
4000,
))
.await
.expect("Failed to consume data");

data_consumer
.set_subchannels([].to_vec())
.await
.expect("Failed to set data consumer subchannels");

assert_eq!(data_consumer.subchannels(), []);

data_consumer
.add_subchannel(5)
.await
.expect("Failed to add data consumer subchannel");

assert_eq!(data_consumer.subchannels(), [5]);

data_consumer
.add_subchannel(10)
.await
.expect("Failed to add data consumer subchannel");

let mut sorted_subchannels = data_consumer.subchannels();
sorted_subchannels.sort();

assert_eq!(sorted_subchannels, [5, 10]);

data_consumer
.add_subchannel(5)
.await
.expect("Failed to add data consumer subchannel");

sorted_subchannels = data_consumer.subchannels();
sorted_subchannels.sort();

assert_eq!(sorted_subchannels, [5, 10]);

data_consumer
.remove_subchannel(666)
.await
.expect("Failed to remove data consumer subchannel");

sorted_subchannels = data_consumer.subchannels();
sorted_subchannels.sort();

assert_eq!(sorted_subchannels, [5, 10]);

data_consumer
.remove_subchannel(5)
.await
.expect("Failed to remove data consumer subchannel");

sorted_subchannels = data_consumer.subchannels();
sorted_subchannels.sort();

assert_eq!(sorted_subchannels, [10]);

data_consumer
.add_subchannel(5)
.await
.expect("Failed to add data consumer subchannel");

sorted_subchannels = data_consumer.subchannels();
sorted_subchannels.sort();

assert_eq!(sorted_subchannels, [5, 10]);

data_consumer
.set_subchannels([].to_vec())
.await
.expect("Failed to set data consumer subchannels");

assert_eq!(data_consumer.subchannels(), []);
});
}

#[test]
fn consume_data_on_direct_transport_succeeds() {
future::block_on(async move {
Expand Down
16 changes: 16 additions & 0 deletions worker/fbs/dataConsumer.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,22 @@ table SetSubchannelsResponse {
subchannels: [uint16] (required);
}

table AddSubchannelRequest {
subchannel: uint16;
}

table AddSubchannelResponse {
subchannels: [uint16] (required);
}

table RemoveSubchannelRequest {
subchannel: uint16;
}

table RemoveSubchannelResponse {
subchannels: [uint16] (required);
}

// Notifications from Worker.

table BufferedAmountLowNotification {
Expand Down
4 changes: 4 additions & 0 deletions worker/fbs/request.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ enum Method: uint8 {
DATACONSUMER_SET_BUFFERED_AMOUNT_LOW_THRESHOLD,
DATACONSUMER_SEND,
DATACONSUMER_SET_SUBCHANNELS,
DATACONSUMER_ADD_SUBCHANNEL,
DATACONSUMER_REMOVE_SUBCHANNEL,
RTPOBSERVER_PAUSE,
RTPOBSERVER_RESUME,
RTPOBSERVER_ADD_PRODUCER,
Expand Down Expand Up @@ -114,6 +116,8 @@ union Body {
DataConsumer_SetBufferedAmountLowThresholdRequest: FBS.DataConsumer.SetBufferedAmountLowThresholdRequest,
DataConsumer_SendRequest: FBS.DataConsumer.SendRequest,
DataConsumer_SetSubchannelsRequest: FBS.DataConsumer.SetSubchannelsRequest,
DataConsumer_AddSubchannelRequest: FBS.DataConsumer.AddSubchannelRequest,
DataConsumer_RemoveSubchannelRequest: FBS.DataConsumer.RemoveSubchannelRequest,
RtpObserver_AddProducerRequest: FBS.RtpObserver.AddProducerRequest,
RtpObserver_RemoveProducerRequest: FBS.RtpObserver.RemoveProducerRequest,
}
Expand Down
Loading