Skip to content

Commit

Permalink
DataChannel subchannels (#1152)
Browse files Browse the repository at this point in the history
  • Loading branch information
ibc authored Sep 11, 2023
1 parent 7651f97 commit 2798355
Show file tree
Hide file tree
Showing 34 changed files with 494 additions and 93 deletions.
57 changes: 54 additions & 3 deletions node/src/DataConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { Event, Notification } from './fbs/notification';
import * as FbsTransport from './fbs/transport';
import * as FbsRequest from './fbs/request';
import * as FbsDataConsumer from './fbs/data-consumer';
import * as utils from './utils';

export type DataConsumerOptions<DataConsumerAppData extends AppData = AppData> =
{
Expand Down Expand Up @@ -45,6 +46,13 @@ export type DataConsumerOptions<DataConsumerAppData extends AppData = AppData> =
*/
paused?: boolean;

/**
* Subchannels this data consumer initially subscribes to.
* Only used in case this data consumer receives messages from a local data
* producer that specifies subchannel(s) when calling send().
*/
subchannels?: number[];

/**
* Custom application data.
*/
Expand Down Expand Up @@ -93,6 +101,7 @@ type DataConsumerDump = DataConsumerData &
id: string;
paused: boolean;
dataProducerPaused: boolean;
subchannels: number[];
};

type DataConsumerInternal = TransportInternal &
Expand Down Expand Up @@ -132,6 +141,9 @@ export class DataConsumer<DataConsumerAppData extends AppData = AppData>
// Associated DataProducer paused flag.
#dataProducerPaused = false;

// Subchannels subscribed to.
#subchannels: number[];

// Custom app data.
#appData: DataConsumerAppData;

Expand All @@ -148,6 +160,7 @@ export class DataConsumer<DataConsumerAppData extends AppData = AppData>
channel,
paused,
dataProducerPaused,
subchannels,
appData
}:
{
Expand All @@ -156,6 +169,7 @@ export class DataConsumer<DataConsumerAppData extends AppData = AppData>
channel: Channel;
paused: boolean;
dataProducerPaused: boolean;
subchannels: number[];
appData?: DataConsumerAppData;
}
)
Expand All @@ -169,6 +183,7 @@ export class DataConsumer<DataConsumerAppData extends AppData = AppData>
this.#channel = channel;
this.#paused = paused;
this.#dataProducerPaused = dataProducerPaused;
this.#subchannels = subchannels;
this.#appData = appData || {} as DataConsumerAppData;

this.handleWorkerNotifications();
Expand Down Expand Up @@ -246,6 +261,14 @@ export class DataConsumer<DataConsumerAppData extends AppData = AppData>
return this.#dataProducerPaused;
}

/**
* Get current subchannels this data consumer is subscribed to.
*/
get subchannels(): number[]
{
return Array.from(this.#subchannels);
}

/**
* App custom data.
*/
Expand Down Expand Up @@ -541,6 +564,34 @@ export class DataConsumer<DataConsumerAppData extends AppData = AppData>
return data.bufferedAmount();
}

/**
* Set subchannels.
*/
async setSubchannels(subchannels: number[]): Promise<void>
{
logger.debug('setSubchannels()');

/* Build Request. */
const requestOffset = new FbsDataConsumer.SetSubchannelsRequestT(
subchannels
).pack(this.#channel.bufferBuilder);

const response = await this.#channel.request(
FbsRequest.Method.DATACONSUMER_SET_SUBCHANNELS,
FbsRequest.Body.FBS_DataConsumer_SetSubchannelsRequest,
requestOffset,
this.#internal.dataConsumerId
);

/* Decode Response. */
const data = new FbsDataConsumer.SetSubchannelsResponse();

response.body(data);

// Update subchannels.
this.#subchannels = utils.parseVector(data, 'subchannels');
}

private handleWorkerNotifications(): void
{
this.#channel.on(this.#internal.dataConsumerId, (event: Event, data?: Notification) =>
Expand Down Expand Up @@ -675,14 +726,14 @@ export function parseDataConsumerDumpResponse(
label : data.label()!,
protocol : data.protocol()!,
paused : data.paused(),
dataProducerPaused : data.dataProducerPaused()

dataProducerPaused : data.dataProducerPaused(),
subchannels : utils.parseVector(data, 'subchannels')
};
}

function parseDataConsumerStats(
binary: FbsDataConsumer.GetStatsResponse
):DataConsumerStat
): DataConsumerStat
{
return {
type : 'data-consumer',
Expand Down
15 changes: 13 additions & 2 deletions node/src/DataProducer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,12 @@ export class DataProducer<DataProducerAppData extends AppData = AppData>
/**
* Send data (just valid for DataProducers created on a DirectTransport).
*/
send(message: string | Buffer, ppid?: number): void
send(
message: string | Buffer,
ppid?: number,
subchannels?: number[],
requiredSubchannel?: number
): void
{
if (typeof message !== 'string' && !Buffer.isBuffer(message))
{
Expand Down Expand Up @@ -431,6 +436,10 @@ export class DataProducer<DataProducerAppData extends AppData = AppData>

let dataOffset = 0;

const subchannelsOffset = FbsDataProducer.SendNotification.createSubchannelsVector(
builder, subchannels ?? []
);

if (typeof message === 'string')
{
const messageOffset = builder.createString(message);
Expand All @@ -450,7 +459,9 @@ export class DataProducer<DataProducerAppData extends AppData = AppData>
typeof message === 'string' ?
FbsDataProducer.Data.String :
FbsDataProducer.Data.Binary,
dataOffset
dataOffset,
subchannelsOffset,
requiredSubchannel ?? null
);

this.#channel.notify(
Expand Down
24 changes: 10 additions & 14 deletions node/src/RtpParameters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import * as flatbuffers from 'flatbuffers';
import {
Boolean as FbsBoolean,
Double as FbsDouble,
Integer as FbsInteger,
IntegerArray as FbsIntegerArray,
Integer32 as FbsInteger32,
Integer32Array as FbsInteger32Array,
String as FbsString,
Parameter as FbsParameter,
RtcpFeedback as FbsRtcpFeedback,
Expand Down Expand Up @@ -564,16 +564,15 @@ export function serializeParameters(
builder, keyOffset, FbsValue.Boolean, value === true ? 1:0
);
}

else if (typeof value === 'number')
{
// Integer.
if (value % 1 === 0)
{
const valueOffset = FbsInteger.createInteger(builder, value);
const valueOffset = FbsInteger32.createInteger32(builder, value);

parameterOffset = FbsParameter.createParameter(
builder, keyOffset, FbsValue.Integer, valueOffset
builder, keyOffset, FbsValue.Integer32, valueOffset
);
}
// Float.
Expand All @@ -586,7 +585,6 @@ export function serializeParameters(
);
}
}

else if (typeof value === 'string')
{
const valueOffset = FbsString.createString(builder, builder.createString(value));
Expand All @@ -595,16 +593,14 @@ export function serializeParameters(
builder, keyOffset, FbsValue.String, valueOffset
);
}

else if (Array.isArray(value))
{
const valueOffset = FbsIntegerArray.createValueVector(builder, value);
const valueOffset = FbsInteger32Array.createValueVector(builder, value);

parameterOffset = FbsParameter.createParameter(
builder, keyOffset, FbsValue.IntegerArray, valueOffset
builder, keyOffset, FbsValue.Integer32Array, valueOffset
);
}

else
{
throw new Error(`invalid parameter type [key:'${key}', value:${value}]`);
Expand Down Expand Up @@ -645,9 +641,9 @@ export function parseParameters(data: any): any
break;
}

case FbsValue.Integer:
case FbsValue.Integer32:
{
const value = new FbsInteger();
const value = new FbsInteger32();

fbsParameter.value(value);

Expand Down Expand Up @@ -678,9 +674,9 @@ export function parseParameters(data: any): any
break;
}

case FbsValue.IntegerArray:
case FbsValue.Integer32Array:
{
const value = new FbsIntegerArray();
const value = new FbsInteger32Array();

fbsParameter.value(value);

Expand Down
16 changes: 13 additions & 3 deletions node/src/Transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1080,6 +1080,7 @@ export class Transport
maxPacketLifeTime,
maxRetransmits,
paused = false,
subchannels,
appData
}: DataConsumerOptions<ConsumerAppData>
): Promise<DataConsumer<ConsumerAppData>>
Expand Down Expand Up @@ -1163,7 +1164,8 @@ export class Transport
sctpStreamParameters,
label,
protocol,
paused
paused,
subchannels
});

const response = await this.channel.request(
Expand Down Expand Up @@ -1197,6 +1199,7 @@ export class Transport
},
channel : this.channel,
paused : dump.paused,
subchannels : dump.subchannels,
dataProducerPaused : dump.dataProducerPaused,
appData
});
Expand Down Expand Up @@ -1680,16 +1683,18 @@ function createConsumeDataRequest({
sctpStreamParameters,
label,
protocol,
paused
paused,
subchannels = []
} : {
builder : flatbuffers.Builder;
builder: flatbuffers.Builder;
dataConsumerId: string;
dataProducerId: string;
type: DataConsumerType;
sctpStreamParameters?: SctpStreamParameters;
label: string;
protocol: string;
paused: boolean;
subchannels?: number[];
}): number
{
const dataConsumerIdOffset = builder.createString(dataConsumerId);
Expand All @@ -1707,6 +1712,10 @@ function createConsumeDataRequest({
);
}

const subchannelsOffset = FbsTransport.ConsumeDataRequest.createSubchannelsVector(
builder, subchannels
);

FbsTransport.ConsumeDataRequest.startConsumeDataRequest(builder);
FbsTransport.ConsumeDataRequest.addDataConsumerId(builder, dataConsumerIdOffset);
FbsTransport.ConsumeDataRequest.addDataProducerId(builder, dataProducerIdOffset);
Expand All @@ -1722,6 +1731,7 @@ function createConsumeDataRequest({
FbsTransport.ConsumeDataRequest.addLabel(builder, labelOffset);
FbsTransport.ConsumeDataRequest.addProtocol(builder, protocolOffset);
FbsTransport.ConsumeDataRequest.addPaused(builder, paused);
FbsTransport.ConsumeDataRequest.addSubchannels(builder, subchannelsOffset);

return FbsTransport.ConsumeDataRequest.endConsumeDataRequest(builder);
}
Expand Down
11 changes: 11 additions & 0 deletions node/src/tests/test-DataConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ test('transport.consumeData() succeeds', async () =>
{
dataProducerId : dataProducer.id,
maxPacketLifeTime : 4000,
// Valid values are 0...65535 so others and duplicated ones will be
// discarded.
subchannels : [ 0, 1, 1, 1, 2, 65535, 65536, 65537, 100 ],
appData : { baz: 'LOL' }
});

Expand All @@ -70,6 +73,7 @@ test('transport.consumeData() succeeds', async () =>
expect(dataConsumer1.label).toBe('foo');
expect(dataConsumer1.protocol).toBe('bar');
expect(dataConsumer1.paused).toBe(false);
expect(dataConsumer1.subchannels.sort((a, b) => a - b)).toEqual([ 0, 1, 2, 100, 65535 ]);
expect(dataConsumer1.appData).toEqual({ baz: 'LOL' });

const dump = await router.dump();
Expand Down Expand Up @@ -128,6 +132,13 @@ test('dataConsumer.getStats() succeeds', async () =>
]);
}, 2000);

test('dataConsumer.setSubchannels() succeeds', async () =>
{
await dataConsumer1.setSubchannels([ 999, 999, 998, 65536 ]);

expect(dataConsumer1.subchannels.sort((a, b) => a - b)).toEqual([ 0, 998, 999 ]);
}, 2000);

test('transport.consumeData() on a DirectTransport succeeds', async () =>
{
const onObserverNewDataConsumer = jest.fn();
Expand Down
Loading

0 comments on commit 2798355

Please sign in to comment.