diff --git a/src/core/fetchers/segment/segment_queue.ts b/src/core/fetchers/segment/segment_queue.ts index 75cb79b62e..3764f8d181 100644 --- a/src/core/fetchers/segment/segment_queue.ts +++ b/src/core/fetchers/segment/segment_queue.ts @@ -56,16 +56,32 @@ export default class SegmentQueue extends EventEmitter> */ private _currentContentInfo: ISegmentQueueContentInfo | null; + /** + * Indicates whether the user agent believes it has enough buffered data to ensure + * uninterrupted playback for a meaningful period or needs more data. + * It also reflects whether the user agent can retrieve and buffer data in an + * energy-efficient manner while maintaining the desired memory usage. + * The value can be `undefined` if the user agent does not provide this indicator. + * `true` indicates that the buffer is low, and more data should be buffered. + * `false` indicates that there is enough buffered data, and no additional data needs + * to be buffered at this time. + */ + private canLoad: SharedReference; + /** * Create a new `SegmentQueue`. * * @param {Object} segmentFetcher - Interface to facilitate the download of * segments. */ - constructor(segmentFetcher: IPrioritizedSegmentFetcher) { + constructor( + segmentFetcher: IPrioritizedSegmentFetcher, + canLoad: SharedReference, + ) { super(); this._segmentFetcher = segmentFetcher; this._currentContentInfo = null; + this.canLoad = canLoad; } /** @@ -118,7 +134,6 @@ export default class SegmentQueue extends EventEmitter> public resetForContent( content: ISegmentQueueContext, hasInitSegment: boolean, - canLoad: SharedReference, ): SharedReference { this._currentContentInfo?.currentCanceller.cancel(); const downloadQueue = new SharedReference({ @@ -139,11 +154,10 @@ export default class SegmentQueue extends EventEmitter> initSegmentRequest: null, mediaSegmentRequest: null, mediaSegmentAwaitingInitMetadata: null, - canLoad, }; this._currentContentInfo = currentContentInfo; - this._currentContentInfo.canLoad.onUpdate( + this.canLoad.onUpdate( (val) => { if (val) { log.debug( @@ -274,7 +288,7 @@ export default class SegmentQueue extends EventEmitter> const { downloadQueue, content, initSegmentInfoRef, currentCanceller } = contentInfo; const recursivelyRequestSegments = (): void => { - if (!contentInfo.canLoad.getValue()) { + if (!this.canLoad.getValue()) { log.debug("SQ: Segment fetching postponed because it cannot stream now."); return; } @@ -702,15 +716,4 @@ interface ISegmentQueueContentInfo { * `null` if no segment is awaiting an init segment. */ mediaSegmentAwaitingInitMetadata: string | null; - /** - * Indicates whether the user agent believes it has enough buffered data to ensure - * uninterrupted playback for a meaningful period or needs more data. - * It also reflects whether the user agent can retrieve and buffer data in an - * energy-efficient manner while maintaining the desired memory usage. - * The value can be `undefined` if the user agent does not provide this indicator. - * `true` indicates that the buffer is low, and more data should be buffered. - * `false` indicates that there is enough buffered data, and no additional data needs - * to be buffered at this time. - */ - canLoad: SharedReference; } diff --git a/src/core/fetchers/segment/segment_queue_creator.ts b/src/core/fetchers/segment/segment_queue_creator.ts index 454bad041b..4dae255b59 100644 --- a/src/core/fetchers/segment/segment_queue_creator.ts +++ b/src/core/fetchers/segment/segment_queue_creator.ts @@ -16,6 +16,7 @@ import config from "../../../config"; import type { ISegmentPipeline, ITransportPipelines } from "../../../transports"; +import type SharedReference from "../../../utils/reference"; import type { CancellationSignal } from "../../../utils/task_canceller"; import type CmcdDataBuilder from "../../cmcd"; import type { IBufferType } from "../../segment_sinks"; @@ -96,6 +97,7 @@ export default class SegmentQueueCreator { public createSegmentQueue( bufferType: IBufferType, eventListeners: ISegmentFetcherLifecycleCallbacks, + canLoad: SharedReference, ): SegmentQueue { const requestOptions = getSegmentFetcherRequestOptions(this._backoffOptions); const pipelines = this._transport[bufferType]; @@ -113,7 +115,7 @@ export default class SegmentQueueCreator { this._prioritizer, segmentFetcher, ); - return new SegmentQueue(prioritizedSegmentFetcher); + return new SegmentQueue(prioritizedSegmentFetcher, canLoad); } } diff --git a/src/core/stream/adaptation/adaptation_stream.ts b/src/core/stream/adaptation/adaptation_stream.ts index f680885e1b..a1cb92e80e 100644 --- a/src/core/stream/adaptation/adaptation_stream.ts +++ b/src/core/stream/adaptation/adaptation_stream.ts @@ -116,6 +116,16 @@ export default function AdaptationStream( adapStreamCanceller.signal, ); + const canStream = new SharedReference(true); + /** Update the `canLoad` ref on observation update */ + playbackObserver.listen((observation) => { + const observationCanStream = observation.canStream ?? true; + if (canStream.getValue() !== observationCanStream) { + log.debug("Stream: observation.canStream updated to", observationCanStream); + canStream.setValue(observationCanStream); + } + }); + /** Allows a `RepresentationStream` to easily fetch media segments. */ const segmentQueue = segmentQueueCreator.createSegmentQueue( adaptation.type, @@ -126,6 +136,7 @@ export default function AdaptationStream( onProgress: abrCallbacks.requestProgress, onMetrics: abrCallbacks.metrics, }, + canStream, ); /* eslint-enable @typescript-eslint/unbound-method */ diff --git a/src/core/stream/representation/representation_stream.ts b/src/core/stream/representation/representation_stream.ts index 1a619eebfd..38adbe1c9d 100644 --- a/src/core/stream/representation/representation_stream.ts +++ b/src/core/stream/representation/representation_stream.ts @@ -212,20 +212,19 @@ export default function RepresentationStream( const canStream = new SharedReference(true); - playbackObserver.listen((observation) => { - const observationCanStream = observation.canStream ?? true; - if (canStream.getValue() !== observationCanStream) { - log.debug("Stream: observation.canStream updated to", observationCanStream); - canStream.setValue(observationCanStream); - } - }); + playbackObserver.listen( + (observation) => { + const observationCanStream = observation.canStream ?? true; + if (canStream.getValue() !== observationCanStream) { + log.debug("Stream: observation.canStream updated to", observationCanStream); + canStream.setValue(observationCanStream); + } + }, + { clearSignal: segmentsLoadingCanceller.signal }, + ); /** Emit the last scheduled downloading queue for segments. */ - const segmentsToLoadRef = segmentQueue.resetForContent( - content, - hasInitSegment, - canStream, - ); + const segmentsToLoadRef = segmentQueue.resetForContent(content, hasInitSegment); segmentsLoadingCanceller.signal.register(() => { segmentQueue.stop();