Skip to content

Commit

Permalink
review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
Florent-Bouisset committed Nov 18, 2024
1 parent a5cbf86 commit 3e701f6
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 29 deletions.
35 changes: 19 additions & 16 deletions src/core/fetchers/segment/segment_queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,32 @@ export default class SegmentQueue<T> extends EventEmitter<ISegmentQueueEvent<T>>
*/
private _currentContentInfo: ISegmentQueueContentInfo | null;

/**
* Indicates whether the user agent believes it has enough buffered data to ensure
* uninterrupted playback for a meaningful period or needs more data.
* It also reflects whether the user agent can retrieve and buffer data in an
* energy-efficient manner while maintaining the desired memory usage.
* The value can be `undefined` if the user agent does not provide this indicator.
* `true` indicates that the buffer is low, and more data should be buffered.
* `false` indicates that there is enough buffered data, and no additional data needs
* to be buffered at this time.
*/
private canLoad: SharedReference<boolean>;

/**
* Create a new `SegmentQueue`.
*
* @param {Object} segmentFetcher - Interface to facilitate the download of
* segments.
*/
constructor(segmentFetcher: IPrioritizedSegmentFetcher<T>) {
constructor(
segmentFetcher: IPrioritizedSegmentFetcher<T>,
canLoad: SharedReference<boolean>,
) {
super();
this._segmentFetcher = segmentFetcher;
this._currentContentInfo = null;
this.canLoad = canLoad;
}

/**
Expand Down Expand Up @@ -118,7 +134,6 @@ export default class SegmentQueue<T> extends EventEmitter<ISegmentQueueEvent<T>>
public resetForContent(
content: ISegmentQueueContext,
hasInitSegment: boolean,
canLoad: SharedReference<boolean>,
): SharedReference<ISegmentQueueItem> {
this._currentContentInfo?.currentCanceller.cancel();
const downloadQueue = new SharedReference<ISegmentQueueItem>({
Expand All @@ -139,11 +154,10 @@ export default class SegmentQueue<T> extends EventEmitter<ISegmentQueueEvent<T>>
initSegmentRequest: null,
mediaSegmentRequest: null,
mediaSegmentAwaitingInitMetadata: null,
canLoad,
};
this._currentContentInfo = currentContentInfo;

this._currentContentInfo.canLoad.onUpdate(
this.canLoad.onUpdate(
(val) => {
if (val) {
log.debug(
Expand Down Expand Up @@ -274,7 +288,7 @@ export default class SegmentQueue<T> extends EventEmitter<ISegmentQueueEvent<T>>
const { downloadQueue, content, initSegmentInfoRef, currentCanceller } = contentInfo;

const recursivelyRequestSegments = (): void => {
if (!contentInfo.canLoad.getValue()) {
if (!this.canLoad.getValue()) {
log.debug("SQ: Segment fetching postponed because it cannot stream now.");
return;
}
Expand Down Expand Up @@ -702,15 +716,4 @@ interface ISegmentQueueContentInfo {
* `null` if no segment is awaiting an init segment.
*/
mediaSegmentAwaitingInitMetadata: string | null;
/**
* Indicates whether the user agent believes it has enough buffered data to ensure
* uninterrupted playback for a meaningful period or needs more data.
* It also reflects whether the user agent can retrieve and buffer data in an
* energy-efficient manner while maintaining the desired memory usage.
* The value can be `undefined` if the user agent does not provide this indicator.
* `true` indicates that the buffer is low, and more data should be buffered.
* `false` indicates that there is enough buffered data, and no additional data needs
* to be buffered at this time.
*/
canLoad: SharedReference<boolean>;
}
4 changes: 3 additions & 1 deletion src/core/fetchers/segment/segment_queue_creator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import config from "../../../config";
import type { ISegmentPipeline, ITransportPipelines } from "../../../transports";
import type SharedReference from "../../../utils/reference";
import type { CancellationSignal } from "../../../utils/task_canceller";
import type CmcdDataBuilder from "../../cmcd";
import type { IBufferType } from "../../segment_sinks";
Expand Down Expand Up @@ -96,6 +97,7 @@ export default class SegmentQueueCreator {
public createSegmentQueue(
bufferType: IBufferType,
eventListeners: ISegmentFetcherLifecycleCallbacks,
canLoad: SharedReference<boolean>,
): SegmentQueue<unknown> {
const requestOptions = getSegmentFetcherRequestOptions(this._backoffOptions);
const pipelines = this._transport[bufferType];
Expand All @@ -113,7 +115,7 @@ export default class SegmentQueueCreator {
this._prioritizer,
segmentFetcher,
);
return new SegmentQueue(prioritizedSegmentFetcher);
return new SegmentQueue(prioritizedSegmentFetcher, canLoad);
}
}

Expand Down
11 changes: 11 additions & 0 deletions src/core/stream/adaptation/adaptation_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,16 @@ export default function AdaptationStream(
adapStreamCanceller.signal,
);

const canStream = new SharedReference<boolean>(true);
/** Update the `canLoad` ref on observation update */
playbackObserver.listen((observation) => {

Check failure on line 121 in src/core/stream/adaptation/adaptation_stream.ts

View workflow job for this annotation

GitHub Actions / integration_linux (20.x)

Expected 2 arguments, but got 1.

Check failure on line 121 in src/core/stream/adaptation/adaptation_stream.ts

View workflow job for this annotation

GitHub Actions / integration_linux (20.x)

Expected 2 arguments, but got 1.

Check failure on line 121 in src/core/stream/adaptation/adaptation_stream.ts

View workflow job for this annotation

GitHub Actions / typechecking_and_linting (20.x)

Expected 2 arguments, but got 1.
const observationCanStream = observation.canStream ?? true;
if (canStream.getValue() !== observationCanStream) {
log.debug("Stream: observation.canStream updated to", observationCanStream);
canStream.setValue(observationCanStream);
}
});

/** Allows a `RepresentationStream` to easily fetch media segments. */
const segmentQueue = segmentQueueCreator.createSegmentQueue(
adaptation.type,
Expand All @@ -126,6 +136,7 @@ export default function AdaptationStream(
onProgress: abrCallbacks.requestProgress,
onMetrics: abrCallbacks.metrics,
},
canStream,
);
/* eslint-enable @typescript-eslint/unbound-method */

Expand Down
23 changes: 11 additions & 12 deletions src/core/stream/representation/representation_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -212,20 +212,19 @@ export default function RepresentationStream<TSegmentDataType>(

const canStream = new SharedReference<boolean>(true);

playbackObserver.listen((observation) => {
const observationCanStream = observation.canStream ?? true;
if (canStream.getValue() !== observationCanStream) {
log.debug("Stream: observation.canStream updated to", observationCanStream);
canStream.setValue(observationCanStream);
}
});
playbackObserver.listen(
(observation) => {
const observationCanStream = observation.canStream ?? true;
if (canStream.getValue() !== observationCanStream) {
log.debug("Stream: observation.canStream updated to", observationCanStream);
canStream.setValue(observationCanStream);
}
},
{ clearSignal: segmentsLoadingCanceller.signal },
);

/** Emit the last scheduled downloading queue for segments. */
const segmentsToLoadRef = segmentQueue.resetForContent(
content,
hasInitSegment,
canStream,
);
const segmentsToLoadRef = segmentQueue.resetForContent(content, hasInitSegment);

segmentsLoadingCanceller.signal.register(() => {
segmentQueue.stop();
Expand Down

0 comments on commit 3e701f6

Please sign in to comment.