Skip to content

Commit

Permalink
transports: segment parser now return result synchronously
Browse files Browse the repository at this point in the history
Every segment parser in `src/transports` (which allows to parse things
like data, encryption metadata and time information from loaded segments
according to the current streaming protocol) do so synchronously.

However, those are typed as Observables. My guess (this was before my
time on the project) is that it was mainly for two reasons:

  1. Manifest parsing can be asynchronous (for example to fetch MPD
     xlinks or the clock) and it follows the same pipeline architecture
     (a loader and a parser) than segments.

     Letting segment parser be asynchronous thus allows to harmonize all
     resource-fetching code to a similar interface.

  2. Theorically, segment parsing could be asynchronous. As far as I can
     see for now, this could be because of two reasons:

       - We need to perform an HTTPS request to fully parse segment
         information.

         We already encountered such case when trying to implement the
         "sidx ref type 1" feature (#754). However we did not merge that
         work for now due to the complexity and most of all because no
         real DASH MPD seems to rely on this feature (nor ar we able to
         envisage a real use-case for it).

       - segment parsing is performed in a Worker.

         But We do not feel the need to do that for now as the CPU
         footprint of parsing segments is really low.

So theorically, there's good reasons to make the parsing operation
asynchronous, but reastically for now, there's none.

Meanwhile, making that operation asynchronous lead to big headaches:

  - it is one of the main reasons why the PR on making the init and
    first media segment's requests in parallel [#918] was not merged.
    Subtle bugs, some that would only be seen in a case where the
    parsing is asynchronous (so not really existant for now) could arise
    because of that.

  - Even if the response of the parser is wrapped in an Observable, it
    wasn't lazy (the code would be called even if the parser was not
    subscribed to). We could fix that by wrapping all parsers in a
    `defer` call from RxJS, but it would complexify even more the code.

  - Handling Observables is a LOT harder than just using directly a
    response in general. There's pending work about not depending so
    much on RxJS in the future [#916] and I think beginning to remove
    it where it is not even needed is a good first step.

If it become needed again in the future (e.g. to support the "sidx ref
type 1" feature), we could always revert that work, maybe even with a
more Promise-based solution which might be much easier to reason about.

Thoughts?
  • Loading branch information
peaBerberian committed Nov 24, 2021
1 parent 967df26 commit 7dfd996
Show file tree
Hide file tree
Showing 12 changed files with 307 additions and 323 deletions.
44 changes: 27 additions & 17 deletions src/core/fetchers/segment/segment_fetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import {
Subject,
} from "rxjs";
import {
catchError,
filter,
finalize,
mergeMap,
Expand Down Expand Up @@ -67,9 +66,19 @@ export type ISegmentFetcherWarning = ISegmentLoaderWarning;
*/
export interface ISegmentFetcherChunkEvent<T> {
type : "chunk";
/** Parse the downloaded chunk. */
parse : (initTimescale? : number) => Observable<ISegmentParserInitSegment<T> |
ISegmentParserSegment<T>>;
/**
* Parse the downloaded chunk.
*
* Take in argument the timescale value that might have been obtained by
* parsing an initialization segment from the same Representation.
* Can be left to `undefined` if unknown or inexistant, segment parsers should
* be resilient and still work without that information.
*
* @param {number} initTimescale
* @returns {Object}
*/
parse(initTimescale? : number) : ISegmentParserInitSegment<T> |
ISegmentParserSegment<T>;
}

/**
Expand Down Expand Up @@ -208,20 +217,21 @@ export default function createSegmentFetcher<T>(
* @param {Object} [initTimescale]
* @returns {Observable}
*/
parse(initTimescale? : number) : Observable<ISegmentParserInitSegment<T> |
ISegmentParserSegment<T>> {
parse(initTimescale? : number) : ISegmentParserInitSegment<T> |
ISegmentParserSegment<T> {
const response = { data: evt.value.responseData, isChunked };
/* eslint-disable @typescript-eslint/no-unsafe-call */
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable @typescript-eslint/no-unsafe-return */
return segmentParser({ response, initTimescale, content })
/* eslint-enable @typescript-eslint/no-unsafe-call */
/* eslint-enable @typescript-eslint/no-unsafe-member-access */
/* eslint-enable @typescript-eslint/no-unsafe-return */
.pipe(catchError((error: unknown) => {
throw formatError(error, { defaultCode: "PIPELINE_PARSE_ERROR",
defaultReason: "Unknown parsing error" });
}));
try {
/* eslint-disable @typescript-eslint/no-unsafe-call */
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable @typescript-eslint/no-unsafe-return */
return segmentParser({ response, initTimescale, content });
/* eslint-enable @typescript-eslint/no-unsafe-call */
/* eslint-enable @typescript-eslint/no-unsafe-member-access */
/* eslint-enable @typescript-eslint/no-unsafe-return */
} catch (error : unknown) {
throw formatError(error, { defaultCode: "PIPELINE_PARSE_ERROR",
defaultReason: "Unknown parsing error" });
}
},
};

Expand Down
4 changes: 2 additions & 2 deletions src/core/stream/representation/push_media_segment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import Manifest, {
Period,
Representation,
} from "../../../manifest";
import { ISegmentParserParsedSegment } from "../../../transports";
import { ISegmentParserSegmentPayload } from "../../../transports";
import objectAssign from "../../../utils/object_assign";
import { SegmentBuffer } from "../../segment_buffers";
import EVENTS from "../events_generators";
Expand Down Expand Up @@ -56,7 +56,7 @@ export default function pushMediaSegment<T>(
period : Period;
representation : Representation; };
initSegmentData : T | null;
parsedSegment : ISegmentParserParsedSegment<T>;
parsedSegment : ISegmentParserSegmentPayload<T>;
segment : ISegment;
segmentBuffer : SegmentBuffer<T>; }
) : Observable< IStreamEventAddedSegment<T> > {
Expand Down
10 changes: 4 additions & 6 deletions src/core/stream/representation/representation_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import {
import {
finalize,
ignoreElements,
map,
mergeMap,
share,
startWith,
Expand All @@ -57,7 +56,7 @@ import Manifest, {
} from "../../../manifest";
import {
ISegmentParserInitSegment,
ISegmentParserParsedInitSegment,
ISegmentParserInitSegmentPayload,
ISegmentParserSegment,
} from "../../../transports";
import assertUnreachable from "../../../utils/assert_unreachable";
Expand Down Expand Up @@ -254,7 +253,7 @@ export default function RepresentationStream<T>({
* Saved initialization segment state for this representation.
* `null` if the initialization segment hasn't been loaded yet.
*/
let initSegmentObject : ISegmentParserParsedInitSegment<T> | null =
let initSegmentObject : ISegmentParserInitSegmentPayload<T> | null =
initSegment === null ? { initializationData: null,
protectionDataUpdate: false,
initTimescale: undefined } :
Expand Down Expand Up @@ -461,9 +460,8 @@ export default function RepresentationStream<T>({

case "chunk":
const initTimescale = initSegmentObject?.initTimescale;
return evt.parse(initTimescale).pipe(map(parserResponse => {
return objectAssign({ segment }, parserResponse);
}));
const parsed = evt.parse(initTimescale);
return observableOf(objectAssign({ segment }, parsed));

case "ended":
return requestNextSegment$;
Expand Down
12 changes: 8 additions & 4 deletions src/transports/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,16 @@ Its concept can be illustrated as such:
+--------+
```

The parser returns an Observable which will emit the parsed resource when done.
Depending on the type of parser (e.g. Manifest parser or segment parser), that
task can be synchronous or asynchronous.

This Observable will throw if the resource is corrupted or miss crucial
information.
In asynchronous cases, the parser will return an Observable emitting a unique
time the result when done and throwing if an error is encountered.

[1] the parser could also need to perform requests (e.g. it needs to fetch the
In synchronous cases, the parser returns directly the result, and can throw
directly when/if an error is encountered.

[1] a parser could also need to perform requests (e.g. it needs to fetch the
current time from a server).
In such cases, the parser is given a special callback, which allows it to
receive the same error-handling perks than a loader, such as multiple retries,
Expand Down
49 changes: 24 additions & 25 deletions src/transports/dash/image_pipelines.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,17 +55,17 @@ export function imageLoader(
export function imageParser(
{ response,
content } : ISegmentParserArguments<Uint8Array|ArrayBuffer|null>
) : Observable<ISegmentParserInitSegment<null> |
ISegmentParserSegment<IImageTrackSegmentData>>
) : ISegmentParserInitSegment<null> |
ISegmentParserSegment<IImageTrackSegmentData>
{
const { segment, period } = content;
const { data, isChunked } = response;

if (content.segment.isInit) { // image init segment has no use
return observableOf({ type: "parsed-init-segment",
value: { initializationData: null,
protectionDataUpdate: false,
initTimescale: undefined } });
return { type: "parsed-init-segment",
value: { initializationData: null,
protectionDataUpdate: false,
initTimescale: undefined } };
}

if (isChunked) {
Expand All @@ -76,27 +76,26 @@ export function imageParser(

// TODO image Parsing should be more on the buffer side, no?
if (data === null || features.imageParser === null) {
return observableOf({ type: "parsed-segment",
value: { chunkData: null,
chunkInfos: { duration: segment.duration,
time: segment.time },
chunkOffset,
appendWindow: [period.start, period.end],
protectionDataUpdate: false } });
return { type: "parsed-segment",
value: { chunkData: null,
chunkInfos: { duration: segment.duration,
time: segment.time },
chunkOffset,
appendWindow: [period.start, period.end],
protectionDataUpdate: false } };
}

const bifObject = features.imageParser(new Uint8Array(data));
const thumbsData = bifObject.thumbs;
return observableOf({ type: "parsed-segment",
value: { chunkData: { data: thumbsData,
start: 0,
end: Number.MAX_VALUE,
timescale: 1,
type: "bif" },
chunkInfos: { time: 0,
duration: Number.MAX_VALUE,
timescale: bifObject.timescale },
chunkOffset,
appendWindow: [period.start, period.end],
protectionDataUpdate: false } });
return { type: "parsed-segment",
value: { chunkData: { data: thumbsData,
start: 0,
end: Number.MAX_VALUE,
timescale: 1,
type: "bif" },
chunkInfos: { time: 0,
duration: Number.MAX_VALUE },
chunkOffset,
protectionDataUpdate: false,
appendWindow: [period.start, period.end] } };
}
64 changes: 30 additions & 34 deletions src/transports/dash/segment_parser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@
* limitations under the License.
*/

import {
Observable,
of as observableOf,
} from "rxjs";
import {
getMDHDTimescale,
getSegmentsFromSidx,
Expand Down Expand Up @@ -53,25 +49,25 @@ export default function generateAudioVideoSegmentParser(
initTimescale } : ISegmentParserArguments< Uint8Array |
ArrayBuffer |
null >
) : Observable<ISegmentParserInitSegment<Uint8Array | ArrayBuffer | null> |
ISegmentParserSegment< Uint8Array | ArrayBuffer | null>> {
) : ISegmentParserInitSegment<Uint8Array | ArrayBuffer | null> |
ISegmentParserSegment< Uint8Array | ArrayBuffer | null> {
const { period, adaptation, representation, segment, manifest } = content;
const { data, isChunked } = response;
const appendWindow : [number, number | undefined] = [ period.start, period.end ];

if (data === null) {
if (segment.isInit) {
return observableOf({ type: "parsed-init-segment" as const,
value: { initializationData: null,
protectionDataUpdate: false,
initTimescale: undefined } });
return { type: "parsed-init-segment" as const,
value: { initializationData: null,
protectionDataUpdate: false,
initTimescale: undefined } };
}
return observableOf({ type: "parsed-segment" as const,
value: { chunkData: null,
chunkInfos: null,
chunkOffset: 0,
appendWindow,
protectionDataUpdate: false } });
return { type: "parsed-segment" as const,
value: { chunkData: null,
chunkInfos: null,
chunkOffset: 0,
protectionDataUpdate: false,
appendWindow } };
}

const chunkData = data instanceof Uint8Array ? data :
Expand Down Expand Up @@ -112,24 +108,24 @@ export default function generateAudioVideoSegmentParser(
manifest.publishTime);
if (events !== undefined) {
const { needsManifestRefresh, inbandEvents } = events;
return observableOf({ type: "parsed-segment",
value: { chunkData,
chunkInfos,
chunkOffset,
appendWindow,
inbandEvents,
needsManifestRefresh,
protectionDataUpdate } });
return { type: "parsed-segment",
value: { chunkData,
chunkInfos,
chunkOffset,
appendWindow,
inbandEvents,
protectionDataUpdate,
needsManifestRefresh } };
}
}
}

return observableOf({ type: "parsed-segment",
value: { chunkData,
chunkInfos,
chunkOffset,
appendWindow,
protectionDataUpdate } });
return { type: "parsed-segment",
value: { chunkData,
chunkInfos,
chunkOffset,
protectionDataUpdate,
appendWindow } };
}
// we're handling an initialization segment
const { indexRange } = segment;
Expand Down Expand Up @@ -174,9 +170,9 @@ export default function generateAudioVideoSegmentParser(
const parsedTimescale = isNullOrUndefined(timescale) ? undefined :
timescale;

return observableOf({ type: "parsed-init-segment",
value: { initializationData: chunkData,
protectionDataUpdate,
initTimescale: parsedTimescale } });
return { type: "parsed-init-segment",
value: { initializationData: chunkData,
protectionDataUpdate,
initTimescale: parsedTimescale } };
};
}
Loading

0 comments on commit 7dfd996

Please sign in to comment.