Skip to content

Commit

Permalink
feat: Add support to streamDataCallback when using prefetch (#5310)
Browse files Browse the repository at this point in the history
This change improve the latency in LL-DASH streams when using prefetch
  • Loading branch information
avelad authored Jun 15, 2023
1 parent 08317d0 commit 6104b57
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 12 deletions.
91 changes: 81 additions & 10 deletions lib/media/segment_prefetch.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ shaka.media.SegmentPrefetch = class {

/**
* @private {!Map.<shaka.media.SegmentReference,
* !shaka.net.NetworkingEngine.PendingRequest>}
* !shaka.media.SegmentPrefetchOperation>}
*/
this.segmentPrefetchMap_ = new Map();
}
Expand Down Expand Up @@ -68,8 +68,10 @@ shaka.media.SegmentPrefetch = class {
while (this.segmentPrefetchMap_.size < this.prefetchLimit_ &&
reference != null) {
if (!this.segmentPrefetchMap_.has(reference)) {
const op = this.fetchDispatcher_(reference, this.stream_);
this.segmentPrefetchMap_.set(reference, op);
const segmentPrefetchOperation =
new shaka.media.SegmentPrefetchOperation(this.fetchDispatcher_);
segmentPrefetchOperation.dispatchFetch(reference, this.stream_);
this.segmentPrefetchMap_.set(reference, segmentPrefetchOperation);
}
this.prefetchPosTime_ = reference.startTime;
reference = iterator.next().value;
Expand All @@ -79,10 +81,11 @@ shaka.media.SegmentPrefetch = class {
/**
* Get the result of prefetched segment if already exists.
* @param {(!shaka.media.SegmentReference)} reference
* @param {?function(BufferSource):!Promise=} streamDataCallback
* @return {?shaka.net.NetworkingEngine.PendingRequest} op
* @public
*/
getPrefetchedSegment(reference) {
getPrefetchedSegment(reference, streamDataCallback) {
goog.asserts.assert(this.prefetchLimit_ > 0,
'SegmentPrefetch can not be used when prefetchLimit <= 0.');
goog.asserts.assert(reference instanceof shaka.media.SegmentReference,
Expand All @@ -91,13 +94,16 @@ shaka.media.SegmentPrefetch = class {
const logPrefix = shaka.media.SegmentPrefetch.logPrefix_(this.stream_);

if (this.segmentPrefetchMap_.has(reference)) {
const op = this.segmentPrefetchMap_.get(reference);
const segmentPrefetchOperation = this.segmentPrefetchMap_.get(reference);
if (streamDataCallback) {
segmentPrefetchOperation.setStreamDataCallback(streamDataCallback);
}
this.segmentPrefetchMap_.delete(reference);
shaka.log.debug(
logPrefix,
'reused prefetched segment at time:', reference.startTime,
'mapSize', this.segmentPrefetchMap_.size);
return op;
return segmentPrefetchOperation.getOperation();
} else {
shaka.log.debug(
logPrefix,
Expand Down Expand Up @@ -163,10 +169,10 @@ shaka.media.SegmentPrefetch = class {
*/
abortPrefetchedSegment_(reference) {
const logPrefix = shaka.media.SegmentPrefetch.logPrefix_(this.stream_);
const operation = this.segmentPrefetchMap_.get(reference);
const segmentPrefetchOperation = this.segmentPrefetchMap_.get(reference);
this.segmentPrefetchMap_.delete(reference);
if (operation) {
operation.abort();
if (segmentPrefetchOperation) {
segmentPrefetchOperation.abort();
shaka.log.info(
logPrefix,
'pop and abort prefetched segment at time:', reference.startTime);
Expand All @@ -183,10 +189,75 @@ shaka.media.SegmentPrefetch = class {
}
};

/**
* @summary
* This class manages a segment prefetch operation.
*/
shaka.media.SegmentPrefetchOperation = class {
/**
* @param {shaka.media.SegmentPrefetch.FetchDispatcher} fetchDispatcher
*/
constructor(fetchDispatcher) {
/** @private {shaka.media.SegmentPrefetch.FetchDispatcher} */
this.fetchDispatcher_ = fetchDispatcher;

/** @private {?function(BufferSource):!Promise} */
this.streamDataCallback_ = null;

/** @private {?shaka.net.NetworkingEngine.PendingRequest} */
this.operation_ = null;
}

/**
* Fetch a segments
*
* @param {!shaka.media.SegmentReference}
* reference
* @param {!shaka.extern.Stream} stream
* @public
*/
dispatchFetch(reference, stream) {
this.operation_ = this.fetchDispatcher_(
reference, stream, async (data) => {
if (this.streamDataCallback_) {
await this.streamDataCallback_(data);
}
});
}

/**
* Get the operation of prefetched segment if already exists.
*
* @return {?shaka.net.NetworkingEngine.PendingRequest} op
* @public
*/
getOperation() {
return this.operation_;
}

/**
* @param {?function(BufferSource):!Promise} streamDataCallback
* @public
*/
setStreamDataCallback(streamDataCallback) {
this.streamDataCallback_ = streamDataCallback;
}

/**
* Abort the current operation if exists.
*/
abort() {
if (this.operation_) {
this.operation_.abort();
}
}
};

/**
* @typedef {function(
* !(shaka.media.InitSegmentReference|shaka.media.SegmentReference),
* shaka.extern.Stream
* shaka.extern.Stream,
* ?function(BufferSource):!Promise=
* ):!shaka.net.NetworkingEngine.PendingRequest}
*
* @description
Expand Down
7 changes: 5 additions & 2 deletions lib/media/streaming_engine.js
Original file line number Diff line number Diff line change
Expand Up @@ -901,7 +901,9 @@ shaka.media.StreamingEngine = class {
return new shaka.media.SegmentPrefetch(
this.config_.segmentPrefetchLimit,
stream,
(reference, stream) => this.dispatchFetch_(reference, stream, null),
(reference, stream, streamDataCallback) => {
return this.dispatchFetch_(reference, stream, streamDataCallback);
},
);
}
return null;
Expand Down Expand Up @@ -2162,7 +2164,8 @@ shaka.media.StreamingEngine = class {
mediaState.segmentPrefetch &&
reference instanceof shaka.media.SegmentReference
) {
op = mediaState.segmentPrefetch.getPrefetchedSegment(reference);
op = mediaState.segmentPrefetch.getPrefetchedSegment(
reference, streamDataCallback);
}
if (!op) {
op = this.dispatchFetch_(
Expand Down

0 comments on commit 6104b57

Please sign in to comment.