Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Correct the behavior to defer closeSegmentIndex() calls during updates #7217

Merged
merged 5 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 49 additions & 1 deletion lib/media/streaming_engine.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ shaka.media.StreamingEngine = class {
/** @private {?shaka.extern.StreamingConfiguration} */
this.config_ = null;

/**
* Retains a reference to the function used to close SegmentIndex objects
* for streams which were switched away from during an ongoing update_().
* @private {!Map.<string, !function()>}
*/
this.deferredCloseSegmentIndex_ = new Map();

/** @private {number} */
this.bufferingGoalScale_ = 1;

Expand Down Expand Up @@ -504,6 +511,24 @@ shaka.media.StreamingEngine = class {
}


/**
* Handles deferred releases of old SegmentIndexes for the mediaState's
* content type from a previous update.
* @param {!shaka.media.StreamingEngine.MediaState_} mediaState
* @private
*/
handleDeferredCloseSegmentIndexes_(mediaState) {
for (const [key, value] of this.deferredCloseSegmentIndex_.entries()) {
const streamId = /** @type {string} */ (key);
const closeSegmentIndex = /** @type {!function()} */ (value);
if (streamId.includes(mediaState.type)) {
closeSegmentIndex();
this.deferredCloseSegmentIndex_.delete(streamId);
}
}
}


/**
* Switches to the given Stream. |stream| may be from any Variant.
*
Expand Down Expand Up @@ -576,7 +601,23 @@ shaka.media.StreamingEngine = class {
// Do not close segment indexes we are prefetching.
if (!this.audioPrefetchMap_.has(mediaState.stream)) {
if (mediaState.stream.closeSegmentIndex) {
mediaState.stream.closeSegmentIndex();
if (mediaState.performingUpdate) {
const oldStreamTag =
shaka.media.StreamingEngine.logPrefix_(mediaState);
if (!this.deferredCloseSegmentIndex_.has(oldStreamTag)) {
// The ongoing update is still using the old stream's segment
// reference information.
// If we close the old stream now, the update will not complete
// correctly.
// The next onUpdate_() for this content type will resume the
// closeSegmentIndex() operation for the old stream once the ongoing
// update has finished, then immediately create a new segment index.
this.deferredCloseSegmentIndex_.set(
oldStreamTag, mediaState.stream.closeSegmentIndex);
}
} else {
mediaState.stream.closeSegmentIndex();
}
}
}

Expand Down Expand Up @@ -1169,6 +1210,13 @@ shaka.media.StreamingEngine = class {
return;
}

// If stream switches happened during the previous update_() for this
// content type, close out the old streams that were switched away from.
// Even if we had switched away from the active stream 'A' during the
// update_(), e.g. (A -> B -> A), closing 'A' is permissible here since we
// will immediately re-create it in the logic below.
this.handleDeferredCloseSegmentIndexes_(mediaState);

// Make sure the segment index exists. If not, create the segment index.
if (!mediaState.stream.segmentIndex) {
const thisStream = mediaState.stream;
Expand Down
166 changes: 142 additions & 24 deletions test/media/streaming_engine_unit.js
Original file line number Diff line number Diff line change
Expand Up @@ -1125,6 +1125,109 @@ describe('StreamingEngine', () => {
expect(mediaSourceEngine.resetCaptionParser).not.toHaveBeenCalled();
});

it('defers old stream cleanup on switchVariant during update', async () => {
// Delay the appendBuffer call until later so we are waiting for this to
// finish when we switch.
let p = new shaka.util.PublicPromise();
const old = mediaSourceEngine.appendBuffer;
// Replace the whole spy since we want to call the original.
mediaSourceEngine.appendBuffer =
jasmine.createSpy('appendBuffer')
.and.callFake(async (type, data, reference) => {
await p;
return Util.invokeSpy(old, type, data, reference);
});

// Starts with 'initialVariant' (video-11-%d/audio-10-%d).
await streamingEngine.start();
playing = true;

await Util.fakeEventLoop(1);

// Grab a reference to initialVariant's segmentIndex before the switch so
// we can test how its internal fields change overtime.
const initialVariantSegmentIndex = initialVariant.video.segmentIndex;

// Switch to 'differentVariant' (video-14-%d/audio-15-%d) in the middle of
// the update.
streamingEngine.switchVariant(differentVariant, /* clearBuffer= */ true);

// Finish the update for 'initialVariant'.
p.resolve();
// Create a new promise to delay the appendBuffer for 'differentVariant'.
p = new shaka.util.PublicPromise();
await Util.fakeEventLoop(1);

const segmentType = shaka.net.NetworkingEngine.RequestType.SEGMENT;
const segmentContext = {
type: shaka.net.NetworkingEngine.AdvancedRequestType.MEDIA_SEGMENT,
};

// Since a switch occurred in the middle of a fetch for a 'initialVariant'
// segment, the closing of the segment index for 'initialVariant' was
// deferred.
// We check the length of the segment references array to determine
// whether it was closed or not.
expect(initialVariantSegmentIndex.references.length).toBeGreaterThan(0);
netEngine.expectRequest('video-11-0.mp4', segmentType, segmentContext);
netEngine.expectRequest('audio-10-0.mp4', segmentType, segmentContext);
netEngine.expectNoRequest('video-14-0.mp4', segmentType, segmentContext);
netEngine.expectNoRequest('audio-15-0.mp4', segmentType, segmentContext);

// Finish the update for 'differentVariant'. At this point, the
// segmentIndex for 'initialVariant' has been closed.
p.resolve();
await Util.fakeEventLoop(2);
expect(initialVariantSegmentIndex.references.length).toBe(0);
});

it('defers old stream cleanup on fast switch during update', async () => {
setupVod();

// Delay the appendBuffer call until later so we are waiting for this to
// finish when we switch.
const p = new shaka.util.PublicPromise();
const old = mediaSourceEngine.appendBuffer;
mediaSourceEngine.appendBuffer =
jasmine.createSpy('appendBuffer')
.and.callFake(async (type, data, reference) => {
await p;
return Util.invokeSpy(old, type, data, reference);
});

streamingEngine.switchVariant(variant, /* clearBuffer= */ true);
await streamingEngine.start();
playing = true;

expect(variant.video.createSegmentIndex).not.toHaveBeenCalled();
await Util.fakeEventLoop(1);
expect(variant.video.createSegmentIndex).toHaveBeenCalledTimes(1);

// Switch from variant A -> B -> A ("fast switch") multiple times.
for (let i = 0; i < 5; i++) {
streamingEngine.switchVariant(
alternateVariant, /* clearBuffer= */ true);
streamingEngine.switchVariant(variant, /* clearBuffer= */ true);
}
// Can resolve now to ensure all the switches happened during the update.
p.resolve();

// Give enough time for the next scheduled update to execute with the
// currently active variant ('variant').
await runTest();

// During the next scheduled update for 'variant', we close all streams
// that were switched away from, regardless of whether it is the active
// stream.
expect(variant.video.closeSegmentIndex).toHaveBeenCalledTimes(1);
expect(alternateVariant.video.closeSegmentIndex).toHaveBeenCalledTimes(1);
// However, we close all the deferred streams right before the check to
// create a new segmentIndex for the currently active stream.
expect(variant.video.createSegmentIndex).toHaveBeenCalledTimes(2);
expect(variant.video.segmentIndex).not.toBe(null);
expect(alternateVariant.video.segmentIndex).toBe(null);
});

// See https://github.com/shaka-project/shaka-player/issues/2956
it('works with fast variant switches during update', async () => {
// Delay the appendBuffer call until later so we are waiting for this to
Expand Down Expand Up @@ -1188,7 +1291,6 @@ describe('StreamingEngine', () => {
netEngine.expectRequest('text-20-0.mp4', segmentType, segmentContext);
netEngine.expectNoRequest('text-20-init', segmentType, segmentContext);
netEngine.expectNoRequest('text-21-init', segmentType, segmentContext);
// TODO: huh?
});
});

Expand Down Expand Up @@ -2213,9 +2315,11 @@ describe('StreamingEngine', () => {
});

onError.and.callFake((error) => {
expect(error.severity).toBe(shaka.util.Error.Severity.RECOVERABLE);
expect(error.category).toBe(shaka.util.Error.Category.NETWORK);
expect(error.code).toBe(shaka.util.Error.Code.BAD_HTTP_STATUS);
if (error instanceof shaka.util.Error) {
expect(error.severity).toBe(shaka.util.Error.Severity.RECOVERABLE);
expect(error.category).toBe(shaka.util.Error.Category.NETWORK);
expect(error.code).toBe(shaka.util.Error.Code.BAD_HTTP_STATUS);
}
});

disableStream.and.callFake((stream, time) => {
Expand All @@ -2238,6 +2342,7 @@ describe('StreamingEngine', () => {

await runTest();
expect(disableStream).toHaveBeenCalledTimes(1);
expect(onError).toHaveBeenCalled();
});

it('does not temporarily disables stream if not configured to',
Expand Down Expand Up @@ -2289,9 +2394,12 @@ describe('StreamingEngine', () => {
});

onError.and.callFake((error) => {
expect(error.severity).toBe(shaka.util.Error.Severity.RECOVERABLE);
expect(error.category).toBe(shaka.util.Error.Category.NETWORK);
expect(error.code).toBe(shaka.util.Error.Code.SEGMENT_MISSING);
if (error instanceof shaka.util.Error) {
expect(error.severity).toBe(
shaka.util.Error.Severity.RECOVERABLE);
expect(error.category).toBe(shaka.util.Error.Category.NETWORK);
expect(error.code).toBe(shaka.util.Error.Code.SEGMENT_MISSING);
}
});

disableStream.and.callFake((stream, time) => {
Expand All @@ -2314,6 +2422,7 @@ describe('StreamingEngine', () => {

await runTest();
expect(disableStream).toHaveBeenCalledTimes(1);
expect(onError).toHaveBeenCalled();
});

it('throws recoverable error if try to disable stream succeeded',
Expand Down Expand Up @@ -4390,34 +4499,43 @@ describe('StreamingEngine', () => {
* @param {shaka.extern.Stream} alternateStream
*/
function createAlternateSegmentIndex(baseStream, alternateStream) {
const closeSegmentIndexSpy = Util.funcSpy(
/** @type {!function()} */ (alternateStream.closeSegmentIndex));
const createSegmentIndexSpy =
Util.funcSpy(alternateStream.createSegmentIndex);
const altSegmentIndex = new shaka.test.FakeSegmentIndex();

altSegmentIndex.find.and.callFake(
(time) => baseStream.segmentIndex.find(time));
createSegmentIndexSpy.and.callFake(() => {
const altSegmentIndex = new shaka.test.FakeSegmentIndex();

altSegmentIndex.getNumReferences.and.callFake(
() => baseStream.segmentIndex.getNumReferences());
altSegmentIndex.find.and.callFake(
(time) => baseStream.segmentIndex.find(time));

altSegmentIndex.get.and.callFake((pos) => {
const ref = baseStream.segmentIndex.get(pos);
altSegmentIndex.getNumReferences.and.callFake(
() => baseStream.segmentIndex.getNumReferences());

if (ref) {
const altInitUri = ref.initSegmentReference.getUris()[0] + '_alt';
const altSegmentUri = ref.getUris()[0] + '_alt';
altSegmentIndex.get.and.callFake((pos) => {
const ref = baseStream.segmentIndex.get(pos);

ref.initSegmentReference.getUris = () => [altInitUri];
ref.getUris = () => [altSegmentUri];
return ref;
}
if (ref) {
const altInitUri = ref.initSegmentReference.getUris()[0] + '_alt';
const altSegmentUri = ref.getUris()[0] + '_alt';

return null;
});
ref.initSegmentReference.getUris = () => [altInitUri];
ref.getUris = () => [altSegmentUri];
return ref;
}

createSegmentIndexSpy.and.callFake(() => {
return null;
});
alternateStream.segmentIndex = altSegmentIndex;
return Promise.resolve();
});
closeSegmentIndexSpy.and.callFake(() => {
if (alternateStream.segmentIndex) {
alternateStream.segmentIndex.release();
}
alternateStream.segmentIndex = null;
return Promise.resolve();
});
}
});
22 changes: 17 additions & 5 deletions test/test/util/manifest_generator.js
Original file line number Diff line number Diff line change
Expand Up @@ -530,22 +530,34 @@ shaka.test.ManifestGenerator.Stream = class {
}

if (!isPartial) {
const shaka_ = manifest ? manifest.shaka_ : shaka;

/** @type {shaka.media.SegmentIndex} */
this.segmentIndex = shaka_.media.SegmentIndex.forSingleSegment(
/* startTime= */ 0, /* duration= */ 10, ['testUri']);

const create =
jasmine.createSpy('createSegmentIndex').and.callFake(() => {
this.segmentIndex = shaka_.media.SegmentIndex.forSingleSegment(
/* startTime= */ 0, /* duration= */ 10, ['testUri']);
return Promise.resolve();
});
const shaka_ = manifest ? manifest.shaka_ : shaka;
const segmentIndex = shaka_.media.SegmentIndex.forSingleSegment(
/* startTime= */ 0, /* duration= */ 10, ['testUri']);
const close = jasmine.createSpy('closeSegmentIndex').and.callFake(() => {
if (this.segmentIndex) {
this.segmentIndex.release();
}
this.segmentIndex = null;
return Promise.resolve();
});

/** @type {?string} */
this.originalId = null;
/** @type {?string} */
this.groupId = null;
/** @type {shaka.extern.CreateSegmentIndexFunction} */
this.createSegmentIndex = shaka.test.Util.spyFunc(create);
/** @type {shaka.media.SegmentIndex} */
this.segmentIndex = segmentIndex;
/** @type {!function()|undefined} */
this.closeSegmentIndex = shaka.test.Util.spyFunc(close);
/** @type {string} */
this.mimeType = defaultMimeType;
/** @type {string} */
Expand Down
Loading
Loading