Skip to content

Commit

Permalink
Merge pull request #1592 from canalplus/misc/mandatory-clear-signal
Browse files Browse the repository at this point in the history
[Proposal] Make SharedReference's clearSignal param mandatory
  • Loading branch information
peaBerberian authored Nov 15, 2024
2 parents e5b50b6 + 92aae7e commit 0a66b5c
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 99 deletions.
35 changes: 19 additions & 16 deletions src/core/main/worker/worker_main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -523,23 +523,26 @@ function loadOrReloadPreparedContent(
segmentQueueCreator,
} = preparedContent;
const { drmSystemId, enableFastSwitching, initialTime, onCodecSwitch } = val;
playbackObservationRef.onUpdate((observation) => {
if (preparedContent.decipherabilityFreezeDetector.needToReload(observation)) {
handleMediaSourceReload({
timeOffset: 0,
minimumPosition: 0,
maximumPosition: Infinity,
});
}

// Synchronize SegmentSinks with what has been buffered.
["video" as const, "audio" as const, "text" as const].forEach((tType) => {
const segmentSinkStatus = segmentSinksStore.getStatus(tType);
if (segmentSinkStatus.type === "initialized") {
segmentSinkStatus.value.synchronizeInventory(observation.buffered[tType] ?? []);
playbackObservationRef.onUpdate(
(observation) => {
if (preparedContent.decipherabilityFreezeDetector.needToReload(observation)) {
handleMediaSourceReload({
timeOffset: 0,
minimumPosition: 0,
maximumPosition: Infinity,
});
}
});
});

// Synchronize SegmentSinks with what has been buffered.
["video" as const, "audio" as const, "text" as const].forEach((tType) => {
const segmentSinkStatus = segmentSinksStore.getStatus(tType);
if (segmentSinkStatus.type === "initialized") {
segmentSinkStatus.value.synchronizeInventory(observation.buffered[tType] ?? []);
}
});
},
{ clearSignal: currentLoadCanceller.signal },
);

const initialPeriod =
manifest.getPeriodForTime(initialTime) ?? manifest.getNextPeriod(initialTime);
Expand Down
82 changes: 44 additions & 38 deletions src/main_thread/init/multi_thread_content_initializer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -901,46 +901,52 @@ export default class MultiThreadContentInitializer extends ContentInitializer {
const ref = new SharedReference<IAdaptationChoice | null | undefined>(
undefined,
);
ref.onUpdate((adapChoice) => {
if (this._currentContentInfo === null) {
ref.finish();
return;
}
if (!isNullOrUndefined(adapChoice)) {
adapChoice.representations.onUpdate((repChoice, stopListening) => {
if (this._currentContentInfo === null) {
stopListening();
return;
}
sendMessage(this._settings.worker, {
type: MainThreadMessageType.RepresentationUpdate,
contentId: this._currentContentInfo.contentId,
value: {
periodId: msgData.value.periodId,
adaptationId: adapChoice.adaptationId,
bufferType: msgData.value.bufferType,
choice: repChoice,
ref.onUpdate(
(adapChoice) => {
if (this._currentContentInfo === null) {
ref.finish();
return;
}
if (!isNullOrUndefined(adapChoice)) {
adapChoice.representations.onUpdate(
(repChoice, stopListening) => {
if (this._currentContentInfo === null) {
stopListening();
return;
}
sendMessage(this._settings.worker, {
type: MainThreadMessageType.RepresentationUpdate,
contentId: this._currentContentInfo.contentId,
value: {
periodId: msgData.value.periodId,
adaptationId: adapChoice.adaptationId,
bufferType: msgData.value.bufferType,
choice: repChoice,
},
});
},
});
{ clearSignal: this._initCanceller.signal },
);
}
sendMessage(this._settings.worker, {
type: MainThreadMessageType.TrackUpdate,
contentId: this._currentContentInfo.contentId,
value: {
periodId: msgData.value.periodId,
bufferType: msgData.value.bufferType,
choice: isNullOrUndefined(adapChoice)
? adapChoice
: {
adaptationId: adapChoice.adaptationId,
switchingMode: adapChoice.switchingMode,
initialRepresentations: adapChoice.representations.getValue(),
relativeResumingPosition: adapChoice.relativeResumingPosition,
},
},
});
}
sendMessage(this._settings.worker, {
type: MainThreadMessageType.TrackUpdate,
contentId: this._currentContentInfo.contentId,
value: {
periodId: msgData.value.periodId,
bufferType: msgData.value.bufferType,
choice: isNullOrUndefined(adapChoice)
? adapChoice
: {
adaptationId: adapChoice.adaptationId,
switchingMode: adapChoice.switchingMode,
initialRepresentations: adapChoice.representations.getValue(),
relativeResumingPosition: adapChoice.relativeResumingPosition,
},
},
});
});
},
{ clearSignal: this._initCanceller.signal },
);
this.trigger("periodStreamReady", {
period,
type: msgData.value.bufferType,
Expand Down
12 changes: 6 additions & 6 deletions src/playback_observer/media_element_playback_observer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -258,25 +258,25 @@ export default class PlaybackObserver {
/**
* Register a callback so it regularly receives playback observations.
* @param {Function} cb
* @param {Object} options - Configuration options:
* @param {Object} params - Configuration parameters:
* - `includeLastObservation`: If set to `true` the last observation will
* be first emitted synchronously.
* - `clearSignal`: If set, the callback will be unregistered when this
* CancellationSignal emits.
*/
public listen(
cb: (observation: IPlaybackObservation, stopListening: () => void) => void,
options?: {
params: {
includeLastObservation?: boolean | undefined;
clearSignal?: CancellationSignal | undefined;
clearSignal: CancellationSignal;
},
) {
if (this._canceller.isUsed() || options?.clearSignal?.isCancelled() === true) {
if (this._canceller.isUsed() || params.clearSignal.isCancelled()) {
return noop;
}
this._observationRef.onUpdate(cb, {
clearSignal: options?.clearSignal,
emitCurrentValue: options?.includeLastObservation,
clearSignal: params.clearSignal,
emitCurrentValue: params.includeLastObservation,
});
}

Expand Down
4 changes: 2 additions & 2 deletions src/playback_observer/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -216,9 +216,9 @@ export interface IReadOnlyPlaybackObserver<TObservationType> {
*/
listen(
cb: (observation: TObservationType, stopListening: () => void) => void,
options?: {
options: {
includeLastObservation?: boolean | undefined;
clearSignal?: CancellationSignal | undefined;
clearSignal: CancellationSignal;
},
): void;
/**
Expand Down
13 changes: 5 additions & 8 deletions src/playback_observer/utils/generate_read_only_observer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,17 @@ export default function generateReadOnlyObserver<TSource, TDest>(
},
listen(
cb: (observation: TDest, stopListening: () => void) => void,
options?: {
params: {
includeLastObservation?: boolean | undefined;
clearSignal?: CancellationSignal | undefined;
clearSignal: CancellationSignal;
},
): void {
if (
cancellationSignal.isCancelled() ||
options?.clearSignal?.isCancelled() === true
) {
if (cancellationSignal.isCancelled() || params.clearSignal.isCancelled()) {
return;
}
mappedRef.onUpdate(cb, {
clearSignal: options?.clearSignal,
emitCurrentValue: options?.includeLastObservation,
clearSignal: params.clearSignal,
emitCurrentValue: params.includeLastObservation,
});
},
deriveReadOnlyObserver<TNext>(
Expand Down
13 changes: 5 additions & 8 deletions src/playback_observer/worker_playback_observer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,21 +111,18 @@ export default class WorkerPlaybackObserver

public listen(
cb: (observation: IWorkerPlaybackObservation, stopListening: () => void) => void,
options?: {
params: {
includeLastObservation?: boolean | undefined;
clearSignal?: CancellationSignal | undefined;
clearSignal: CancellationSignal;
},
): void {
if (
this._cancelSignal.isCancelled() ||
options?.clearSignal?.isCancelled() === true
) {
if (this._cancelSignal.isCancelled() || params.clearSignal.isCancelled()) {
return;
}

this._src.onUpdate(cb, {
clearSignal: options?.clearSignal,
emitCurrentValue: options?.includeLastObservation,
clearSignal: params.clearSignal,
emitCurrentValue: params.includeLastObservation,
});
}

Expand Down
37 changes: 16 additions & 21 deletions src/utils/reference.ts
Original file line number Diff line number Diff line change
Expand Up @@ -174,24 +174,22 @@ class SharedReference<T> {
* @param {Function} cb - Callback to be called each time the reference is
* updated. Takes as first argument its new value and in second argument a
* callback allowing to unregister the callback.
* @param {Object|undefined} [options]
* @param {Object|undefined} [options.clearSignal] - Allows to provide a
* CancellationSignal which will unregister the callback when it emits.
* @param {boolean|undefined} [options.emitCurrentValue] - If `true`, the
* @param {Object} params
* @param {Object} params.clearSignal - Allows to provide a CancellationSignal
* which will unregister the callback when it emits.
* @param {boolean|undefined} [params.emitCurrentValue] - If `true`, the
* callback will also be immediately called with the current value.
*/
public onUpdate(
cb: (val: T, stopListening: () => void) => void,
options?:
| {
clearSignal?: CancellationSignal | undefined;
emitCurrentValue?: boolean | undefined;
}
| undefined,
params: {
clearSignal: CancellationSignal;
emitCurrentValue?: boolean | undefined;
},
): void {
const unlisten = (): void => {
if (options?.clearSignal !== undefined) {
options.clearSignal.deregister(unlisten);
if (params.clearSignal !== undefined) {
params.clearSignal.deregister(unlisten);
}
if (cbObj.hasBeenCleared) {
return;
Expand All @@ -206,18 +204,15 @@ class SharedReference<T> {
const cbObj = { trigger: cb, complete: unlisten, hasBeenCleared: false };
this._listeners.push(cbObj);

if (options?.emitCurrentValue === true) {
if (params.emitCurrentValue === true) {
cb(this._value, unlisten);
}

if (this._isFinished || cbObj.hasBeenCleared) {
unlisten();
return;
}
if (options?.clearSignal === undefined) {
return;
}
options.clearSignal.register(unlisten);
params.clearSignal.register(unlisten);
}

/**
Expand All @@ -240,13 +235,13 @@ class SharedReference<T> {
* ```
* @param {Function} cb - Callback to be called each time the reference is
* updated. Takes the new value in argument.
* @param {Object | undefined} [options]
* @param {Object | undefined} [options.clearSignal] - Allows to provide a
* @param {Object} params
* @param {Object} params.clearSignal - Allows to provide a
* CancellationSignal which will unregister the callback when it emits.
*/
public waitUntilDefined(
cb: (val: Exclude<T, undefined>) => void,
options?: { clearSignal?: CancellationSignal | undefined } | undefined,
params: { clearSignal: CancellationSignal },
): void {
this.onUpdate(
(val: T, stopListening) => {
Expand All @@ -255,7 +250,7 @@ class SharedReference<T> {
cb(this._value as Exclude<T, undefined>);
}
},
{ clearSignal: options?.clearSignal, emitCurrentValue: true },
{ clearSignal: params.clearSignal, emitCurrentValue: true },
);
}

Expand Down

0 comments on commit 0a66b5c

Please sign in to comment.