From 06ac38ffe0c8253b8c6239c1dc6057ef91833c90 Mon Sep 17 00:00:00 2001 From: Paul Berberian Date: Tue, 22 Mar 2022 11:43:46 +0100 Subject: [PATCH 1/2] Remove RxJS from the SegmentBuffer code and related code --- src/compat/on_height_width_change.ts | 97 ++++++----- src/core/segment_buffers/garbage_collector.ts | 102 ++++++----- .../audio_video/audio_video_segment_buffer.ts | 162 ++++++++++-------- .../image/image_segment_buffer.ts | 79 ++++----- .../text/html/html_text_segment_buffer.ts | 89 +++++----- .../text/native/native_text_segment_buffer.ts | 81 ++++----- .../segment_buffers/implementations/types.ts | 31 +++- .../segment_buffers/segment_buffers_store.ts | 37 ++-- .../orchestrator/stream_orchestrator.ts | 43 +++-- src/core/stream/period/period_stream.ts | 49 ++++-- .../append_segment_to_buffer.ts | 69 +++----- .../force_garbage_collection.ts | 60 +++---- .../representation/push_init_segment.ts | 18 +- .../representation/push_media_segment.ts | 18 +- .../representation/representation_stream.ts | 16 +- .../get_initialized_source_buffer.ts | 9 +- .../tools/VideoThumbnailLoader/push_data.ts | 8 +- .../remove_buffer_around_time.ts | 12 +- src/utils/task_canceller.ts | 3 +- 19 files changed, 528 insertions(+), 455 deletions(-) diff --git a/src/compat/on_height_width_change.ts b/src/compat/on_height_width_change.ts index 587f489b1f..b4d8435a62 100644 --- a/src/compat/on_height_width_change.ts +++ b/src/compat/on_height_width_change.ts @@ -14,16 +14,11 @@ * limitations under the License. */ -import { - defer as observableDefer, - distinctUntilChanged, - interval as observableInterval, - map, - Observable, - Observer, - startWith, -} from "rxjs"; import log from "../log"; +import createSharedReference, { + IReadOnlySharedReference, +} from "../utils/reference"; +import { CancellationSignal } from "../utils/task_canceller"; import isNode from "./is_node"; export interface IResolution { width : number; @@ -63,8 +58,8 @@ const _ResizeObserver : IResizeObserverConstructor | /* eslint-enable @typescript-eslint/no-unsafe-assignment */ /** - * Emit the current height and width of the given `element` on subscribtion - * and each time it changes. + * Emit the current height and width of the given `element` each time it + * changes. * * On some browsers, we might not be able to rely on a native API to know when * it changes, the `interval` argument allow us to provide us an inverval in @@ -75,46 +70,50 @@ const _ResizeObserver : IResizeObserverConstructor | */ export default function onHeightWidthChange( element : HTMLElement, - interval : number -) : Observable { - return observableDefer(() : Observable => { - if (_ResizeObserver !== undefined) { - let lastHeight : number = -1; - let lastWidth : number = -1; - - return new Observable((obs : Observer) => { - const resizeObserver = new _ResizeObserver(entries => { - if (entries.length === 0) { - log.error("Compat: Resized but no observed element."); - return; - } + interval : number, + cancellationSignal : CancellationSignal +) : IReadOnlySharedReference { + const { height: initHeight, width: initWidth } = element.getBoundingClientRect(); + const ref = createSharedReference({ + height: initHeight, + width: initWidth, + }); + let lastHeight : number = initHeight; + let lastWidth : number = initWidth; - const entry = entries[0]; - const { height, width } = entry.contentRect; + if (_ResizeObserver !== undefined) { + const resizeObserver = new _ResizeObserver(entries => { + if (entries.length === 0) { + log.error("Compat: Resized but no observed element."); + return; + } - if (height !== lastHeight || width !== lastWidth) { - lastHeight = height; - lastWidth = width; - obs.next({ height, width }); - } - }); + const entry = entries[0]; + const { height, width } = entry.contentRect; - resizeObserver.observe(element); - return () => { - resizeObserver.disconnect(); - }; - }); - } + if (height !== lastHeight || width !== lastWidth) { + lastHeight = height; + lastWidth = width; + ref.setValue({ height, width }); + } + }); - return observableInterval(interval).pipe( - startWith(null), - map(() : IResolution => { - const { height, width } = element.getBoundingClientRect(); - return { height, width }; - }), - distinctUntilChanged((o, n) => { - return o.height === n.height && o.width === n.width; - }) - ); - }); + resizeObserver.observe(element); + cancellationSignal.register(() => { + resizeObserver.disconnect(); + }); + } else { + const intervalId = setInterval(() => { + const { height, width } = element.getBoundingClientRect(); + if (height !== lastHeight || width !== lastWidth) { + lastHeight = height; + lastWidth = width; + ref.setValue({ height, width }); + } + }, interval); + cancellationSignal.register(() => { + clearInterval(intervalId); + }); + } + return ref; } diff --git a/src/core/segment_buffers/garbage_collector.ts b/src/core/segment_buffers/garbage_collector.ts index b7a83a3b95..f16b399d0e 100644 --- a/src/core/segment_buffers/garbage_collector.ts +++ b/src/core/segment_buffers/garbage_collector.ts @@ -14,52 +14,66 @@ * limitations under the License. */ -import { - combineLatest as observableCombineLatest, - concatAll, - EMPTY, - from as observableFrom, - ignoreElements, - mergeMap, - Observable, - of as observableOf, -} from "rxjs"; import log from "../../log"; import { getInnerAndOuterTimeRanges } from "../../utils/ranges"; +import { IReadOnlySharedReference } from "../../utils/reference"; +import { CancellationSignal } from "../../utils/task_canceller"; +import { IReadOnlyPlaybackObserver } from "../api"; +import { IStreamOrchestratorPlaybackObservation } from "../stream"; import { SegmentBuffer } from "./implementations"; export interface IGarbageCollectorArgument { /** SegmentBuffer implementation */ segmentBuffer : SegmentBuffer; /** Emit current position in seconds regularly */ - currentTime$ : Observable; + playbackObserver : IReadOnlyPlaybackObserver< + Pick + >; /** Maximum time to keep behind current time position, in seconds */ - maxBufferBehind$ : Observable; + maxBufferBehind : IReadOnlySharedReference; /** Minimum time to keep behind current time position, in seconds */ - maxBufferAhead$ : Observable; + maxBufferAhead : IReadOnlySharedReference; } /** * Perform cleaning of the buffer according to the values set by the user - * each time `currentTime$` emits and each times the + * each time `playbackObserver` emits and each times the * maxBufferBehind/maxBufferAhead values change. * + * Abort this operation when the `cancellationSignal` emits. + * * @param {Object} opt + * @param {Object} cancellationSignal * @returns {Observable} */ -export default function BufferGarbageCollector({ - segmentBuffer, - currentTime$, - maxBufferBehind$, - maxBufferAhead$, -} : IGarbageCollectorArgument) : Observable { - return observableCombineLatest([currentTime$, maxBufferBehind$, maxBufferAhead$]).pipe( - mergeMap(([currentTime, maxBufferBehind, maxBufferAhead]) => { - return clearBuffer(segmentBuffer, - currentTime, - maxBufferBehind, - maxBufferAhead); - })); +export default function BufferGarbageCollector( + { segmentBuffer, + playbackObserver, + maxBufferBehind, + maxBufferAhead } : IGarbageCollectorArgument, + cancellationSignal : CancellationSignal +) : void { + let lastPosition : number; + playbackObserver.listen((o) => { + lastPosition = o.position.pending ?? o.position.last; + clean(); + }, { includeLastObservation: true, clearSignal: cancellationSignal }); + function clean() { + clearBuffer(segmentBuffer, + lastPosition, + maxBufferBehind.getValue(), + maxBufferAhead.getValue(), + cancellationSignal) + .catch(e => { + const errMsg = e instanceof Error ? + e.message : + "Unknown error"; + log.error("Could not run BufferGarbageCollector:", errMsg); + }); + } + maxBufferBehind.onUpdate(clean, { clearSignal: cancellationSignal }); + maxBufferAhead.onUpdate(clean, { clearSignal: cancellationSignal }); + clean(); } /** @@ -76,16 +90,17 @@ export default function BufferGarbageCollector({ * @param {Number} position - The current position * @param {Number} maxBufferBehind * @param {Number} maxBufferAhead - * @returns {Observable} + * @returns {Promise} */ -function clearBuffer( +async function clearBuffer( segmentBuffer : SegmentBuffer, position : number, maxBufferBehind : number, - maxBufferAhead : number -) : Observable { + maxBufferAhead : number, + cancellationSignal : CancellationSignal +) : Promise { if (!isFinite(maxBufferBehind) && !isFinite(maxBufferAhead)) { - return EMPTY; + return Promise.resolve(); } const cleanedupRanges : Array<{ start : number; @@ -150,21 +165,14 @@ function clearBuffer( collectBufferBehind(); collectBufferAhead(); - const clean$ = observableFrom( - cleanedupRanges.map((range) => { + + for (const range of cleanedupRanges) { + if (range.start < range.end) { log.debug("GC: cleaning range from SegmentBuffer", range.start, range.end); - if (range.start >= range.end) { - return observableOf(null); + if (cancellationSignal.cancellationError !== null) { + throw cancellationSignal.cancellationError; } - return segmentBuffer.removeBuffer(range.start, range.end); - }) - ).pipe(concatAll(), - // NOTE As of now (RxJS 7.4.0), RxJS defines `ignoreElements` default - // first type parameter as `any` instead of the perfectly fine `unknown`, - // leading to linter issues, as it forbids the usage of `any`. - // This is why we're disabling the eslint rule. - /* eslint-disable-next-line @typescript-eslint/no-unsafe-argument */ - ignoreElements()); - - return clean$; + await segmentBuffer.removeBuffer(range.start, range.end, cancellationSignal); + } + } } diff --git a/src/core/segment_buffers/implementations/audio_video/audio_video_segment_buffer.ts b/src/core/segment_buffers/implementations/audio_video/audio_video_segment_buffer.ts index 0e72b11895..33df9249cd 100644 --- a/src/core/segment_buffers/implementations/audio_video/audio_video_segment_buffer.ts +++ b/src/core/segment_buffers/implementations/audio_video/audio_video_segment_buffer.ts @@ -14,15 +14,6 @@ * limitations under the License. */ -import { - fromEvent, - interval, - Observable, - Observer, - Subject, - takeUntil, - tap, -} from "rxjs"; import { ICompatSourceBuffer, tryToChangeSourceBufferType, @@ -34,7 +25,12 @@ import areArraysOfNumbersEqual from "../../../../utils/are_arrays_of_numbers_equ import assertUnreachable from "../../../../utils/assert_unreachable"; import { toUint8Array } from "../../../../utils/byte_parsing"; import hashBuffer from "../../../../utils/hash_buffer"; +import noop from "../../../../utils/noop"; import objectAssign from "../../../../utils/object_assign"; +import TaskCanceller, { + CancellationError, + CancellationSignal, +} from "../../../../utils/task_canceller"; import { IInsertedChunkInfos } from "../../inventory"; import { IEndOfSegmentInfos, @@ -57,7 +53,10 @@ import { * AudioVideoSegmentBuffer to emit an event when the corresponding queued * operation is completely processed. */ -type IAVSBQueueItem = ISBOperation & { subject: Subject }; +type IAVSBQueueItem = ISBOperation & { + resolve : () => void; + reject : (err : Error) => void; +}; /** * Task currently processed by the AudioVideoSegmentBuffer. @@ -70,8 +69,14 @@ type IAVSBQueueItem = ISBOperation & { subject: Subject }; * segment before the wanted media segment. */ type IAVSBPendingTask = IPushTask | - IRemoveOperation & { subject: Subject } | - IEndOfSegmentOperation & { subject: Subject }; + IRemoveOperation & { + resolve : () => void; + reject : (err : Error) => void; + } | + IEndOfSegmentOperation & { + resolve : () => void; + reject : (err : Error) => void; + }; /** Structure of a `IAVSBPendingTask` item corresponding to a "Push" operation. */ type IPushTask = IPushOperation & { @@ -88,8 +93,10 @@ type IPushTask = IPushOperation & { */ inventoryData : IInsertedChunkInfos | null; - /** Subject used to emit an event to the caller when the operation is finished. */ - subject : Subject; + /** Callback to call when the push operation succeed. */ + resolve : () => void; + /** Callback to call when the push operation fails. */ + reject : (err : Error) => void; }; /** @@ -110,10 +117,9 @@ export default class AudioVideoSegmentBuffer extends SegmentBuffer { private readonly _sourceBuffer : ICompatSourceBuffer; /** - * Subject triggered when this AudioVideoSegmentBuffer is disposed. - * Helps to clean-up Observables created at its creation. + * Helps to clean-up resource taken at the AudioVideoSegmentBuffer creation. */ - private _destroy$ : Subject; + private _canceller : TaskCanceller; /** * Queue of awaited buffer "operations". @@ -152,7 +158,7 @@ export default class AudioVideoSegmentBuffer extends SegmentBuffer { * @constructor * @param {string} bufferType * @param {string} codec - * @param {SourceBuffer} sourceBuffer + * @param {MediaSource} mediaSource */ constructor( bufferType : "audio" | "video", @@ -162,7 +168,7 @@ export default class AudioVideoSegmentBuffer extends SegmentBuffer { super(); const sourceBuffer = mediaSource.addSourceBuffer(codec); - this._destroy$ = new Subject(); + this._canceller = new TaskCanceller(); this.bufferType = bufferType; this._mediaSource = mediaSource; this._sourceBuffer = sourceBuffer; @@ -171,7 +177,8 @@ export default class AudioVideoSegmentBuffer extends SegmentBuffer { this._lastInitSegment = null; this.codec = codec; - const { SOURCE_BUFFER_FLUSHING_INTERVAL } = config.getCurrent(); + const onError = this._onPendingTaskError.bind(this); + const reCheck = this._flush.bind(this); // Some browsers (happened with firefox 66) sometimes "forget" to send us // `update` or `updateend` events. @@ -179,19 +186,16 @@ export default class AudioVideoSegmentBuffer extends SegmentBuffer { // stay locked in a waiting state. // This interval is here to check at regular intervals if the underlying // SourceBuffer is currently updating. - interval(SOURCE_BUFFER_FLUSHING_INTERVAL).pipe( - tap(() => this._flush()), - takeUntil(this._destroy$) - ).subscribe(); - - fromEvent(this._sourceBuffer, "error").pipe( - tap((err) => this._onPendingTaskError(err)), - takeUntil(this._destroy$) - ).subscribe(); - fromEvent(this._sourceBuffer, "updateend").pipe( - tap(() => this._flush()), - takeUntil(this._destroy$) - ).subscribe(); + const { SOURCE_BUFFER_FLUSHING_INTERVAL } = config.getCurrent(); + const intervalId = setInterval(reCheck, SOURCE_BUFFER_FLUSHING_INTERVAL); + this._sourceBuffer.addEventListener("error", onError); + this._sourceBuffer.addEventListener("updateend", reCheck); + + this._canceller.signal.register(() => { + clearInterval(intervalId); + this._sourceBuffer.removeEventListener("error", onError); + this._sourceBuffer.removeEventListener("updateend", reCheck); + }); } /** @@ -218,47 +222,62 @@ export default class AudioVideoSegmentBuffer extends SegmentBuffer { * `data.chunk` argument to null. * * @param {Object} infos - * @returns {Observable} + * @param {Object} cancellationSignal + * @returns {Promise} */ - public pushChunk(infos : IPushChunkInfos) : Observable { + public pushChunk( + infos : IPushChunkInfos, + cancellationSignal : CancellationSignal + ) : Promise { assertPushedDataIsBufferSource(infos); log.debug("AVSB: receiving order to push data to the SourceBuffer", this.bufferType, getLoggableSegmentId(infos.inventoryInfos)); return this._addToQueue({ type: SegmentBufferOperation.Push, - value: infos }); + value: infos }, + cancellationSignal); } /** * Remove buffered data (added to the same FIFO queue than `pushChunk`). * @param {number} start - start position, in seconds * @param {number} end - end position, in seconds - * @returns {Observable} + * @param {Object} cancellationSignal + * @returns {Promise} */ - public removeBuffer(start : number, end : number) : Observable { + public removeBuffer( + start : number, + end : number, + cancellationSignal : CancellationSignal + ) : Promise { log.debug("AVSB: receiving order to remove data from the SourceBuffer", this.bufferType, start, end); return this._addToQueue({ type: SegmentBufferOperation.Remove, - value: { start, end } }); + value: { start, end } }, + cancellationSignal); } /** * Indicate that every chunks from a Segment has been given to pushChunk so * far. * This will update our internal Segment inventory accordingly. - * The returned Observable will emit and complete successively once the whole - * segment has been pushed and this indication is acknowledged. + * The returned Promise will resolve once the whole segment has been pushed + * and this indication is acknowledged. * @param {Object} infos - * @returns {Observable} + * @returns {Promise} */ - public endOfSegment(infos : IEndOfSegmentInfos) : Observable { + public endOfSegment( + infos : IEndOfSegmentInfos, + cancellationSignal : CancellationSignal + ) : Promise { log.debug("AVSB: receiving order for validating end of segment", this.bufferType, getLoggableSegmentId(infos)); return this._addToQueue({ type: SegmentBufferOperation.EndOfSegment, - value: infos }); + value: infos }, + cancellationSignal); } /** @@ -302,18 +321,17 @@ export default class AudioVideoSegmentBuffer extends SegmentBuffer { * @private */ public dispose() : void { - this._destroy$.next(); - this._destroy$.complete(); + this._canceller.cancel(); if (this._pendingTask !== null) { - this._pendingTask.subject.complete(); + this._pendingTask.reject(new CancellationError()); this._pendingTask = null; } while (this._queue.length > 0) { const nextElement = this._queue.shift(); if (nextElement !== undefined) { - nextElement.subject.complete(); + nextElement.reject(new CancellationError()); } } @@ -329,7 +347,7 @@ export default class AudioVideoSegmentBuffer extends SegmentBuffer { /** * Called when an error arised that made the current task fail. - * @param {Event} error + * @param {Event} err */ private _onPendingTaskError(err : unknown) : void { this._lastInitSegment = null; // initialize init segment as a security @@ -338,7 +356,7 @@ export default class AudioVideoSegmentBuffer extends SegmentBuffer { err : new Error("An unknown error occured when doing operations " + "on the SourceBuffer"); - this._pendingTask.subject.error(error); + this._pendingTask.reject(error); } } @@ -350,24 +368,22 @@ export default class AudioVideoSegmentBuffer extends SegmentBuffer { * Cancel queued operation on unsubscription. * @private * @param {Object} operation - * @returns {Observable} + * @param {Object} cancellationSignal + * @returns {Promise} */ - private _addToQueue(operation : ISBOperation) : Observable { - return new Observable((obs : Observer) => { + private _addToQueue( + operation : ISBOperation, + cancellationSignal : CancellationSignal + ) : Promise { + return new Promise((resolve, reject) => { + if (cancellationSignal.cancellationError !== null) { + return reject(cancellationSignal.cancellationError); + } const shouldRestartQueue = this._queue.length === 0 && this._pendingTask === null; - const subject = new Subject(); - const queueItem = objectAssign({ subject }, operation); + const queueItem = objectAssign({ resolve, reject }, operation); this._queue.push(queueItem); - - const subscription = subject.subscribe(obs); - if (shouldRestartQueue) { - this._flush(); - } - - return () => { - subscription.unsubscribe(); - + cancellationSignal.register((error : CancellationError) => { // Remove the corresponding element from the AudioVideoSegmentBuffer's // queue. // If the operation was a pending task, it should still continue to not @@ -376,7 +392,14 @@ export default class AudioVideoSegmentBuffer extends SegmentBuffer { if (index >= 0) { this._queue.splice(index, 1); } - }; + queueItem.resolve = noop; + queueItem.reject = noop; + reject(error); + }); + + if (shouldRestartQueue) { + this._flush(); + } }); } @@ -409,10 +432,9 @@ export default class AudioVideoSegmentBuffer extends SegmentBuffer { assertUnreachable(task); } - const { subject } = task; + const { resolve } = task; this._pendingTask = null; - subject.next(); - subject.complete(); + resolve(); this._flush(); // Go to next item in queue return; } @@ -436,7 +458,7 @@ export default class AudioVideoSegmentBuffer extends SegmentBuffer { e : new Error("An unknown error occured when preparing a push operation"); this._lastInitSegment = null; // initialize init segment as a security - nextItem.subject.error(error); + nextItem.reject(error); return; } @@ -492,7 +514,7 @@ export default class AudioVideoSegmentBuffer extends SegmentBuffer { * * This method allows to "prepare" that push operation so that all is left is * to push the returned segment data one after the other (from first to last). - * @param {Object} item + * @param {Object} data * @returns {Object} */ private _preparePushOperation( diff --git a/src/core/segment_buffers/implementations/image/image_segment_buffer.ts b/src/core/segment_buffers/implementations/image/image_segment_buffer.ts index 2c29cdfa0a..e54ecd28de 100644 --- a/src/core/segment_buffers/implementations/image/image_segment_buffer.ts +++ b/src/core/segment_buffers/implementations/image/image_segment_buffer.ts @@ -14,11 +14,6 @@ * limitations under the License. */ -import { - defer as observableDefer, - Observable, - of as observableOf, -} from "rxjs"; import log from "../../../../log"; import { IBifThumbnail } from "../../../../parsers/images/bif"; import { @@ -45,55 +40,57 @@ export default class ImageSegmentBuffer extends SegmentBuffer { /** * @param {Object} data + * @returns {Promise} */ public pushChunk( infos : IPushChunkInfos - ) : Observable { - return observableDefer(() => { - log.debug("ISB: appending new data."); - if (infos.data.chunk === null) { - return observableOf(undefined); - } - const { appendWindow, - chunk } = infos.data; + ) : Promise { + log.debug("ISB: appending new data."); + if (infos.data.chunk === null) { + return Promise.resolve(); + } + const { appendWindow, + chunk } = infos.data; - // The following check is ugly. I don't care, the image buffer is there - // due to an ugly deprecated API that will soon disappear - const { start, end, timescale } = chunk as IImageTrackSegmentData; - const appendWindowStart = appendWindow[0] ?? 0; - const appendWindowEnd = appendWindow[1] ?? Infinity; + // The following check is ugly. I don't care, the image buffer is there + // due to an ugly deprecated API that will soon disappear + const { start, end, timescale } = chunk as IImageTrackSegmentData; + const appendWindowStart = appendWindow[0] ?? 0; + const appendWindowEnd = appendWindow[1] ?? Infinity; - const timescaledStart = start / timescale; - const timescaledEnd = end / timescale; + const timescaledStart = start / timescale; + const timescaledEnd = end / timescale; - const startTime = Math.max(appendWindowStart, timescaledStart); - const endTime = Math.min(appendWindowEnd, timescaledEnd); + const startTime = Math.max(appendWindowStart, timescaledStart); + const endTime = Math.min(appendWindowEnd, timescaledEnd); + try { this._buffered.insert(startTime, endTime); if (infos.inventoryInfos !== null) { this._segmentInventory.insertChunk(infos.inventoryInfos); } - return observableOf(undefined); - }); + } catch (err) { + return Promise.reject(err); + } + return Promise.resolve(); } /** * @param {Number} from * @param {Number} to + * @returns {Promise} */ - public removeBuffer(start : number, end : number) : Observable { - return observableDefer(() => { - log.info("ISB: ignored image data remove order", start, end); + public removeBuffer(start : number, end : number) : Promise { + log.info("ISB: ignored image data remove order", start, end); - // Logic removed as it caused more problems than it resolved: - // Image thumbnails are always downloaded as a single BIF file, meaning that - // any removing might necessitate to re-load the whole file in the future - // which seems pointless. - // In any case, image handling through the regular RxPlayer APIs has been - // completely deprecated now for several reasons, and should disappear in - // the next major version. - return observableOf(undefined); - }); + // Logic removed as it caused more problems than it resolved: + // Image thumbnails are always downloaded as a single BIF file, meaning that + // any removing might necessitate to re-load the whole file in the future + // which seems pointless. + // In any case, image handling through the regular RxPlayer APIs has been + // completely deprecated now for several reasons, and should disappear in + // the next major version. + return Promise.resolve(); } /** @@ -103,13 +100,11 @@ export default class ImageSegmentBuffer extends SegmentBuffer { * The returned Observable will emit and complete successively once the whole * segment has been pushed and this indication is acknowledged. * @param {Object} infos - * @returns {Observable} + * @returns {Promise} */ - public endOfSegment(_infos : IEndOfSegmentInfos) : Observable { - return observableDefer(() => { - this._segmentInventory.completeSegment(_infos, this._buffered); - return observableOf(undefined); - }); + public endOfSegment(_infos : IEndOfSegmentInfos) : Promise { + this._segmentInventory.completeSegment(_infos, this._buffered); + return Promise.resolve(); } /** diff --git a/src/core/segment_buffers/implementations/text/html/html_text_segment_buffer.ts b/src/core/segment_buffers/implementations/text/html/html_text_segment_buffer.ts index 9e5decc983..5568b9cd4a 100644 --- a/src/core/segment_buffers/implementations/text/html/html_text_segment_buffer.ts +++ b/src/core/segment_buffers/implementations/text/html/html_text_segment_buffer.ts @@ -16,14 +16,12 @@ import { concat as observableConcat, - defer as observableDefer, interval as observableInterval, map, merge as observableMerge, Observable, of as observableOf, startWith, - Subject, switchMap, takeUntil, } from "rxjs"; @@ -34,6 +32,7 @@ import { import config from "../../../../../config"; import log from "../../../../../log"; import { ITextTrackSegmentData } from "../../../../../transports"; +import TaskCanceller from "../../../../../utils/task_canceller"; import { IEndOfSegmentInfos, IPushChunkInfos, @@ -116,12 +115,8 @@ export default class HTMLTextSegmentBuffer extends SegmentBuffer { */ private readonly _videoElement : HTMLMediaElement; - /** - * When "nexting" that subject, every Observable declared here will be - * unsubscribed. - * Used for clean-up - */ - private readonly _destroy$ : Subject; + /** Used for clean-up. */ + private readonly _canceller : TaskCanceller; /** HTMLElement which will contain the cues */ private readonly _textTrackElement : HTMLElement; @@ -131,10 +126,10 @@ export default class HTMLTextSegmentBuffer extends SegmentBuffer { /** * We could need us to automatically update styling depending on - * `_textTrackElement`'s size. This Subject allows to stop that + * `_textTrackElement`'s size. This TaskCanceller allows to stop that * regular check. */ - private _clearSizeUpdates$ : Subject; + private _sizeUpdateCanceller : TaskCanceller; /** Information on cues currently displayed. */ private _currentCues : Array<{ @@ -167,14 +162,13 @@ export default class HTMLTextSegmentBuffer extends SegmentBuffer { this._videoElement = videoElement; this._textTrackElement = textTrackElement; - this._clearSizeUpdates$ = new Subject(); - this._destroy$ = new Subject(); + this._sizeUpdateCanceller = new TaskCanceller(); + this._canceller = new TaskCanceller(); this._buffer = new TextTrackCuesStore(); this._currentCues = []; // update text tracks - generateRefreshInterval(this._videoElement) - .pipe(takeUntil(this._destroy$)) + const refreshSub = generateRefreshInterval(this._videoElement) .subscribe((shouldDisplay) => { if (!shouldDisplay) { this._disableCurrentCues(); @@ -192,47 +186,43 @@ export default class HTMLTextSegmentBuffer extends SegmentBuffer { this._displayCues(cues); } }); + this._canceller.signal.register(() => { + refreshSub.unsubscribe(); + }); } /** * Push segment on Subscription. * @param {Object} infos - * @returns {Observable} + * @returns {Promise} */ - public pushChunk(infos : IPushChunkInfos) : Observable { - return observableDefer(() => { + public pushChunk(infos : IPushChunkInfos) : Promise { + try { this.pushChunkSync(infos); - return observableOf(undefined); - }); + } catch (err) { + return Promise.reject(err); + } + return Promise.resolve(); } /** * Remove buffered data. * @param {number} start - start position, in seconds * @param {number} end - end position, in seconds - * @returns {Observable} + * @returns {Promise} */ - public removeBuffer(start : number, end : number) : Observable { - return observableDefer(() => { - this.removeBufferSync(start, end); - return observableOf(undefined); - }); + public removeBuffer(start : number, end : number) : Promise { + this.removeBufferSync(start, end); + return Promise.resolve(); } /** - * Indicate that every chunks from a Segment has been given to pushChunk so - * far. - * This will update our internal Segment inventory accordingly. - * The returned Observable will emit and complete successively once the whole - * segment has been pushed and this indication is acknowledged. * @param {Object} infos - * @returns {Observable} + * @returns {Promise} */ - public endOfSegment(_infos : IEndOfSegmentInfos) : Observable { - return observableDefer(() => { - this._segmentInventory.completeSegment(_infos, this._buffered); - return observableOf(undefined); - }); + public endOfSegment(infos : IEndOfSegmentInfos) : Promise { + this._segmentInventory.completeSegment(infos, this._buffered); + return Promise.resolve(); } /** @@ -248,8 +238,7 @@ export default class HTMLTextSegmentBuffer extends SegmentBuffer { this._disableCurrentCues(); this._buffer.remove(0, Infinity); this._buffered.remove(0, Infinity); - this._destroy$.next(); - this._destroy$.complete(); + this._canceller.cancel(); } /** @@ -375,7 +364,7 @@ export default class HTMLTextSegmentBuffer extends SegmentBuffer { * Remove the current cue from being displayed. */ private _disableCurrentCues() : void { - this._clearSizeUpdates$.next(); + this._sizeUpdateCanceller.cancel(); if (this._currentCues.length > 0) { for (let i = 0; i < this._currentCues.length; i++) { safelyRemoveChild(this._textTrackElement, this._currentCues[i].element); @@ -399,7 +388,7 @@ export default class HTMLTextSegmentBuffer extends SegmentBuffer { // Remove and re-display everything // TODO More intelligent handling - this._clearSizeUpdates$.next(); + this._sizeUpdateCanceller.cancel(); for (let i = 0; i < this._currentCues.length; i++) { safelyRemoveChild(this._textTrackElement, this._currentCues[i].element); } @@ -418,17 +407,19 @@ export default class HTMLTextSegmentBuffer extends SegmentBuffer { element : HTMLElement; } => cue.resolution !== null); if (proportionalCues.length > 0) { + this._sizeUpdateCanceller = new TaskCanceller({ cancelOn: this._canceller.signal }); const { TEXT_TRACK_SIZE_CHECKS_INTERVAL } = config.getCurrent(); // update propertionally-sized elements periodically - onHeightWidthChange(this._textTrackElement, TEXT_TRACK_SIZE_CHECKS_INTERVAL) - .pipe(takeUntil(this._clearSizeUpdates$), - takeUntil(this._destroy$)) - .subscribe(({ height, width }) => { - for (let i = 0; i < proportionalCues.length; i++) { - const { resolution, element } = proportionalCues[i]; - updateProportionalElements(height, width, resolution, element); - } - }); + const heightWidthRef = onHeightWidthChange(this._textTrackElement, + TEXT_TRACK_SIZE_CHECKS_INTERVAL, + this._sizeUpdateCanceller.signal); + heightWidthRef.onUpdate(({ height, width }) => { + for (let i = 0; i < proportionalCues.length; i++) { + const { resolution, element } = proportionalCues[i]; + updateProportionalElements(height, width, resolution, element); + } + }, { clearSignal: this._sizeUpdateCanceller.signal, + emitCurrentValue: true }); } } } diff --git a/src/core/segment_buffers/implementations/text/native/native_text_segment_buffer.ts b/src/core/segment_buffers/implementations/text/native/native_text_segment_buffer.ts index 8ad7640daf..0bf209d9c8 100644 --- a/src/core/segment_buffers/implementations/text/native/native_text_segment_buffer.ts +++ b/src/core/segment_buffers/implementations/text/native/native_text_segment_buffer.ts @@ -14,11 +14,6 @@ * limitations under the License. */ -import { - defer as observableDefer, - Observable, - of as observableOf, -} from "rxjs"; import { addTextTrack, ICompatTextTrack, @@ -73,26 +68,26 @@ export default class NativeTextSegmentBuffer extends SegmentBuffer { /** * @param {Object} infos - * @returns {Observable} + * @returns {Promise} */ - public pushChunk(infos : IPushChunkInfos) : Observable { - return observableDefer(() => { - log.debug("NTSB: Appending new native text tracks"); - if (infos.data.chunk === null) { - return observableOf(undefined); - } - const { timestampOffset, - appendWindow, - chunk } = infos.data; - assertChunkIsTextTrackSegmentData(chunk); - const { start: startTime, - end: endTime, - data: dataString, - type, - language } = chunk; - const appendWindowStart = appendWindow[0] ?? 0; - const appendWindowEnd = appendWindow[1] ?? Infinity; - + public pushChunk(infos : IPushChunkInfos) : Promise { + log.debug("NTSB: Appending new native text tracks"); + if (infos.data.chunk === null) { + return Promise.resolve(); + } + const { timestampOffset, + appendWindow, + chunk } = infos.data; + assertChunkIsTextTrackSegmentData(chunk); + const { start: startTime, + end: endTime, + data: dataString, + type, + language } = chunk; + const appendWindowStart = appendWindow[0] ?? 0; + const appendWindowEnd = appendWindow[1] ?? Infinity; + + try { const cues = parseTextTrackToCues(type, dataString, timestampOffset, language); if (appendWindowStart !== 0 && appendWindowEnd !== Infinity) { @@ -130,7 +125,7 @@ export default class NativeTextSegmentBuffer extends SegmentBuffer { } else { if (cues.length <= 0) { log.warn("NTSB: Current text tracks have no cues nor start time. Aborting"); - return observableOf(undefined); + return Promise.resolve(); } log.warn("NTSB: No start time given. Guessing from cues."); start = cues[0].startTime; @@ -142,7 +137,7 @@ export default class NativeTextSegmentBuffer extends SegmentBuffer { } else { if (cues.length <= 0) { log.warn("NTSB: Current text tracks have no cues nor end time. Aborting"); - return observableOf(undefined); + return Promise.resolve(); } log.warn("NTSB: No end time given. Guessing from cues."); end = cues[cues.length - 1].endTime; @@ -151,7 +146,7 @@ export default class NativeTextSegmentBuffer extends SegmentBuffer { if (end <= start) { log.warn("NTSB: Invalid text track appended: ", "the start time is inferior or equal to the end time."); - return observableOf(undefined); + return Promise.resolve(); } if (cues.length > 0) { @@ -178,37 +173,31 @@ export default class NativeTextSegmentBuffer extends SegmentBuffer { if (infos.inventoryInfos !== null) { this._segmentInventory.insertChunk(infos.inventoryInfos); } - return observableOf(undefined); - }); + } catch (err) { + return Promise.reject(err); + } + + return Promise.resolve(); } /** * Remove buffered data. * @param {number} start - start position, in seconds * @param {number} end - end position, in seconds - * @returns {Observable} + * @returns {Promise} */ - public removeBuffer(start : number, end : number) : Observable { - return observableDefer(() => { - this._removeData(start, end); - return observableOf(undefined); - }); + public removeBuffer(start : number, end : number) : Promise { + this._removeData(start, end); + return Promise.resolve(); } /** - * Indicate that every chunks from a Segment has been given to pushChunk so - * far. - * This will update our internal Segment inventory accordingly. - * The returned Observable will emit and complete successively once the whole - * segment has been pushed and this indication is acknowledged. * @param {Object} infos - * @returns {Observable} + * @returns {Promise} */ - public endOfSegment(_infos : IEndOfSegmentInfos) : Observable { - return observableDefer(() => { - this._segmentInventory.completeSegment(_infos, this._buffered); - return observableOf(undefined); - }); + public endOfSegment(_infos : IEndOfSegmentInfos) : Promise { + this._segmentInventory.completeSegment(_infos, this._buffered); + return Promise.resolve(); } /** diff --git a/src/core/segment_buffers/implementations/types.ts b/src/core/segment_buffers/implementations/types.ts index 329b8fac31..141ad4ab8d 100644 --- a/src/core/segment_buffers/implementations/types.ts +++ b/src/core/segment_buffers/implementations/types.ts @@ -14,13 +14,13 @@ * limitations under the License. */ -import { Observable } from "rxjs"; import { Adaptation, ISegment, Period, Representation, } from "../../../manifest"; +import { CancellationSignal } from "../../../utils/task_canceller"; import SegmentInventory, { IBufferedChunk, IBufferedHistoryEntry, @@ -112,28 +112,41 @@ export abstract class SegmentBuffer { * `data.chunk` argument to null. * * @param {Object} infos - * @returns {Observable} + * @param {Object} cancellationSignal + * @returns {Promise} */ - public abstract pushChunk(infos : IPushChunkInfos) : Observable; + public abstract pushChunk( + infos : IPushChunkInfos, + cancellationSignal : CancellationSignal + ) : Promise; /** * Remove buffered data (added to the same FIFO queue than `pushChunk`). * @param {number} start - start position, in seconds * @param {number} end - end position, in seconds - * @returns {Observable} + * @param {Object} cancellationSignal + * @returns {Promise} */ - public abstract removeBuffer(start : number, end : number) : Observable; + public abstract removeBuffer( + start : number, + end : number, + cancellationSignal : CancellationSignal + ) : Promise; /** * Indicate that every chunks from a Segment has been given to pushChunk so * far. * This will update our internal Segment inventory accordingly. - * The returned Observable will emit and complete successively once the whole - * segment has been pushed and this indication is acknowledged. + * The returned Promise will resolve once the whole segment has been pushed + * and this indication is acknowledged. * @param {Object} infos - * @returns {Observable} + * @param {Object} cancellationSignal + * @returns {Promise} */ - public abstract endOfSegment(infos : IEndOfSegmentInfos) : Observable; + public abstract endOfSegment( + infos : IEndOfSegmentInfos, + cancellationSignal : CancellationSignal + ) : Promise; /** * Returns the currently buffered data, in a TimeRanges object. diff --git a/src/core/segment_buffers/segment_buffers_store.ts b/src/core/segment_buffers/segment_buffers_store.ts index 4629b24dd3..cc92ecdacf 100644 --- a/src/core/segment_buffers/segment_buffers_store.ts +++ b/src/core/segment_buffers/segment_buffers_store.ts @@ -14,13 +14,13 @@ * limitations under the License. */ -import { - Observable, - of as observableOf, -} from "rxjs"; import { MediaError } from "../../errors"; import features from "../../features"; import log from "../../log"; +import { + CancellationError, + CancellationSignal, +} from "../../utils/task_canceller"; import { AudioVideoSegmentBuffer, IBufferType, @@ -60,7 +60,7 @@ type INativeMediaBufferType = "audio" | "video"; * To be able to use a SegmentBuffer linked to a native media buffer, you * will first need to create it, but also wait until the other one is either * created or explicitely disabled through the `disableSegmentBuffer` method. - * The Observable returned by `waitForUsableBuffers` will emit when + * The Promise returned by `waitForUsableBuffers` will emit when * that is the case. * * @class SegmentBuffersStore @@ -96,7 +96,7 @@ export default class SegmentBuffersStore { /** * Callbacks called after a SourceBuffer is either created or disabled. * Used for example to trigger the `this.waitForUsableBuffers` - * Observable. + * Promise. */ private _onNativeBufferAddedOrDisabled : Array<() => void>; @@ -179,7 +179,7 @@ export default class SegmentBuffersStore { * content need to all be created (by creating SegmentBuffers linked to them) * before any one can be used. * - * This function will return an Observable emitting when any and all native + * This function will return a Promise resolving when any and all native * SourceBuffers can be used. * * From https://w3c.github.io/media-source/#methods @@ -187,18 +187,27 @@ export default class SegmentBuffersStore { * exception if the media element has reached the HAVE_METADATA * readyState. This can occur if the user agent's media engine * does not support adding more tracks during playback. - * @return {Observable} + * @param {Object} cancelWaitSignal + * @return {Promise} */ - public waitForUsableBuffers() : Observable { + public waitForUsableBuffers(cancelWaitSignal : CancellationSignal) : Promise { if (this._areNativeBuffersUsable()) { - return observableOf(undefined); + return Promise.resolve(); } - return new Observable(obs => { - this._onNativeBufferAddedOrDisabled.push(() => { + return new Promise((res, rej) => { + const onAddedOrDisabled = () => { if (this._areNativeBuffersUsable()) { - obs.next(undefined); - obs.complete(); + res(); + } + }; + this._onNativeBufferAddedOrDisabled.push(onAddedOrDisabled); + + cancelWaitSignal.register((error : CancellationError) => { + const indexOf = this._onNativeBufferAddedOrDisabled.indexOf(onAddedOrDisabled); + if (indexOf >= 0) { + this._onNativeBufferAddedOrDisabled.splice(indexOf, 1); } + rej(error); }); }); } diff --git a/src/core/stream/orchestrator/stream_orchestrator.ts b/src/core/stream/orchestrator/stream_orchestrator.ts index 6f0ffa8079..b9141e8153 100644 --- a/src/core/stream/orchestrator/stream_orchestrator.ts +++ b/src/core/stream/orchestrator/stream_orchestrator.ts @@ -44,9 +44,14 @@ import Manifest, { import deferSubscriptions from "../../../utils/defer_subscriptions"; import { fromEvent } from "../../../utils/event_emitter"; import filterMap from "../../../utils/filter_map"; -import { IReadOnlySharedReference } from "../../../utils/reference"; +import { + createMappedReference, + IReadOnlySharedReference, +} from "../../../utils/reference"; +import fromCancellablePromise from "../../../utils/rx-from_cancellable_promise"; import nextTickObs from "../../../utils/rx-next-tick"; import SortedList from "../../../utils/sorted_list"; +import TaskCanceller from "../../../utils/task_canceller"; import WeakMapMemory from "../../../utils/weak_map_memory"; import { IRepresentationEstimator } from "../../adaptive"; import type { IReadOnlyPlaybackObserver } from "../../api"; @@ -140,14 +145,22 @@ export default function StreamOrchestrator( const defaultMaxAhead = MAXIMUM_MAX_BUFFER_AHEAD[bufferType] != null ? MAXIMUM_MAX_BUFFER_AHEAD[bufferType] as number : Infinity; - return BufferGarbageCollector({ - segmentBuffer, - currentTime$: playbackObserver.getReference().asObservable() - .pipe(map(o => o.position.pending ?? o.position.last)), - maxBufferBehind$: maxBufferBehind.asObservable().pipe( - map(val => Math.min(val, defaultMaxBehind))), - maxBufferAhead$: maxBufferAhead.asObservable().pipe( - map(val => Math.min(val, defaultMaxAhead))), + return new Observable(() => { + const canceller = new TaskCanceller(); + BufferGarbageCollector( + { segmentBuffer, + playbackObserver, + maxBufferBehind: createMappedReference(maxBufferBehind, + (val) => + Math.min(val, defaultMaxBehind), + canceller.signal), + maxBufferAhead: createMappedReference(maxBufferAhead, + (val) => + Math.min(val, defaultMaxAhead), + canceller.signal) }, + canceller.signal + ); + return () => { canceller.cancel(); }; }); }); @@ -322,9 +335,15 @@ export default function StreamOrchestrator( destroyStreams$.next(); return observableConcat( - ...rangesToClean.map(({ start, end }) => - start >= end ? EMPTY : - segmentBuffer.removeBuffer(start, end).pipe(ignoreElements())), + ...rangesToClean.map(({ start, end }) => { + if (start >= end) { + return EMPTY; + } + const canceller = new TaskCanceller(); + return fromCancellablePromise(canceller, () => { + return segmentBuffer.removeBuffer(start, end, canceller.signal); + }).pipe(ignoreElements()); + }), // Schedule micro task before checking the last playback observation // to reduce the risk of race conditions where the next observation diff --git a/src/core/stream/period/period_stream.ts b/src/core/stream/period/period_stream.ts index 6e3b8bd8c6..f4f65e062f 100644 --- a/src/core/stream/period/period_stream.ts +++ b/src/core/stream/period/period_stream.ts @@ -45,7 +45,10 @@ import { getLeftSizeOfRange } from "../../../utils/ranges"; import createSharedReference, { IReadOnlySharedReference, } from "../../../utils/reference"; -import { CancellationSignal } from "../../../utils/task_canceller"; +import fromCancellablePromise from "../../../utils/rx-from_cancellable_promise"; +import TaskCanceller, { + CancellationSignal, +} from "../../../utils/task_canceller"; import WeakMapMemory from "../../../utils/weak_map_memory"; import { IRepresentationEstimator } from "../../adaptive"; import { IReadOnlyPlaybackObserver } from "../../api"; @@ -176,15 +179,20 @@ export default function PeriodStream({ if (SegmentBuffersStore.isNative(bufferType)) { return reloadAfterSwitch(period, bufferType, playbackObserver, 0); } - if (period.end === undefined) { - cleanBuffer$ = segmentBufferStatus.value.removeBuffer(period.start, - Infinity); - } else if (period.end <= period.start) { - cleanBuffer$ = observableOf(null); - } else { - cleanBuffer$ = segmentBufferStatus.value.removeBuffer(period.start, - period.end); - } + const canceller = new TaskCanceller(); + cleanBuffer$ = fromCancellablePromise(canceller, () => { + if (period.end === undefined) { + return segmentBufferStatus.value.removeBuffer(period.start, + Infinity, + canceller.signal); + } else if (period.end <= period.start) { + return Promise.resolve(); + } else { + return segmentBufferStatus.value.removeBuffer(period.start, + period.end, + canceller.signal); + } + }); } else { if (segmentBufferStatus.type === "uninitialized") { segmentBuffersStore.disableSegmentBuffer(bufferType); @@ -237,8 +245,11 @@ export default function PeriodStream({ const cleanBuffer$ = strategy.type === "clean-buffer" || strategy.type === "flush-buffer" ? - observableConcat(...strategy.value.map(({ start, end }) => - segmentBuffer.removeBuffer(start, end)) + observableConcat(...strategy.value.map(({ start, end }) => { + const canceller = new TaskCanceller(); + return fromCancellablePromise(canceller, () => + segmentBuffer.removeBuffer(start, end, canceller.signal)); + }) // NOTE As of now (RxJS 7.4.0), RxJS defines `ignoreElements` default // first type parameter as `any` instead of the perfectly fine `unknown`, // leading to linter issues, as it forbids the usage of `any`. @@ -249,12 +260,14 @@ export default function PeriodStream({ const bufferGarbageCollector$ = garbageCollectors.get(segmentBuffer); const adaptationStream$ = createAdaptationStream(adaptation, segmentBuffer); - return segmentBuffersStore.waitForUsableBuffers().pipe(mergeMap(() => { - return observableConcat(cleanBuffer$, - needsBufferFlush$, - observableMerge(adaptationStream$, - bufferGarbageCollector$)); - })); + const cancelWait = new TaskCanceller(); + return fromCancellablePromise(cancelWait, () => + segmentBuffersStore.waitForUsableBuffers(cancelWait.signal) + ).pipe(mergeMap(() => + observableConcat(cleanBuffer$, + needsBufferFlush$, + observableMerge(adaptationStream$, + bufferGarbageCollector$)))); }); return observableConcat( diff --git a/src/core/stream/representation/append_segment_to_buffer.ts b/src/core/stream/representation/append_segment_to_buffer.ts index 406ef7a47c..10b2d7b9d8 100644 --- a/src/core/stream/representation/append_segment_to_buffer.ts +++ b/src/core/stream/representation/append_segment_to_buffer.ts @@ -18,15 +18,8 @@ * This file allows any Stream to push data to a SegmentBuffer. */ -import { - catchError, - concat as observableConcat, - mergeMap, - ignoreElements, - Observable, - take, -} from "rxjs"; import { MediaError } from "../../../errors"; +import { CancellationSignal } from "../../../utils/task_canceller"; import { IReadOnlyPlaybackObserver } from "../../api"; import { IPushChunkInfos, @@ -39,45 +32,37 @@ import { IRepresentationStreamPlaybackObservation } from "./representation_strea * Append a segment to the given segmentBuffer. * If it leads to a QuotaExceededError, try to run our custom range * _garbage collector_ then retry. - * * @param {Observable} playbackObserver * @param {Object} segmentBuffer * @param {Object} dataInfos - * @returns {Observable} + * @param {Object} cancellationSignal + * @returns {Promise} */ -export default function appendSegmentToBuffer( +export default async function appendSegmentToBuffer( playbackObserver : IReadOnlyPlaybackObserver, segmentBuffer : SegmentBuffer, - dataInfos : IPushChunkInfos -) : Observable { - const append$ = segmentBuffer.pushChunk(dataInfos); - - return append$.pipe( - catchError((appendError : unknown) => { - if (!(appendError instanceof Error) || appendError.name !== "QuotaExceededError") { - const reason = appendError instanceof Error ? - appendError.toString() : - "An unknown error happened when pushing content"; - throw new MediaError("BUFFER_APPEND_ERROR", reason); - } - - return playbackObserver.getReference().asObservable().pipe( - take(1), - mergeMap((observation) => { - const currentPos = observation.position.pending ?? - observation.position.last; - return observableConcat( - forceGarbageCollection(currentPos, segmentBuffer).pipe(ignoreElements()), - append$ - ).pipe( - catchError((forcedGCError : unknown) => { - const reason = forcedGCError instanceof Error ? - forcedGCError.toString() : - "Could not clean the buffer"; + dataInfos : IPushChunkInfos, + cancellationSignal : CancellationSignal +) : Promise { + try { + await segmentBuffer.pushChunk(dataInfos, cancellationSignal); + } catch (appendError : unknown) { + if (!(appendError instanceof Error) || appendError.name !== "QuotaExceededError") { + const reason = appendError instanceof Error ? + appendError.toString() : + "An unknown error happened when pushing content"; + throw new MediaError("BUFFER_APPEND_ERROR", reason); + } + const { position } = playbackObserver.getReference().getValue(); + const currentPos = position.pending ?? position.last; + try { + await forceGarbageCollection(currentPos, segmentBuffer, cancellationSignal); + await segmentBuffer.pushChunk(dataInfos, cancellationSignal); + } catch (err2) { + const reason = err2 instanceof Error ? err2.toString() : + "Could not clean the buffer"; - throw new MediaError("BUFFER_FULL_ERROR", reason); - }) - ); - })); - })); + throw new MediaError("BUFFER_FULL_ERROR", reason); + } + } } diff --git a/src/core/stream/representation/force_garbage_collection.ts b/src/core/stream/representation/force_garbage_collection.ts index 7b83aa4c58..04b1fe727e 100644 --- a/src/core/stream/representation/force_garbage_collection.ts +++ b/src/core/stream/representation/force_garbage_collection.ts @@ -14,16 +14,10 @@ * limitations under the License. */ -import { - concatAll, - defer as observableDefer, - from as observableFrom, - Observable, - of as observableOf, -} from "rxjs"; import config from "../../../config"; import log from "../../../log"; import { getInnerAndOuterTimeRanges } from "../../../utils/ranges"; +import { CancellationSignal } from "../../../utils/task_canceller"; import { SegmentBuffer } from "../../segment_buffers"; @@ -33,37 +27,39 @@ import { SegmentBuffer } from "../../segment_buffers"; * Try to clean up buffered ranges from a low gcGap at first. * If it does not succeed to clean up space, use a higher gcCap. * - * @param {Observable} timings$ + * @param {number} currentPosition * @param {Object} bufferingQueue - * @returns {Observable} + * @param {Object} cancellationSignal + * @returns {Promise} */ -export default function forceGarbageCollection( +export default async function forceGarbageCollection( currentPosition : number, - bufferingQueue : SegmentBuffer -) : Observable { - return observableDefer(() => { - const GC_GAP_CALM = config.getCurrent().BUFFER_GC_GAPS.CALM; - const GC_GAP_BEEFY = config.getCurrent().BUFFER_GC_GAPS.BEEFY; - log.warn("Stream: Running garbage collector"); - const buffered = bufferingQueue.getBufferedRanges(); - let cleanedupRanges = selectGCedRanges(currentPosition, buffered, GC_GAP_CALM); + bufferingQueue : SegmentBuffer, + cancellationSignal : CancellationSignal +) : Promise { + const GC_GAP_CALM = config.getCurrent().BUFFER_GC_GAPS.CALM; + const GC_GAP_BEEFY = config.getCurrent().BUFFER_GC_GAPS.BEEFY; + log.warn("Stream: Running garbage collector"); + const buffered = bufferingQueue.getBufferedRanges(); + let cleanedupRanges = selectGCedRanges(currentPosition, buffered, GC_GAP_CALM); - // more aggressive GC if we could not find any range to clean - if (cleanedupRanges.length === 0) { - cleanedupRanges = selectGCedRanges(currentPosition, buffered, GC_GAP_BEEFY); - } + // more aggressive GC if we could not find any range to clean + if (cleanedupRanges.length === 0) { + cleanedupRanges = selectGCedRanges(currentPosition, buffered, GC_GAP_BEEFY); + } - if (log.hasLevel("DEBUG")) { - log.debug("Stream: GC cleaning", - cleanedupRanges.map(({ start, end }) => `start: ${start} - end ${end}`) - .join(", ")); + if (log.hasLevel("DEBUG")) { + log.debug("Stream: GC cleaning", + cleanedupRanges.map(({ start, end }) => `start: ${start} - end ${end}`) + .join(", ")); + } + for (const range of cleanedupRanges) { + const { start, end } = range; + if (start < end) { + await bufferingQueue.removeBuffer(start, end, cancellationSignal); } - return observableFrom( - cleanedupRanges.map(({ start, end }) => - start >= end ? observableOf(null) : - bufferingQueue.removeBuffer(start, end)) - ).pipe(concatAll()); - }); + } + return; } /** diff --git a/src/core/stream/representation/push_init_segment.ts b/src/core/stream/representation/push_init_segment.ts index a7248ed10f..704a905534 100644 --- a/src/core/stream/representation/push_init_segment.ts +++ b/src/core/stream/representation/push_init_segment.ts @@ -26,6 +26,8 @@ import Manifest, { Period, Representation, } from "../../../manifest"; +import fromCancellablePromise from "../../../utils/rx-from_cancellable_promise"; +import TaskCanceller from "../../../utils/task_canceller"; import { IReadOnlyPlaybackObserver } from "../../api"; import { IPushedChunkData, @@ -71,11 +73,15 @@ export default function pushInitSegment( timestampOffset: 0, appendWindow: [ undefined, undefined ], codec }; - return appendSegmentToBuffer(playbackObserver, - segmentBuffer, - { data, inventoryInfos: null }).pipe(map(() => { - const buffered = segmentBuffer.getBufferedRanges(); - return EVENTS.addedSegment(content, segment, buffered, segmentData); - })); + const canceller = new TaskCanceller(); + return fromCancellablePromise(canceller, () => + appendSegmentToBuffer(playbackObserver, + segmentBuffer, + { data, inventoryInfos: null }, + canceller.signal)) + .pipe(map(() => { + const buffered = segmentBuffer.getBufferedRanges(); + return EVENTS.addedSegment(content, segment, buffered, segmentData); + })); }); } diff --git a/src/core/stream/representation/push_media_segment.ts b/src/core/stream/representation/push_media_segment.ts index ef59020c49..a3c6a4c83b 100644 --- a/src/core/stream/representation/push_media_segment.ts +++ b/src/core/stream/representation/push_media_segment.ts @@ -29,6 +29,8 @@ import Manifest, { } from "../../../manifest"; import { ISegmentParserParsedMediaChunk } from "../../../transports"; import objectAssign from "../../../utils/object_assign"; +import fromCancellablePromise from "../../../utils/rx-from_cancellable_promise"; +import TaskCanceller from "../../../utils/task_canceller"; import { IReadOnlyPlaybackObserver } from "../../api"; import { SegmentBuffer } from "../../segment_buffers"; import EVENTS from "../events_generators"; @@ -108,12 +110,16 @@ export default function pushMediaSegment( start: estimatedStart, end: estimatedEnd }, content); + const canceller = new TaskCanceller(); - return appendSegmentToBuffer(playbackObserver, - segmentBuffer, - { data, inventoryInfos }).pipe(map(() => { - const buffered = segmentBuffer.getBufferedRanges(); - return EVENTS.addedSegment(content, segment, buffered, chunkData); - })); + return fromCancellablePromise(canceller, () => + appendSegmentToBuffer(playbackObserver, + segmentBuffer, + { data, inventoryInfos }, + canceller.signal)) + .pipe(map(() => { + const buffered = segmentBuffer.getBufferedRanges(); + return EVENTS.addedSegment(content, segment, buffered, chunkData); + })); }); } diff --git a/src/core/stream/representation/representation_stream.ts b/src/core/stream/representation/representation_stream.ts index 3c0308c9c3..998e63569e 100644 --- a/src/core/stream/representation/representation_stream.ts +++ b/src/core/stream/representation/representation_stream.ts @@ -52,6 +52,8 @@ import Manifest, { import assertUnreachable from "../../../utils/assert_unreachable"; import objectAssign from "../../../utils/object_assign"; import { createSharedReference } from "../../../utils/reference"; +import fromCancellablePromise from "../../../utils/rx-from_cancellable_promise"; +import TaskCanceller from "../../../utils/task_canceller"; import { IReadOnlyPlaybackObserver } from "../../api"; import { IPrioritizedSegmentFetcher } from "../../fetchers"; import { SegmentBuffer } from "../../segment_buffers"; @@ -267,10 +269,11 @@ export default function RepresentationStream({ 0, initialWantedTime - UPTO_CURRENT_POSITION_CLEANUP); if (gcedPosition > 0) { - bufferRemoval = segmentBuffer - .removeBuffer(0, gcedPosition) - // eslint-disable-next-line @typescript-eslint/no-unsafe-argument - .pipe(ignoreElements()); + const removalCanceller = new TaskCanceller(); + bufferRemoval = fromCancellablePromise(removalCanceller, () => + segmentBuffer.removeBuffer(0, gcedPosition, removalCanceller.signal) + // eslint-disable-next-line @typescript-eslint/no-unsafe-argument + ).pipe(ignoreElements()); } } return status.shouldRefreshManifest ? @@ -318,7 +321,10 @@ export default function RepresentationStream({ case "end-of-segment": { const { segment } = evt.value; - return segmentBuffer.endOfSegment(objectAssign({ segment }, content)) + const endOfSegmentCanceller = new TaskCanceller(); + return fromCancellablePromise(endOfSegmentCanceller, () => + segmentBuffer.endOfSegment(objectAssign({ segment }, content), + endOfSegmentCanceller.signal)) // NOTE As of now (RxJS 7.4.0), RxJS defines `ignoreElements` default // first type parameter as `any` instead of the perfectly fine `unknown`, // leading to linter issues, as it forbids the usage of `any`. diff --git a/src/experimental/tools/VideoThumbnailLoader/get_initialized_source_buffer.ts b/src/experimental/tools/VideoThumbnailLoader/get_initialized_source_buffer.ts index 14150efb7f..0bf347c8ee 100644 --- a/src/experimental/tools/VideoThumbnailLoader/get_initialized_source_buffer.ts +++ b/src/experimental/tools/VideoThumbnailLoader/get_initialized_source_buffer.ts @@ -29,6 +29,8 @@ import { import { ISegmentFetcher } from "../../../core/fetchers/segment/segment_fetcher"; import { AudioVideoSegmentBuffer } from "../../../core/segment_buffers/implementations"; import { ISegment } from "../../../manifest"; +import fromCancellablePromise from "../../../utils/rx-from_cancellable_promise"; +import TaskCanceller from "../../../utils/task_canceller"; import prepareSourceBuffer from "./prepare_source_buffer"; import { IContentInfos } from "./types"; @@ -88,14 +90,17 @@ function loadAndPushInitData(contentInfos: IContentInfos, const initSegmentData = initializationData instanceof ArrayBuffer ? new Uint8Array(initializationData) : initializationData; - return sourceBuffer + + const pushCanceller = new TaskCanceller(); + return fromCancellablePromise(pushCanceller, () => sourceBuffer .pushChunk({ data: { initSegment: initSegmentData, chunk: null, appendWindow: [undefined, undefined], timestampOffset: 0, codec: contentInfos .representation.getMimeTypeString() }, - inventoryInfos: null }); + inventoryInfos: null }, + pushCanceller.signal)); }) ); } diff --git a/src/experimental/tools/VideoThumbnailLoader/push_data.ts b/src/experimental/tools/VideoThumbnailLoader/push_data.ts index 742b5358e0..cb86a72767 100644 --- a/src/experimental/tools/VideoThumbnailLoader/push_data.ts +++ b/src/experimental/tools/VideoThumbnailLoader/push_data.ts @@ -9,6 +9,8 @@ import Manifest, { Representation, } from "../../../manifest"; import { ISegmentParserParsedMediaChunk } from "../../../transports"; +import fromCancellablePromise from "../../../utils/rx-from_cancellable_promise"; +import TaskCanceller from "../../../utils/task_canceller"; /** * Push data to the video source buffer. @@ -33,12 +35,14 @@ export default function pushData( const { chunkData, appendWindow } = parsed; const segmentData = chunkData instanceof ArrayBuffer ? new Uint8Array(chunkData) : chunkData; - return videoSourceBuffer + const pushCanceller = new TaskCanceller(); + return fromCancellablePromise(pushCanceller, () => videoSourceBuffer .pushChunk({ data: { chunk: segmentData, timestampOffset: 0, appendWindow, initSegment: null, codec: inventoryInfos .representation.getMimeTypeString() }, - inventoryInfos }); + inventoryInfos }, + pushCanceller.signal)); } diff --git a/src/experimental/tools/VideoThumbnailLoader/remove_buffer_around_time.ts b/src/experimental/tools/VideoThumbnailLoader/remove_buffer_around_time.ts index ee850812aa..9eb099b395 100644 --- a/src/experimental/tools/VideoThumbnailLoader/remove_buffer_around_time.ts +++ b/src/experimental/tools/VideoThumbnailLoader/remove_buffer_around_time.ts @@ -20,6 +20,8 @@ import { of as observableOf, } from "rxjs"; import { AudioVideoSegmentBuffer } from "../../../core/segment_buffers/implementations"; +import fromCancellablePromise from "../../../utils/rx-from_cancellable_promise"; +import TaskCanceller from "../../../utils/task_canceller"; /** * Remove buffer around wanted time, considering a margin around @@ -42,11 +44,17 @@ export default function removeBufferAroundTime$( return observableOf(null); } const bufferRemovals$ = []; + const removalCanceller = new TaskCanceller(); if ((time - margin) > 0) { - bufferRemovals$.push(sourceBuffer.removeBuffer(0, time - margin)); + bufferRemovals$.push( + fromCancellablePromise(removalCanceller, () => + sourceBuffer.removeBuffer(0, time - margin, removalCanceller.signal))); } if ((time + margin) < videoElement.duration) { - bufferRemovals$.push(sourceBuffer.removeBuffer(time + margin, videoElement.duration)); + bufferRemovals$.push(fromCancellablePromise(removalCanceller, () => + sourceBuffer.removeBuffer(time + margin, + videoElement.duration, + removalCanceller.signal))); } return observableCombineLatest(bufferRemovals$); } diff --git a/src/utils/task_canceller.ts b/src/utils/task_canceller.ts index fef4f77ab7..8d40191999 100644 --- a/src/utils/task_canceller.ts +++ b/src/utils/task_canceller.ts @@ -299,8 +299,7 @@ export class CancellationSignal { export type ICancellationListener = (error : CancellationError) => void; /** - * Error created when a task is cancelled through the TaskCanceller. - * + * Error created when a task is cancelled. * @class CancellationError * @extends Error */ From c581e7a3e8954772df463f942e2c62066b8224f5 Mon Sep 17 00:00:00 2001 From: Paul Berberian Date: Mon, 13 Jun 2022 15:53:41 +0200 Subject: [PATCH 2/2] Remove RxJS from the HTMLTextSegmentBuffer --- src/compat/event_listeners.ts | 87 ++++++++++++- .../text/html/html_text_segment_buffer.ts | 123 +++++++++--------- 2 files changed, 148 insertions(+), 62 deletions(-) diff --git a/src/compat/event_listeners.ts b/src/compat/event_listeners.ts index ff403d9776..9d3ccfbe12 100644 --- a/src/compat/event_listeners.ts +++ b/src/compat/event_listeners.ts @@ -107,6 +107,69 @@ export type IEventTargetLike = HTMLElement | IEventEmitterLike | IEventEmitter; +/** + * Returns a function allowing to add event listeners for particular event(s) + * optionally automatically adding browser prefixes if needed. + * @param {Array.} eventNames - The event(s) to listen to. If multiple + * events are set, the event listener will be triggered when any of them emits. + * @returns {Function} - Returns function allowing to easily add a callback to + * be triggered when that event is emitted on a given event target. + */ +function createCompatibleEventListener( + eventNames : string[] +) : + ( + element : IEventTargetLike, + listener : (event? : unknown) => void, + cancelSignal: CancellationSignal + ) => void +{ + let mem : string|undefined; + const prefixedEvents = eventPrefixed(eventNames); + + return ( + element : IEventTargetLike, + listener: (event? : unknown) => void, + cancelSignal: CancellationSignal + ) => { + if (cancelSignal.isCancelled) { + return; + } + + // if the element is a HTMLElement we can detect + // the supported event, and memoize it in `mem` + if (element instanceof HTMLElement) { + if (typeof mem === "undefined") { + mem = findSupportedEvent(element, prefixedEvents); + } + + if (isNonEmptyString(mem)) { + element.addEventListener(mem, listener); + cancelSignal.register(() => { + if (mem !== undefined) { + element.removeEventListener(mem, listener); + } + }); + } else { + if (__ENVIRONMENT__.CURRENT_ENV === __ENVIRONMENT__.DEV as number) { + log.warn(`compat: element ${element.tagName}` + + " does not support any of these events: " + + prefixedEvents.join(", ")); + } + return ; + } + } + + prefixedEvents.forEach(eventName => { + (element as IEventEmitterLike).addEventListener(eventName, listener); + cancelSignal.register(() => { + (element as IEventEmitterLike).removeEventListener(eventName, listener); + }); + }); + }; + +} + /** * @param {Array.} eventNames * @param {Array.|undefined} prefixes @@ -247,7 +310,8 @@ export interface IPictureInPictureEvent { /** * Emit when video enters and leaves Picture-In-Picture mode. - * @param {HTMLMediaElement} mediaElement + * @param {HTMLMediaElement} elt + * @param {Object} stopListening * @returns {Observable} */ function getPictureOnPictureStateRef( @@ -505,6 +569,24 @@ const onKeyError$ = compatibleListener(["keyerror", "error"]); */ const onKeyStatusesChange$ = compatibleListener(["keystatuseschange"]); +/** + * @param {HTMLMediaElement} mediaElement + * @returns {Observable} + */ +const onSeeking = createCompatibleEventListener(["seeking"]); + +/** + * @param {HTMLMediaElement} mediaElement + * @returns {Observable} + */ +const onSeeked = createCompatibleEventListener(["seeked"]); + +/** + * @param {HTMLMediaElement} mediaElement + * @returns {Observable} + */ +const onEnded = createCompatibleEventListener(["ended"]); + /** * Utilitary function allowing to add an event listener and remove it * automatically once the given `CancellationSignal` emits. @@ -534,6 +616,7 @@ export { getVideoVisibilityRef, getVideoWidthRef, onEncrypted$, + onEnded, onEnded$, onFullscreenChange$, onKeyAdded$, @@ -542,7 +625,9 @@ export { onKeyStatusesChange$, onLoadedMetadata$, onRemoveSourceBuffers$, + onSeeked, onSeeked$, + onSeeking, onSeeking$, onSourceClose$, onSourceEnded$, diff --git a/src/core/segment_buffers/implementations/text/html/html_text_segment_buffer.ts b/src/core/segment_buffers/implementations/text/html/html_text_segment_buffer.ts index 5568b9cd4a..dfc28dd791 100644 --- a/src/core/segment_buffers/implementations/text/html/html_text_segment_buffer.ts +++ b/src/core/segment_buffers/implementations/text/html/html_text_segment_buffer.ts @@ -14,17 +14,6 @@ * limitations under the License. */ -import { - concat as observableConcat, - interval as observableInterval, - map, - merge as observableMerge, - Observable, - of as observableOf, - startWith, - switchMap, - takeUntil, -} from "rxjs"; import { events, onHeightWidthChange, @@ -32,7 +21,9 @@ import { import config from "../../../../../config"; import log from "../../../../../log"; import { ITextTrackSegmentData } from "../../../../../transports"; -import TaskCanceller from "../../../../../utils/task_canceller"; +import TaskCanceller, { + CancellationSignal, +} from "../../../../../utils/task_canceller"; import { IEndOfSegmentInfos, IPushChunkInfos, @@ -43,31 +34,7 @@ import parseTextTrackToElements from "./parsers"; import TextTrackCuesStore from "./text_track_cues_store"; import updateProportionalElements from "./update_proportional_elements"; -const { onEnded$, - onSeeked$, - onSeeking$ } = events; - - -/** - * Generate the interval at which TextTrack HTML Cues should be refreshed. - * @param {HTMLMediaElement} videoElement - * @returns {Observable} - */ -function generateRefreshInterval(videoElement : HTMLMediaElement) : Observable { - const seeking$ = onSeeking$(videoElement); - const seeked$ = onSeeked$(videoElement); - const ended$ = onEnded$(videoElement); - const { MAXIMUM_HTML_TEXT_TRACK_UPDATE_INTERVAL } = config.getCurrent(); - const manualRefresh$ = observableMerge(seeked$, ended$); - const autoRefresh$ = observableInterval(MAXIMUM_HTML_TEXT_TRACK_UPDATE_INTERVAL) - .pipe(startWith(null)); - - return manualRefresh$.pipe( - startWith(null), - switchMap(() => observableConcat(autoRefresh$.pipe(map(() => true), - takeUntil(seeking$)), - observableOf(false)))); -} +const { onEnded, onSeeked, onSeeking } = events; /** * @param {Element} element @@ -167,28 +134,7 @@ export default class HTMLTextSegmentBuffer extends SegmentBuffer { this._buffer = new TextTrackCuesStore(); this._currentCues = []; - // update text tracks - const refreshSub = generateRefreshInterval(this._videoElement) - .subscribe((shouldDisplay) => { - if (!shouldDisplay) { - this._disableCurrentCues(); - return; - } - const { MAXIMUM_HTML_TEXT_TRACK_UPDATE_INTERVAL } = config.getCurrent(); - // to spread the time error, we divide the regular chosen interval. - const time = Math.max(this._videoElement.currentTime + - (MAXIMUM_HTML_TEXT_TRACK_UPDATE_INTERVAL / 1000) / 2, - 0); - const cues = this._buffer.get(time); - if (cues.length === 0) { - this._disableCurrentCues(); - } else { - this._displayCues(cues); - } - }); - this._canceller.signal.register(() => { - refreshSub.unsubscribe(); - }); + this.autoRefreshSubtitles(this._canceller.signal); } /** @@ -253,7 +199,7 @@ export default class HTMLTextSegmentBuffer extends SegmentBuffer { * * /!\ This method won't add any data to the linked inventory. * Please use the `pushChunk` method for most use-cases. - * @param {Object} data + * @param {Object} infos * @returns {boolean} */ public pushChunkSync(infos : IPushChunkInfos) : void { @@ -375,7 +321,7 @@ export default class HTMLTextSegmentBuffer extends SegmentBuffer { /** * Display a new Cue. If one was already present, it will be replaced. - * @param {HTMLElement} element + * @param {HTMLElement} elements */ private _displayCues(elements : HTMLElement[]) : void { const nothingChanged = this._currentCues.length === elements.length && @@ -422,6 +368,61 @@ export default class HTMLTextSegmentBuffer extends SegmentBuffer { emitCurrentValue: true }); } } + + /** + * Auto-refresh the display of subtitles according to the media element's + * position and events. + * @param {Object} cancellationSignal + */ + private autoRefreshSubtitles( + cancellationSignal : CancellationSignal + ) : void { + let autoRefreshCanceller : TaskCanceller | null = null; + const { MAXIMUM_HTML_TEXT_TRACK_UPDATE_INTERVAL } = config.getCurrent(); + + const startAutoRefresh = () => { + stopAutoRefresh(); + autoRefreshCanceller = new TaskCanceller({ cancelOn: cancellationSignal }); + const intervalId = setInterval(() => this.refreshSubtitles(), + MAXIMUM_HTML_TEXT_TRACK_UPDATE_INTERVAL); + autoRefreshCanceller.signal.register(() => { + clearInterval(intervalId); + }); + this.refreshSubtitles(); + }; + + onSeeking(this._videoElement, () => { + stopAutoRefresh(); + this._disableCurrentCues(); + }, cancellationSignal); + onSeeked(this._videoElement, startAutoRefresh, cancellationSignal); + onEnded(this._videoElement, startAutoRefresh, cancellationSignal); + + function stopAutoRefresh() { + if (autoRefreshCanceller !== null) { + autoRefreshCanceller.cancel(); + autoRefreshCanceller = null; + } + } + } + + /** + * Refresh current subtitles according to the current media element's + * position. + */ + private refreshSubtitles() : void { + const { MAXIMUM_HTML_TEXT_TRACK_UPDATE_INTERVAL } = config.getCurrent(); + // to spread the time error, we divide the regular chosen interval. + const time = Math.max(this._videoElement.currentTime + + (MAXIMUM_HTML_TEXT_TRACK_UPDATE_INTERVAL / 1000) / 2, + 0); + const cues = this._buffer.get(time); + if (cues.length === 0) { + this._disableCurrentCues(); + } else { + this._displayCues(cues); + } + } } /** Data of chunks that should be pushed to the NativeTextSegmentBuffer. */