Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DataChannel subchannels #1152

Merged
merged 11 commits into from
Sep 11, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 58 additions & 3 deletions node/src/DataConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { Event, Notification } from './fbs/notification';
import * as FbsTransport from './fbs/transport';
import * as FbsRequest from './fbs/request';
import * as FbsDataConsumer from './fbs/data-consumer';
import * as utils from './utils';

export type DataConsumerOptions<DataConsumerAppData extends AppData = AppData> =
{
Expand Down Expand Up @@ -45,6 +46,13 @@ export type DataConsumerOptions<DataConsumerAppData extends AppData = AppData> =
*/
paused?: boolean;

/**
* Subchannels this data consumer initially subscribes to.
* Only used in case this data consumer receives messages from a local data
* producer that specifies subchannel(s) when calling send().
*/
subchannels?: number[];

/**
* Custom application data.
*/
Expand Down Expand Up @@ -93,6 +101,7 @@ type DataConsumerDump = DataConsumerData &
id: string;
paused: boolean;
dataProducerPaused: boolean;
subchannels: number[];
};

type DataConsumerInternal = TransportInternal &
Expand Down Expand Up @@ -132,6 +141,9 @@ export class DataConsumer<DataConsumerAppData extends AppData = AppData>
// Associated DataProducer paused flag.
#dataProducerPaused = false;

// Subchannels subscribed to.
#subchannels: number[];

// Custom app data.
#appData: DataConsumerAppData;

Expand All @@ -148,6 +160,7 @@ export class DataConsumer<DataConsumerAppData extends AppData = AppData>
channel,
paused,
dataProducerPaused,
subchannels,
appData
}:
{
Expand All @@ -156,6 +169,7 @@ export class DataConsumer<DataConsumerAppData extends AppData = AppData>
channel: Channel;
paused: boolean;
dataProducerPaused: boolean;
subchannels: number[];
appData?: DataConsumerAppData;
}
)
Expand All @@ -169,6 +183,7 @@ export class DataConsumer<DataConsumerAppData extends AppData = AppData>
this.#channel = channel;
this.#paused = paused;
this.#dataProducerPaused = dataProducerPaused;
this.#subchannels = subchannels;
this.#appData = appData || {} as DataConsumerAppData;

this.handleWorkerNotifications();
Expand Down Expand Up @@ -246,6 +261,14 @@ export class DataConsumer<DataConsumerAppData extends AppData = AppData>
return this.#dataProducerPaused;
}

/**
* Get current subchannels this data consumer is subscribed to.
*/
get subchannels(): number[]
{
return Array.from(this.#subchannels);
}

/**
* App custom data.
*/
Expand Down Expand Up @@ -541,6 +564,38 @@ export class DataConsumer<DataConsumerAppData extends AppData = AppData>
return data.bufferedAmount();
}

/**
* Set subchannels.
*/
async setSubchannels(subchannels: number[]): Promise<void>
{
logger.debug('setSubchannels()');

console.log('TODO');

const subchannelsSet = new Set(subchannels);

// /* Build Request. */
// const requestOffset = new FbsTransport.SetSubchannelsRequestT(
// this.#internal.dataConsumerId,
// subchannels
// ).pack(this.#channel.bufferBuilder);

// const response = await this.#channel.request(
// FbsRequest.Method.DATACONSUMER_SET_SUBCHANNELS,
// FbsRequest.Body.FBS_DATA_CONSUMER_SetSubchannelsRequest,,
// requestOffset,
// this.#internal.dataConsumerId
// );

// /* Decode Response. */
// const data = new FbsDataConsumer.SetSubchannelsResponse();

// response.body(data);

// return parseSubchannels(data);
}

private handleWorkerNotifications(): void
{
this.#channel.on(this.#internal.dataConsumerId, (event: Event, data?: Notification) =>
Expand Down Expand Up @@ -675,14 +730,14 @@ export function parseDataConsumerDumpResponse(
label : data.label()!,
protocol : data.protocol()!,
paused : data.paused(),
dataProducerPaused : data.dataProducerPaused()

dataProducerPaused : data.dataProducerPaused(),
subchannels : utils.parseVector(data, 'subchannels')
};
}

function parseDataConsumerStats(
binary: FbsDataConsumer.GetStatsResponse
):DataConsumerStat
): DataConsumerStat
{
return {
type : 'data-consumer',
Expand Down
24 changes: 10 additions & 14 deletions node/src/RtpParameters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import * as flatbuffers from 'flatbuffers';
import {
Boolean as FbsBoolean,
Double as FbsDouble,
Integer as FbsInteger,
IntegerArray as FbsIntegerArray,
Integer32 as FbsInteger32,
Integer32Array as FbsInteger32Array,
String as FbsString,
Parameter as FbsParameter,
RtcpFeedback as FbsRtcpFeedback,
Expand Down Expand Up @@ -564,16 +564,15 @@ export function serializeParameters(
builder, keyOffset, FbsValue.Boolean, value === true ? 1:0
);
}

else if (typeof value === 'number')
{
// Integer.
if (value % 1 === 0)
{
const valueOffset = FbsInteger.createInteger(builder, value);
const valueOffset = FbsInteger32.createInteger32(builder, value);

parameterOffset = FbsParameter.createParameter(
builder, keyOffset, FbsValue.Integer, valueOffset
builder, keyOffset, FbsValue.Integer32, valueOffset
);
}
// Float.
Expand All @@ -586,7 +585,6 @@ export function serializeParameters(
);
}
}

else if (typeof value === 'string')
{
const valueOffset = FbsString.createString(builder, builder.createString(value));
Expand All @@ -595,16 +593,14 @@ export function serializeParameters(
builder, keyOffset, FbsValue.String, valueOffset
);
}

else if (Array.isArray(value))
{
const valueOffset = FbsIntegerArray.createValueVector(builder, value);
const valueOffset = FbsInteger32Array.createValueVector(builder, value);

parameterOffset = FbsParameter.createParameter(
builder, keyOffset, FbsValue.IntegerArray, valueOffset
builder, keyOffset, FbsValue.Integer32Array, valueOffset
);
}

else
{
throw new Error(`invalid parameter type [key:'${key}', value:${value}]`);
Expand Down Expand Up @@ -645,9 +641,9 @@ export function parseParameters(data: any): any
break;
}

case FbsValue.Integer:
case FbsValue.Integer32:
{
const value = new FbsInteger();
const value = new FbsInteger32();

fbsParameter.value(value);

Expand Down Expand Up @@ -678,9 +674,9 @@ export function parseParameters(data: any): any
break;
}

case FbsValue.IntegerArray:
case FbsValue.Integer32Array:
{
const value = new FbsIntegerArray();
const value = new FbsInteger32Array();

fbsParameter.value(value);

Expand Down
16 changes: 13 additions & 3 deletions node/src/Transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1080,6 +1080,7 @@ export class Transport
maxPacketLifeTime,
maxRetransmits,
paused = false,
subchannels,
appData
}: DataConsumerOptions<ConsumerAppData>
): Promise<DataConsumer<ConsumerAppData>>
Expand Down Expand Up @@ -1163,7 +1164,8 @@ export class Transport
sctpStreamParameters,
label,
protocol,
paused
paused,
subchannels
});

const response = await this.channel.request(
Expand Down Expand Up @@ -1197,6 +1199,7 @@ export class Transport
},
channel : this.channel,
paused : dump.paused,
subchannels : dump.subchannels,
dataProducerPaused : dump.dataProducerPaused,
appData
});
Expand Down Expand Up @@ -1680,16 +1683,18 @@ function createConsumeDataRequest({
sctpStreamParameters,
label,
protocol,
paused
paused,
subchannels = []
} : {
builder : flatbuffers.Builder;
builder: flatbuffers.Builder;
dataConsumerId: string;
dataProducerId: string;
type: DataConsumerType;
sctpStreamParameters?: SctpStreamParameters;
label: string;
protocol: string;
paused: boolean;
subchannels?: number[];
}): number
{
const dataConsumerIdOffset = builder.createString(dataConsumerId);
Expand All @@ -1707,6 +1712,10 @@ function createConsumeDataRequest({
);
}

const subchannelsOffset = FbsTransport.ConsumeDataRequest.createSubchannelsVector(
builder, subchannels
);

FbsTransport.ConsumeDataRequest.startConsumeDataRequest(builder);
FbsTransport.ConsumeDataRequest.addDataConsumerId(builder, dataConsumerIdOffset);
FbsTransport.ConsumeDataRequest.addDataProducerId(builder, dataProducerIdOffset);
Expand All @@ -1722,6 +1731,7 @@ function createConsumeDataRequest({
FbsTransport.ConsumeDataRequest.addLabel(builder, labelOffset);
FbsTransport.ConsumeDataRequest.addProtocol(builder, protocolOffset);
FbsTransport.ConsumeDataRequest.addPaused(builder, paused);
FbsTransport.ConsumeDataRequest.addSubchannels(builder, subchannelsOffset);

return FbsTransport.ConsumeDataRequest.endConsumeDataRequest(builder);
}
Expand Down
6 changes: 5 additions & 1 deletion node/src/tests/test-DataConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ test('transport.consumeData() succeeds', async () =>
{
dataProducerId : dataProducer.id,
maxPacketLifeTime : 4000,
appData : { baz: 'LOL' }
appData : { baz: 'LOL' },
// Valid values are 0...65535 so others and duplicated ones will be
// discarded.
subchannels : [ 0, 1, 1, 1, 2, 65535, 65536, 65537, 100 ]
});

expect(onObserverNewDataConsumer).toHaveBeenCalledTimes(1);
Expand All @@ -70,6 +73,7 @@ test('transport.consumeData() succeeds', async () =>
expect(dataConsumer1.label).toBe('foo');
expect(dataConsumer1.protocol).toBe('bar');
expect(dataConsumer1.paused).toBe(false);
expect(dataConsumer1.subchannels.sort((a, b) => a - b)).toEqual([ 0, 1, 2, 100, 65535 ]);
expect(dataConsumer1.appData).toEqual({ baz: 'LOL' });

const dump = await router.dump();
Expand Down
23 changes: 17 additions & 6 deletions node/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,27 @@ import { ProducerType } from './Producer';
import { Type as FbsRtpParametersType } from './fbs/rtp-parameters';

/**
* Clones the given object/array.
* Clones the given value.
*/
export function clone(data: any): any
export function clone<T>(value: T): T
{
if (typeof data !== 'object')
if (value === undefined)
{
return {};
return undefined as unknown as T;
}
else if (Number.isNaN(value))
{
return NaN as unknown as T;
}
else if (typeof structuredClone === 'function')
{
// Available in Node >= 18.
return structuredClone(value);
}
else
{
return JSON.parse(JSON.stringify(value));
}

return JSON.parse(JSON.stringify(data));
}

/**
Expand Down
1 change: 1 addition & 0 deletions worker/fbs/dataConsumer.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ table DumpResponse {
protocol:string (required);
paused:bool;
data_producer_paused:bool;
subchannels:[uint16];
}

table GetStatsResponse {
Expand Down
8 changes: 4 additions & 4 deletions worker/fbs/rtpParameters.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ table Boolean {
value:uint8;
}

table Integer {
table Integer32 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

even Int32 could make it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, please. If so I should also rename Integer32Array to Int32Array. However Int32Array is a JS binary array class so conflicts.

value:int32;
}

table IntegerArray {
table Integer32Array {
value:[int32];
}

Expand All @@ -27,10 +27,10 @@ table String {

union Value {
Boolean,
Integer,
Integer32,
Double,
String,
IntegerArray,
Integer32Array,
}

table Parameter {
Expand Down
1 change: 1 addition & 0 deletions worker/fbs/transport.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ table ConsumeDataRequest {
label:string;
protocol:string;
paused:bool = false;
subchannels:[uint16];
}

table Tuple {
Expand Down
2 changes: 2 additions & 0 deletions worker/include/RTC/DataConsumer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "Channel/ChannelSocket.hpp"
#include "RTC/SctpDictionaries.hpp"
#include "RTC/Shared.hpp"
#include <absl/container/flat_hash_set.h>
#include <string>

namespace RTC
Expand Down Expand Up @@ -120,6 +121,7 @@ namespace RTC
RTC::SctpStreamParameters sctpStreamParameters;
std::string label;
std::string protocol;
absl::flat_hash_set<uint16_t> subchannels;
bool transportConnected{ false };
bool sctpAssociationConnected{ false };
bool paused{ false };
Expand Down
Loading