From 7dfd99676039b7cd20f07530556650390c1af27e Mon Sep 17 00:00:00 2001 From: Paul Berberian Date: Tue, 4 May 2021 17:51:44 +0200 Subject: [PATCH] transports: segment parser now return result synchronously Every segment parser in `src/transports` (which allows to parse things like data, encryption metadata and time information from loaded segments according to the current streaming protocol) do so synchronously. However, those are typed as Observables. My guess (this was before my time on the project) is that it was mainly for two reasons: 1. Manifest parsing can be asynchronous (for example to fetch MPD xlinks or the clock) and it follows the same pipeline architecture (a loader and a parser) than segments. Letting segment parser be asynchronous thus allows to harmonize all resource-fetching code to a similar interface. 2. Theorically, segment parsing could be asynchronous. As far as I can see for now, this could be because of two reasons: - We need to perform an HTTPS request to fully parse segment information. We already encountered such case when trying to implement the "sidx ref type 1" feature (#754). However we did not merge that work for now due to the complexity and most of all because no real DASH MPD seems to rely on this feature (nor ar we able to envisage a real use-case for it). - segment parsing is performed in a Worker. But We do not feel the need to do that for now as the CPU footprint of parsing segments is really low. So theorically, there's good reasons to make the parsing operation asynchronous, but reastically for now, there's none. Meanwhile, making that operation asynchronous lead to big headaches: - it is one of the main reasons why the PR on making the init and first media segment's requests in parallel [#918] was not merged. Subtle bugs, some that would only be seen in a case where the parsing is asynchronous (so not really existant for now) could arise because of that. - Even if the response of the parser is wrapped in an Observable, it wasn't lazy (the code would be called even if the parser was not subscribed to). We could fix that by wrapping all parsers in a `defer` call from RxJS, but it would complexify even more the code. - Handling Observables is a LOT harder than just using directly a response in general. There's pending work about not depending so much on RxJS in the future [#916] and I think beginning to remove it where it is not even needed is a good first step. If it become needed again in the future (e.g. to support the "sidx ref type 1" feature), we could always revert that work, maybe even with a more Promise-based solution which might be much easier to reason about. Thoughts? --- src/core/fetchers/segment/segment_fetcher.ts | 44 +++--- .../representation/push_media_segment.ts | 4 +- .../representation/representation_stream.ts | 10 +- src/transports/README.md | 12 +- src/transports/dash/image_pipelines.ts | 49 +++--- src/transports/dash/segment_parser.ts | 64 ++++---- src/transports/dash/text_parser.ts | 76 +++++----- src/transports/local/segment_parser.ts | 51 +++---- src/transports/local/text_parser.ts | 76 +++++----- src/transports/metaplaylist/pipelines.ts | 93 ++++++------ src/transports/smooth/pipelines.ts | 139 +++++++++--------- src/transports/types.ts | 12 +- 12 files changed, 307 insertions(+), 323 deletions(-) diff --git a/src/core/fetchers/segment/segment_fetcher.ts b/src/core/fetchers/segment/segment_fetcher.ts index 881a1e2a70..0aac042f2a 100644 --- a/src/core/fetchers/segment/segment_fetcher.ts +++ b/src/core/fetchers/segment/segment_fetcher.ts @@ -21,7 +21,6 @@ import { Subject, } from "rxjs"; import { - catchError, filter, finalize, mergeMap, @@ -67,9 +66,19 @@ export type ISegmentFetcherWarning = ISegmentLoaderWarning; */ export interface ISegmentFetcherChunkEvent { type : "chunk"; - /** Parse the downloaded chunk. */ - parse : (initTimescale? : number) => Observable | - ISegmentParserSegment>; + /** + * Parse the downloaded chunk. + * + * Take in argument the timescale value that might have been obtained by + * parsing an initialization segment from the same Representation. + * Can be left to `undefined` if unknown or inexistant, segment parsers should + * be resilient and still work without that information. + * + * @param {number} initTimescale + * @returns {Object} + */ + parse(initTimescale? : number) : ISegmentParserInitSegment | + ISegmentParserSegment; } /** @@ -208,20 +217,21 @@ export default function createSegmentFetcher( * @param {Object} [initTimescale] * @returns {Observable} */ - parse(initTimescale? : number) : Observable | - ISegmentParserSegment> { + parse(initTimescale? : number) : ISegmentParserInitSegment | + ISegmentParserSegment { const response = { data: evt.value.responseData, isChunked }; - /* eslint-disable @typescript-eslint/no-unsafe-call */ - /* eslint-disable @typescript-eslint/no-unsafe-member-access */ - /* eslint-disable @typescript-eslint/no-unsafe-return */ - return segmentParser({ response, initTimescale, content }) - /* eslint-enable @typescript-eslint/no-unsafe-call */ - /* eslint-enable @typescript-eslint/no-unsafe-member-access */ - /* eslint-enable @typescript-eslint/no-unsafe-return */ - .pipe(catchError((error: unknown) => { - throw formatError(error, { defaultCode: "PIPELINE_PARSE_ERROR", - defaultReason: "Unknown parsing error" }); - })); + try { + /* eslint-disable @typescript-eslint/no-unsafe-call */ + /* eslint-disable @typescript-eslint/no-unsafe-member-access */ + /* eslint-disable @typescript-eslint/no-unsafe-return */ + return segmentParser({ response, initTimescale, content }); + /* eslint-enable @typescript-eslint/no-unsafe-call */ + /* eslint-enable @typescript-eslint/no-unsafe-member-access */ + /* eslint-enable @typescript-eslint/no-unsafe-return */ + } catch (error : unknown) { + throw formatError(error, { defaultCode: "PIPELINE_PARSE_ERROR", + defaultReason: "Unknown parsing error" }); + } }, }; diff --git a/src/core/stream/representation/push_media_segment.ts b/src/core/stream/representation/push_media_segment.ts index a4de54ef09..48f0f72cdd 100644 --- a/src/core/stream/representation/push_media_segment.ts +++ b/src/core/stream/representation/push_media_segment.ts @@ -27,7 +27,7 @@ import Manifest, { Period, Representation, } from "../../../manifest"; -import { ISegmentParserParsedSegment } from "../../../transports"; +import { ISegmentParserSegmentPayload } from "../../../transports"; import objectAssign from "../../../utils/object_assign"; import { SegmentBuffer } from "../../segment_buffers"; import EVENTS from "../events_generators"; @@ -56,7 +56,7 @@ export default function pushMediaSegment( period : Period; representation : Representation; }; initSegmentData : T | null; - parsedSegment : ISegmentParserParsedSegment; + parsedSegment : ISegmentParserSegmentPayload; segment : ISegment; segmentBuffer : SegmentBuffer; } ) : Observable< IStreamEventAddedSegment > { diff --git a/src/core/stream/representation/representation_stream.ts b/src/core/stream/representation/representation_stream.ts index 2f03f21ec7..0fb39dfdd2 100644 --- a/src/core/stream/representation/representation_stream.ts +++ b/src/core/stream/representation/representation_stream.ts @@ -38,7 +38,6 @@ import { import { finalize, ignoreElements, - map, mergeMap, share, startWith, @@ -57,7 +56,7 @@ import Manifest, { } from "../../../manifest"; import { ISegmentParserInitSegment, - ISegmentParserParsedInitSegment, + ISegmentParserInitSegmentPayload, ISegmentParserSegment, } from "../../../transports"; import assertUnreachable from "../../../utils/assert_unreachable"; @@ -254,7 +253,7 @@ export default function RepresentationStream({ * Saved initialization segment state for this representation. * `null` if the initialization segment hasn't been loaded yet. */ - let initSegmentObject : ISegmentParserParsedInitSegment | null = + let initSegmentObject : ISegmentParserInitSegmentPayload | null = initSegment === null ? { initializationData: null, protectionDataUpdate: false, initTimescale: undefined } : @@ -461,9 +460,8 @@ export default function RepresentationStream({ case "chunk": const initTimescale = initSegmentObject?.initTimescale; - return evt.parse(initTimescale).pipe(map(parserResponse => { - return objectAssign({ segment }, parserResponse); - })); + const parsed = evt.parse(initTimescale); + return observableOf(objectAssign({ segment }, parsed)); case "ended": return requestNextSegment$; diff --git a/src/transports/README.md b/src/transports/README.md index c6621d023b..6b8582c3f7 100644 --- a/src/transports/README.md +++ b/src/transports/README.md @@ -130,12 +130,16 @@ Its concept can be illustrated as such: +--------+ ``` -The parser returns an Observable which will emit the parsed resource when done. +Depending on the type of parser (e.g. Manifest parser or segment parser), that +task can be synchronous or asynchronous. -This Observable will throw if the resource is corrupted or miss crucial -information. +In asynchronous cases, the parser will return an Observable emitting a unique +time the result when done and throwing if an error is encountered. -[1] the parser could also need to perform requests (e.g. it needs to fetch the +In synchronous cases, the parser returns directly the result, and can throw +directly when/if an error is encountered. + +[1] a parser could also need to perform requests (e.g. it needs to fetch the current time from a server). In such cases, the parser is given a special callback, which allows it to receive the same error-handling perks than a loader, such as multiple retries, diff --git a/src/transports/dash/image_pipelines.ts b/src/transports/dash/image_pipelines.ts index ed4b4df238..0969ab981f 100644 --- a/src/transports/dash/image_pipelines.ts +++ b/src/transports/dash/image_pipelines.ts @@ -55,17 +55,17 @@ export function imageLoader( export function imageParser( { response, content } : ISegmentParserArguments -) : Observable | - ISegmentParserSegment> +) : ISegmentParserInitSegment | + ISegmentParserSegment { const { segment, period } = content; const { data, isChunked } = response; if (content.segment.isInit) { // image init segment has no use - return observableOf({ type: "parsed-init-segment", - value: { initializationData: null, - protectionDataUpdate: false, - initTimescale: undefined } }); + return { type: "parsed-init-segment", + value: { initializationData: null, + protectionDataUpdate: false, + initTimescale: undefined } }; } if (isChunked) { @@ -76,27 +76,26 @@ export function imageParser( // TODO image Parsing should be more on the buffer side, no? if (data === null || features.imageParser === null) { - return observableOf({ type: "parsed-segment", - value: { chunkData: null, - chunkInfos: { duration: segment.duration, - time: segment.time }, - chunkOffset, - appendWindow: [period.start, period.end], - protectionDataUpdate: false } }); + return { type: "parsed-segment", + value: { chunkData: null, + chunkInfos: { duration: segment.duration, + time: segment.time }, + chunkOffset, + appendWindow: [period.start, period.end], + protectionDataUpdate: false } }; } const bifObject = features.imageParser(new Uint8Array(data)); const thumbsData = bifObject.thumbs; - return observableOf({ type: "parsed-segment", - value: { chunkData: { data: thumbsData, - start: 0, - end: Number.MAX_VALUE, - timescale: 1, - type: "bif" }, - chunkInfos: { time: 0, - duration: Number.MAX_VALUE, - timescale: bifObject.timescale }, - chunkOffset, - appendWindow: [period.start, period.end], - protectionDataUpdate: false } }); + return { type: "parsed-segment", + value: { chunkData: { data: thumbsData, + start: 0, + end: Number.MAX_VALUE, + timescale: 1, + type: "bif" }, + chunkInfos: { time: 0, + duration: Number.MAX_VALUE }, + chunkOffset, + protectionDataUpdate: false, + appendWindow: [period.start, period.end] } }; } diff --git a/src/transports/dash/segment_parser.ts b/src/transports/dash/segment_parser.ts index c76cd3cbbe..96c338cb2d 100644 --- a/src/transports/dash/segment_parser.ts +++ b/src/transports/dash/segment_parser.ts @@ -14,10 +14,6 @@ * limitations under the License. */ -import { - Observable, - of as observableOf, -} from "rxjs"; import { getMDHDTimescale, getSegmentsFromSidx, @@ -53,25 +49,25 @@ export default function generateAudioVideoSegmentParser( initTimescale } : ISegmentParserArguments< Uint8Array | ArrayBuffer | null > - ) : Observable | - ISegmentParserSegment< Uint8Array | ArrayBuffer | null>> { + ) : ISegmentParserInitSegment | + ISegmentParserSegment< Uint8Array | ArrayBuffer | null> { const { period, adaptation, representation, segment, manifest } = content; const { data, isChunked } = response; const appendWindow : [number, number | undefined] = [ period.start, period.end ]; if (data === null) { if (segment.isInit) { - return observableOf({ type: "parsed-init-segment" as const, - value: { initializationData: null, - protectionDataUpdate: false, - initTimescale: undefined } }); + return { type: "parsed-init-segment" as const, + value: { initializationData: null, + protectionDataUpdate: false, + initTimescale: undefined } }; } - return observableOf({ type: "parsed-segment" as const, - value: { chunkData: null, - chunkInfos: null, - chunkOffset: 0, - appendWindow, - protectionDataUpdate: false } }); + return { type: "parsed-segment" as const, + value: { chunkData: null, + chunkInfos: null, + chunkOffset: 0, + protectionDataUpdate: false, + appendWindow } }; } const chunkData = data instanceof Uint8Array ? data : @@ -112,24 +108,24 @@ export default function generateAudioVideoSegmentParser( manifest.publishTime); if (events !== undefined) { const { needsManifestRefresh, inbandEvents } = events; - return observableOf({ type: "parsed-segment", - value: { chunkData, - chunkInfos, - chunkOffset, - appendWindow, - inbandEvents, - needsManifestRefresh, - protectionDataUpdate } }); + return { type: "parsed-segment", + value: { chunkData, + chunkInfos, + chunkOffset, + appendWindow, + inbandEvents, + protectionDataUpdate, + needsManifestRefresh } }; } } } - return observableOf({ type: "parsed-segment", - value: { chunkData, - chunkInfos, - chunkOffset, - appendWindow, - protectionDataUpdate } }); + return { type: "parsed-segment", + value: { chunkData, + chunkInfos, + chunkOffset, + protectionDataUpdate, + appendWindow } }; } // we're handling an initialization segment const { indexRange } = segment; @@ -174,9 +170,9 @@ export default function generateAudioVideoSegmentParser( const parsedTimescale = isNullOrUndefined(timescale) ? undefined : timescale; - return observableOf({ type: "parsed-init-segment", - value: { initializationData: chunkData, - protectionDataUpdate, - initTimescale: parsedTimescale } }); + return { type: "parsed-init-segment", + value: { initializationData: chunkData, + protectionDataUpdate, + initTimescale: parsedTimescale } }; }; } diff --git a/src/transports/dash/text_parser.ts b/src/transports/dash/text_parser.ts index d74d8f88e2..1ebf5847de 100644 --- a/src/transports/dash/text_parser.ts +++ b/src/transports/dash/text_parser.ts @@ -14,10 +14,6 @@ * limitations under the License. */ -import { - Observable, - of as observableOf, -} from "rxjs"; import { getMDHDTimescale, getSegmentsFromSidx, @@ -53,8 +49,8 @@ function parseISOBMFFEmbeddedTextTrack( ArrayBuffer | string >, __priv_patchLastSegmentInSidx? : boolean -) : Observable | - ISegmentParserSegment> +) : ISegmentParserInitSegment | + ISegmentParserSegment { const { period, representation, segment } = content; const { isInit, indexRange } = segment; @@ -92,10 +88,10 @@ function parseISOBMFFEmbeddedTextTrack( { representation.index.initializeIndex(sidxSegments); } - return observableOf({ type: "parsed-init-segment", - value: { initializationData: null, - protectionDataUpdate: false, - initTimescale: mdhdTimescale } }); + return { type: "parsed-init-segment", + value: { initializationData: null, + protectionDataUpdate: false, + initTimescale: mdhdTimescale } }; } const chunkInfos = getISOBMFFTimingInfos(chunkBytes, isChunked, @@ -106,12 +102,12 @@ function parseISOBMFFEmbeddedTextTrack( chunkInfos, isChunked); const chunkOffset = takeFirstSet(segment.timestampOffset, 0); - return observableOf({ type: "parsed-segment", - value: { chunkData, - chunkInfos, - chunkOffset, - appendWindow: [period.start, period.end], - protectionDataUpdate: false } }); + return { type: "parsed-segment", + value: { chunkData, + chunkInfos, + chunkOffset, + protectionDataUpdate: false, + appendWindow: [period.start, period.end] } }; } /** @@ -124,16 +120,16 @@ function parsePlainTextTrack( content } : ISegmentParserArguments< Uint8Array | ArrayBuffer | string > -) : Observable | - ISegmentParserSegment> +) : ISegmentParserInitSegment | + ISegmentParserSegment { const { period, segment } = content; const { timestampOffset = 0 } = segment; if (segment.isInit) { - return observableOf({ type: "parsed-init-segment", - value: { initializationData: null, - protectionDataUpdate: false, - initTimescale: undefined } }); + return { type: "parsed-init-segment", + value: { initializationData: null, + protectionDataUpdate: false, + initTimescale: undefined } }; } const { data, isChunked } = response; @@ -146,12 +142,12 @@ function parsePlainTextTrack( textTrackData = data; } const chunkData = getPlainTextTrackData(content, textTrackData, isChunked); - return observableOf({ type: "parsed-segment", - value: { chunkData, - chunkInfos: null, - chunkOffset: timestampOffset, - appendWindow: [period.start, period.end], - protectionDataUpdate: false } }); + return { type: "parsed-segment", + value: { chunkData, + chunkInfos: null, + chunkOffset: timestampOffset, + protectionDataUpdate: false, + appendWindow: [period.start, period.end] } }; } /** @@ -173,25 +169,25 @@ export default function generateTextTrackParser( ArrayBuffer | string | null > - ) : Observable | - ISegmentParserSegment> + ) : ISegmentParserInitSegment | + ISegmentParserSegment { const { period, adaptation, representation, segment } = content; const { timestampOffset = 0 } = segment; const { data, isChunked } = response; if (data === null) { // No data, just return empty infos if (segment.isInit) { - return observableOf({ type: "parsed-init-segment", - value: { initializationData: null, - protectionDataUpdate: false, - initTimescale: undefined } }); + return { type: "parsed-init-segment", + value: { initializationData: null, + protectionDataUpdate: false, + initTimescale: undefined } }; } - return observableOf({ type: "parsed-segment", - value: { chunkData: null, - chunkInfos: null, - chunkOffset: timestampOffset, - appendWindow: [period.start, period.end], - protectionDataUpdate: false } }); + return { type: "parsed-segment", + value: { chunkData: null, + chunkInfos: null, + chunkOffset: timestampOffset, + protectionDataUpdate: false, + appendWindow: [period.start, period.end] } }; } const containerType = inferSegmentContainer(adaptation.type, representation); diff --git a/src/transports/local/segment_parser.ts b/src/transports/local/segment_parser.ts index 6c4d385774..0b51e262bc 100644 --- a/src/transports/local/segment_parser.ts +++ b/src/transports/local/segment_parser.ts @@ -14,16 +14,11 @@ * limitations under the License. */ -import { - Observable, - of as observableOf, -} from "rxjs"; import { getMDHDTimescale, takePSSHOut, } from "../../parsers/containers/isobmff"; import { getTimeCodeScale } from "../../parsers/containers/matroska"; -import isNullOrUndefined from "../../utils/is_null_or_undefined"; import takeFirstSet from "../../utils/take_first_set"; import { ISegmentParserSegment, @@ -38,8 +33,8 @@ export default function segmentParser({ response, initTimescale, } : ISegmentParserArguments -) : Observable | - ISegmentParserSegment> +) : ISegmentParserInitSegment | + ISegmentParserSegment { const { period, adaptation, representation, segment } = content; const { data } = response; @@ -47,17 +42,17 @@ export default function segmentParser({ if (data === null) { if (segment.isInit) { - return observableOf({ type: "parsed-init-segment", - value: { initializationData: null, - protectionDataUpdate: false, - initTimescale: undefined } }); + return { type: "parsed-init-segment", + value: { initializationData: null, + protectionDataUpdate: false, + initTimescale: undefined } }; } - return observableOf({ type: "parsed-segment", - value: { chunkData: null, - chunkInfos: null, - chunkOffset: 0, - appendWindow, - protectionDataUpdate: false } }); + return { type: "parsed-segment", + value: { chunkData: null, + chunkInfos: null, + chunkOffset: 0, + protectionDataUpdate: false, + appendWindow } }; } const chunkData = new Uint8Array(data); @@ -77,12 +72,10 @@ export default function segmentParser({ const timescale = containerType === "webm" ? getTimeCodeScale(chunkData, 0) : // assume ISOBMFF-compliance getMDHDTimescale(chunkData); - return observableOf({ type: "parsed-init-segment", - value: { initializationData: chunkData, - initTimescale: isNullOrUndefined(timescale) ? - undefined : - timescale, - protectionDataUpdate } }); + return { type: "parsed-init-segment", + value: { initializationData: chunkData, + initTimescale: timescale ?? undefined, + protectionDataUpdate } }; } const chunkInfos = seemsToBeMP4 ? getISOBMFFTimingInfos(chunkData, @@ -91,10 +84,10 @@ export default function segmentParser({ initTimescale) : null; // TODO extract time info from webm const chunkOffset = takeFirstSet(segment.timestampOffset, 0); - return observableOf({ type: "parsed-segment", - value: { chunkData, - chunkInfos, - chunkOffset, - appendWindow, - protectionDataUpdate: false } }); + return { type: "parsed-segment", + value: { chunkData, + chunkInfos, + chunkOffset, + protectionDataUpdate: false, + appendWindow } }; } diff --git a/src/transports/local/text_parser.ts b/src/transports/local/text_parser.ts index 558810ca4c..57a26fdb02 100644 --- a/src/transports/local/text_parser.ts +++ b/src/transports/local/text_parser.ts @@ -14,10 +14,6 @@ * limitations under the License. */ -import { - Observable, - of as observableOf, -} from "rxjs"; import { getMDHDTimescale } from "../../parsers/containers/isobmff"; import { strToUtf8, @@ -48,8 +44,8 @@ function parseISOBMFFEmbeddedTextTrack( initTimescale } : ISegmentParserArguments< Uint8Array | ArrayBuffer | string > -) : Observable | - ISegmentParserSegment> +) : ISegmentParserInitSegment | + ISegmentParserSegment { const { period, segment } = content; const { data, isChunked } = response; @@ -59,10 +55,10 @@ function parseISOBMFFEmbeddedTextTrack( new Uint8Array(data); if (segment.isInit) { const mdhdTimescale = getMDHDTimescale(chunkBytes); - return observableOf({ type: "parsed-init-segment", - value: { initializationData: null, - initTimescale: mdhdTimescale, - protectionDataUpdate: false } }); + return { type: "parsed-init-segment", + value: { initializationData: null, + initTimescale: mdhdTimescale, + protectionDataUpdate: false } }; } const chunkInfos = getISOBMFFTimingInfos(chunkBytes, isChunked, @@ -73,12 +69,12 @@ function parseISOBMFFEmbeddedTextTrack( chunkInfos, isChunked); const chunkOffset = takeFirstSet(segment.timestampOffset, 0); - return observableOf({ type: "parsed-segment", - value: { chunkData, - chunkInfos, - chunkOffset, - appendWindow: [period.start, period.end], - protectionDataUpdate: false } }); + return { type: "parsed-segment", + value: { chunkData, + chunkInfos, + chunkOffset, + protectionDataUpdate: false, + appendWindow: [period.start, period.end] } }; } /** @@ -91,15 +87,15 @@ function parsePlainTextTrack( content } : ISegmentParserArguments< Uint8Array | ArrayBuffer | string > -) : Observable | - ISegmentParserSegment> +) : ISegmentParserInitSegment | + ISegmentParserSegment { const { period, segment } = content; if (segment.isInit) { - return observableOf({ type: "parsed-init-segment", - value: { initializationData: null, - initTimescale: undefined, - protectionDataUpdate: false } }); + return { type: "parsed-init-segment", + value: { initializationData: null, + initTimescale: undefined, + protectionDataUpdate: false } }; } const { data, isChunked } = response; @@ -113,12 +109,12 @@ function parsePlainTextTrack( } const chunkData = getPlainTextTrackData(content, textTrackData, isChunked); const chunkOffset = takeFirstSet(segment.timestampOffset, 0); - return observableOf({ type: "parsed-segment", - value: { chunkData, - chunkInfos: null, - chunkOffset, - appendWindow: [period.start, period.end], - protectionDataUpdate: false } }); + return { type: "parsed-segment", + value: { chunkData, + chunkInfos: null, + chunkOffset, + protectionDataUpdate: false, + appendWindow: [period.start, period.end] } }; } /** @@ -133,25 +129,25 @@ export default function textTrackParser( ArrayBuffer | string | null > -) : Observable | - ISegmentParserSegment> +) : ISegmentParserInitSegment | + ISegmentParserSegment { const { period, adaptation, representation, segment } = content; const { data, isChunked } = response; if (data === null) { // No data, just return empty infos if (segment.isInit) { - return observableOf({ type: "parsed-init-segment", - value: { initializationData: null, - protectionDataUpdate: false, - initTimescale: undefined } }); + return { type: "parsed-init-segment", + value: { initializationData: null, + protectionDataUpdate: false, + initTimescale: undefined } }; } const chunkOffset = takeFirstSet(segment.timestampOffset, 0); - return observableOf({ type: "parsed-segment", - value: { chunkData: null, - chunkInfos: null, - chunkOffset, - appendWindow: [period.start, period.end], - protectionDataUpdate: false } }); + return { type: "parsed-segment", + value: { chunkData: null, + chunkInfos: null, + chunkOffset, + protectionDataUpdate: false, + appendWindow: [period.start, period.end] } }; } const containerType = inferSegmentContainer(adaptation.type, representation); diff --git a/src/transports/metaplaylist/pipelines.ts b/src/transports/metaplaylist/pipelines.ts index a1abbe53b5..06f31b689d 100644 --- a/src/transports/metaplaylist/pipelines.ts +++ b/src/transports/metaplaylist/pipelines.ts @@ -51,8 +51,8 @@ import { ISegmentLoaderArguments, ISegmentParserArguments, ISegmentParserInitSegment, - ISegmentParserParsedSegment, ISegmentParserSegment, + ISegmentParserSegmentPayload, ITextTrackSegmentData, ITransportOptions, ITransportPipelines, @@ -262,7 +262,7 @@ export default function(options : ITransportOptions): ITransportPipelines { function offsetTimeInfos( contentOffset : number, contentEnd : number | undefined, - segmentResponse : ISegmentParserParsedSegment + segmentResponse : ISegmentParserSegmentPayload ) : { chunkInfos : IChunkTimeInfo | null; chunkOffset : number; appendWindow : [ number | undefined, number | undefined ]; } { @@ -306,24 +306,22 @@ export default function(options : ITransportOptions): ITransportPipelines { parser( args : ISegmentParserArguments - ) : Observable | - ISegmentParserInitSegment> + ) : ISegmentParserSegment | + ISegmentParserInitSegment { const { content } = args; const { segment } = content; const { contentStart, contentEnd } = getMetaPlaylistPrivateInfos(segment); const { audio } = getTransportPipelinesFromSegment(segment); + const parsed = audio.parser(getParserArguments(args, segment)); + if (parsed.type === "parsed-init-segment") { + return parsed; + } + const timeInfos = offsetTimeInfos(contentStart, contentEnd, parsed.value); + // TODO check why this is unsafe for TypeScript // eslint-disable-next-line @typescript-eslint/no-unsafe-return - return audio.parser(getParserArguments(args, segment)) - .pipe(map(res => { - if (res.type === "parsed-init-segment") { - return res; - } - const timeInfos = offsetTimeInfos(contentStart, contentEnd, res.value); - // eslint-disable-next-line @typescript-eslint/no-unsafe-return - return objectAssign({ type: "parsed-segment", - value: objectAssign({}, res.value, timeInfos) }); - })); + return objectAssign({ type: "parsed-segment", + value: objectAssign({}, parsed.value, timeInfos) }); }, }; @@ -335,24 +333,22 @@ export default function(options : ITransportOptions): ITransportPipelines { parser( args : ISegmentParserArguments - ) : Observable | - ISegmentParserInitSegment> + ) : ISegmentParserSegment | + ISegmentParserInitSegment { const { content } = args; const { segment } = content; const { contentStart, contentEnd } = getMetaPlaylistPrivateInfos(segment); const { video } = getTransportPipelinesFromSegment(segment); + const parsed = video.parser(getParserArguments(args, segment)); + if (parsed.type === "parsed-init-segment") { + return parsed; + } + const timeInfos = offsetTimeInfos(contentStart, contentEnd, parsed.value); + // TODO check why this is unsafe for TypeScript // eslint-disable-next-line @typescript-eslint/no-unsafe-return - return video.parser(getParserArguments(args, segment)) - .pipe(map(res => { - if (res.type === "parsed-init-segment") { - return res; - } - const timeInfos = offsetTimeInfos(contentStart, contentEnd, res.value); - // eslint-disable-next-line @typescript-eslint/no-unsafe-return - return objectAssign({ type: "parsed-segment", - value: objectAssign({}, res.value, timeInfos) }); - })); + return objectAssign({ type: "parsed-segment", + value: objectAssign({}, parsed.value, timeInfos) }); }, }; @@ -364,24 +360,22 @@ export default function(options : ITransportOptions): ITransportPipelines { parser( args: ISegmentParserArguments - ) : Observable | - ISegmentParserSegment> + ) : ISegmentParserInitSegment | + ISegmentParserSegment { const { content } = args; const { segment } = content; const { contentStart, contentEnd } = getMetaPlaylistPrivateInfos(segment); const { text } = getTransportPipelinesFromSegment(segment); + const parsed = text.parser(getParserArguments(args, segment)); + if (parsed.type === "parsed-init-segment") { + return parsed; + } + const timeInfos = offsetTimeInfos(contentStart, contentEnd, parsed.value); + // TODO check why this is unsafe for TypeScript // eslint-disable-next-line @typescript-eslint/no-unsafe-return - return text.parser(getParserArguments(args, segment)) - .pipe(map(res => { - if (res.type === "parsed-init-segment") { - return res; - } - const timeInfos = offsetTimeInfos(contentStart, contentEnd, res.value); - // eslint-disable-next-line @typescript-eslint/no-unsafe-return - return objectAssign({ type: "parsed-segment", - value: objectAssign({}, res.value, timeInfos) }); - })); + return objectAssign({ type: "parsed-segment", + value: objectAssign({}, parsed.value, timeInfos) }); }, }; @@ -393,24 +387,23 @@ export default function(options : ITransportOptions): ITransportPipelines { parser( args : ISegmentParserArguments - ) : Observable | - ISegmentParserSegment> + ) : ISegmentParserInitSegment | + ISegmentParserSegment { const { content } = args; const { segment } = content; const { contentStart, contentEnd } = getMetaPlaylistPrivateInfos(segment); const { image } = getTransportPipelinesFromSegment(segment); // eslint-disable-next-line @typescript-eslint/no-unsafe-return - return image.parser(getParserArguments(args, segment)) - .pipe(map(res => { - if (res.type === "parsed-init-segment") { - return res; - } - const timeInfos = offsetTimeInfos(contentStart, contentEnd, res.value); - // eslint-disable-next-line @typescript-eslint/no-unsafe-return - return objectAssign({ type: "parsed-segment", - value: objectAssign({}, res.value, timeInfos) }); - })); + const parsed = image.parser(getParserArguments(args, segment)); + if (parsed.type === "parsed-init-segment") { + return parsed; + } + const timeInfos = offsetTimeInfos(contentStart, contentEnd, parsed.value); + // TODO check why this is unsafe for TypeScript + // eslint-disable-next-line @typescript-eslint/no-unsafe-return + return objectAssign({ type: "parsed-segment", + value: objectAssign({}, parsed.value, timeInfos) }); }, }; diff --git a/src/transports/smooth/pipelines.ts b/src/transports/smooth/pipelines.ts index 0ef68db3de..f90de518a7 100644 --- a/src/transports/smooth/pipelines.ts +++ b/src/transports/smooth/pipelines.ts @@ -180,24 +180,24 @@ export default function(options : ITransportOptions) : ITransportPipelines { response, initTimescale, } : ISegmentParserArguments< ArrayBuffer | Uint8Array | null > - ) : Observable | - ISegmentParserSegment> + ) : ISegmentParserInitSegment | + ISegmentParserSegment { const { segment, adaptation, manifest } = content; const { data, isChunked } = response; if (data === null) { if (segment.isInit) { - return observableOf({ type: "parsed-init-segment", - value: { initializationData: null, - protectionDataUpdate: false, - initTimescale: undefined } }); + return { type: "parsed-init-segment", + value: { initializationData: null, + protectionDataUpdate: false, + initTimescale: undefined } }; } - return observableOf({ type: "parsed-segment", - value: { chunkData: null, - chunkInfos: null, - chunkOffset: 0, - appendWindow: [undefined, undefined], - protectionDataUpdate: false } }); + return { type: "parsed-segment", + value: { chunkData: null, + chunkInfos: null, + chunkOffset: 0, + protectionDataUpdate: false, + appendWindow: [undefined, undefined] } }; } const responseBuffer = data instanceof Uint8Array ? data : @@ -205,12 +205,12 @@ export default function(options : ITransportOptions) : ITransportPipelines { if (segment.isInit) { const timescale = segment.privateInfos?.smoothInitSegment?.timescale; - return observableOf({ type: "parsed-init-segment", - value: { initializationData: data, - // smooth init segments are crafted by hand. - // Their timescale is the one from the manifest. - initTimescale: timescale, - protectionDataUpdate: false } }); + return { type: "parsed-init-segment", + value: { initializationData: data, + // smooth init segments are crafted by hand. + // Their timescale is the one from the manifest. + initTimescale: timescale, + protectionDataUpdate: false } }; } const timingInfos = initTimescale !== undefined ? @@ -231,12 +231,12 @@ export default function(options : ITransportOptions) : ITransportPipelines { if (nextSegments.length > 0) { addNextSegments(adaptation, nextSegments, segment); } - return observableOf({ type: "parsed-segment", - value: { chunkData, - chunkInfos, - chunkOffset: 0, - appendWindow: [undefined, undefined], - protectionDataUpdate: false } }); + return { type: "parsed-segment", + value: { chunkData, + chunkInfos, + chunkOffset: 0, + protectionDataUpdate: false, + appendWindow: [undefined, undefined] } }; }, }; @@ -272,8 +272,8 @@ export default function(options : ITransportOptions) : ITransportPipelines { response, initTimescale, } : ISegmentParserArguments - ) : Observable | - ISegmentParserSegment> + ) : ISegmentParserInitSegment | + ISegmentParserSegment { const { manifest, adaptation, representation, segment } = content; const { language } = adaptation; @@ -281,18 +281,18 @@ export default function(options : ITransportOptions) : ITransportPipelines { const { mimeType = "", codec = "" } = representation; const { data, isChunked } = response; if (segment.isInit) { // text init segment has no use in HSS - return observableOf({ type: "parsed-init-segment", - value: { initializationData: null, - protectionDataUpdate: false, - initTimescale: undefined } }); + return { type: "parsed-init-segment", + value: { initializationData: null, + protectionDataUpdate: false, + initTimescale: undefined } }; } if (data === null) { - return observableOf({ type: "parsed-segment", - value: { chunkData: null, - chunkInfos: null, - chunkOffset: 0, - appendWindow: [undefined, undefined], - protectionDataUpdate: false } }); + return { type: "parsed-segment", + value: { chunkData: null, + chunkInfos: null, + chunkOffset: 0, + protectionDataUpdate: false, + appendWindow: [undefined, undefined] } }; } let nextSegments; @@ -395,16 +395,16 @@ export default function(options : ITransportOptions) : ITransportPipelines { } const chunkOffset = segmentStart ?? 0; - return observableOf({ type: "parsed-segment", - value: { chunkData: { type: _sdType, - data: _sdData, - start: segmentStart, - end: segmentEnd, - language }, - chunkInfos, - chunkOffset, - appendWindow: [undefined, undefined], - protectionDataUpdate: false } }); + return { type: "parsed-segment", + value: { chunkData: { type: _sdType, + data: _sdData, + start: segmentStart, + end: segmentEnd, + language }, + chunkInfos, + chunkOffset, + protectionDataUpdate: false, + appendWindow: [undefined, undefined] } }; }, }; @@ -426,16 +426,16 @@ export default function(options : ITransportOptions) : ITransportPipelines { parser( { response, content } : ISegmentParserArguments - ) : Observable | - ISegmentParserSegment> + ) : ISegmentParserInitSegment | + ISegmentParserSegment { const { data, isChunked } = response; if (content.segment.isInit) { // image init segment has no use - return observableOf({ type: "parsed-init-segment", - value: { initializationData: null, - protectionDataUpdate: false, - initTimescale: undefined } }); + return { type: "parsed-init-segment", + value: { initializationData: null, + protectionDataUpdate: false, + initTimescale: undefined } }; } if (isChunked) { @@ -444,28 +444,27 @@ export default function(options : ITransportOptions) : ITransportPipelines { // TODO image Parsing should be more on the buffer side, no? if (data === null || features.imageParser === null) { - return observableOf({ type: "parsed-segment", - value: { chunkData: null, - chunkInfos: null, - chunkOffset: 0, - appendWindow: [undefined, undefined], - protectionDataUpdate: false } }); + return { type: "parsed-segment", + value: { chunkData: null, + chunkInfos: null, + chunkOffset: 0, + protectionDataUpdate: false, + appendWindow: [undefined, undefined] } }; } const bifObject = features.imageParser(new Uint8Array(data)); const thumbsData = bifObject.thumbs; - return observableOf({ type: "parsed-segment", - value: { chunkData: { data: thumbsData, - start: 0, - end: Number.MAX_VALUE, - timescale: 1, - type: "bif" }, - chunkInfos: { time: 0, - duration: Number.MAX_VALUE, - timescale: bifObject.timescale }, - chunkOffset: 0, - protectionDataUpdate: false, - appendWindow: [undefined, undefined] } }); + return { type: "parsed-segment", + value: { chunkData: { data: thumbsData, + start: 0, + end: Number.MAX_VALUE, + timescale: 1, + type: "bif" }, + chunkInfos: { time: 0, + duration: Number.MAX_VALUE }, + chunkOffset: 0, + protectionDataUpdate: false, + appendWindow: [undefined, undefined] } }; }, }; diff --git a/src/transports/types.ts b/src/transports/types.ts index 73b1b0d7bf..8c2145c8ab 100644 --- a/src/transports/types.ts +++ b/src/transports/types.ts @@ -142,8 +142,8 @@ export type ISegmentParser< ParsedMediaDataFormat > = ( x : ISegmentParserArguments< LoadedFormat > -) => Observable | - ISegmentParserSegment>; +) => ISegmentParserInitSegment | + ISegmentParserSegment; /** Arguments for the loader of the manifest pipeline. */ export interface IManifestLoaderArguments { @@ -427,7 +427,7 @@ export interface IChunkTimeInfo { } /** Payload sent when an initialization segment has been parsed. */ -export interface ISegmentParserParsedInitSegment { +export interface ISegmentParserInitSegmentPayload { /** * Initialization segment that can be directly pushed to the corresponding * buffer. @@ -453,7 +453,7 @@ export interface ISegmentParserParsedInitSegment { } /** Payload sent when an media segment has been parsed. */ -export interface ISegmentParserParsedSegment { +export interface ISegmentParserSegmentPayload { /** Data to decode. */ chunkData : T | null; /** Time information about the segment. */ @@ -504,7 +504,7 @@ export interface ISegmentParserParsedSegment { */ export interface ISegmentParserInitSegment { type : "parsed-init-segment"; - value : ISegmentParserParsedInitSegment; + value : ISegmentParserInitSegmentPayload; } /** @@ -513,7 +513,7 @@ export interface ISegmentParserInitSegment { */ export interface ISegmentParserSegment { type : "parsed-segment"; - value : ISegmentParserParsedSegment; + value : ISegmentParserSegmentPayload; } // format under which audio / video data / initialization data is decodable