Skip to content

Commit

Permalink
Fix Rust DataConsumer (#1262)
Browse files Browse the repository at this point in the history
Co-authored-by: José Luis Millán <[email protected]>
  • Loading branch information
ibc and jmillan authored Dec 12, 2023
1 parent 042b7f5 commit eccc173
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

* liburing: avoid extra memcpy on RTP ([PR #1258](https://github.com/versatica/mediasoup/pull/1258)).
* libsrtp: use our own fork ([PR #1260](https://github.com/versatica/mediasoup/pull/1260)).
* Fix Rust `DataConsumer` ([PR #1262](https://github.com/versatica/mediasoup/pull/1262)).


### 3.13.10
Expand Down
6 changes: 4 additions & 2 deletions node/src/tests/test-DataConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ test('transport.consumeData() succeeds', async () =>
expect(dataConsumer1.label).toBe('foo');
expect(dataConsumer1.protocol).toBe('bar');
expect(dataConsumer1.paused).toBe(false);
expect(dataConsumer1.subchannels.sort((a, b) => a - b)).toEqual([ 0, 1, 2, 100, 65535 ]);
expect(dataConsumer1.subchannels.sort((a, b) => a - b))
.toEqual([ 0, 1, 2, 100, 65535 ]);
expect(dataConsumer1.appData).toEqual({ baz: 'LOL' });

const dump = await router.dump();
Expand Down Expand Up @@ -134,7 +135,8 @@ test('dataConsumer.setSubchannels() succeeds', async () =>
{
await dataConsumer1.setSubchannels([ 999, 999, 998, 65536 ]);

expect(dataConsumer1.subchannels.sort((a, b) => a - b)).toEqual([ 0, 998, 999 ]);
expect(dataConsumer1.subchannels.sort((a, b) => a - b))
.toEqual([ 0, 998, 999 ]);
}, 2000);

test('transport.consumeData() on a DirectTransport succeeds', async () =>
Expand Down
31 changes: 15 additions & 16 deletions rust/src/router/data_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ impl fmt::Debug for RegularDataConsumer {
.field("data_producer_id", &self.inner.data_producer_id)
.field("paused", &self.inner.paused)
.field("data_producer_paused", &self.inner.data_producer_paused)
.field("subchannels", &self.inner.subchannels)
.field("transport", &self.inner.transport)
.field("closed", &self.inner.closed)
.finish()
Expand Down Expand Up @@ -411,6 +412,7 @@ impl fmt::Debug for DirectDataConsumer {
.field("data_producer_id", &self.inner.data_producer_id)
.field("paused", &self.inner.paused)
.field("data_producer_paused", &self.inner.data_producer_paused)
.field("subchannels", &self.inner.subchannels)
.field("transport", &self.inner.transport)
.field("closed", &self.inner.closed)
.finish()
Expand Down Expand Up @@ -788,6 +790,19 @@ impl DataConsumer {
.await
}

/// Sets subchannels to the worker DataConsumer.
pub async fn set_subchannels(&self, subchannels: Vec<u16>) -> Result<(), RequestError> {
let response = self
.inner()
.channel
.request(self.id(), DataConsumerSetSubchannelsRequest { subchannels })
.await?;

*self.inner().subchannels.lock() = response.subchannels;

Ok(())
}

/// Callback is called when a message has been received from the corresponding data producer.
///
/// # Notes on usage
Expand Down Expand Up @@ -918,22 +933,6 @@ impl DirectDataConsumer {
)
.await
}

/// Sets subchannels to the worker DataConsumer.
pub async fn set_subchannels(&self, subchannels: Vec<u16>) -> Result<(), RequestError> {
let response = self
.inner
.channel
.request(
self.inner.id,
DataConsumerSetSubchannelsRequest { subchannels },
)
.await?;

*self.inner.subchannels.lock() = response.subchannels;

Ok(())
}
}

/// [`WeakDataConsumer`] doesn't own data consumer instance on mediasoup-worker and will not prevent
Expand Down
25 changes: 25 additions & 0 deletions rust/tests/integration/data_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,31 @@ fn get_stats_succeeds() {
});
}

#[test]
fn set_subchannels() {
future::block_on(async move {
let (_worker, _router, transport1, data_producer) = init().await;

let data_consumer = transport1
.consume_data(DataConsumerOptions::new_sctp_unordered_with_life_time(
data_producer.id(),
4000,
))
.await
.expect("Failed to consume data");

data_consumer
.set_subchannels([999, 999, 998, 0].to_vec())
.await
.expect("Failed to set data consumer subchannels");

let mut sorted_subchannels = data_consumer.subchannels();
sorted_subchannels.sort();

assert_eq!(sorted_subchannels, [0, 998, 999]);
});
}

#[test]
fn consume_data_on_direct_transport_succeeds() {
future::block_on(async move {
Expand Down
4 changes: 2 additions & 2 deletions rust/tests/integration/direct_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ fn send_with_subchannels_succeeds() {
}
};

let direct_data_consumer_2 = match &data_consumer_2 {
let _ = match &data_consumer_2 {
DataConsumer::Direct(direct_data_consumer) => direct_data_consumer,
_ => {
panic!("Expected direct data consumer")
Expand Down Expand Up @@ -514,7 +514,7 @@ fn send_with_subchannels_succeeds() {
let mut subchannels = data_consumer_2.subchannels();
subchannels.push(1);

direct_data_consumer_2
data_consumer_2
.set_subchannels(subchannels)
.await
.expect("Failed to set subchannels");
Expand Down

0 comments on commit eccc173

Please sign in to comment.