Skip to content

Commit

Permalink
Merge pull request #1127 from canalplus/misc/segment_buffer-no-rxjs
Browse files Browse the repository at this point in the history
Remove RxJS from the SegmentBuffer and related code
  • Loading branch information
peaBerberian committed Sep 9, 2022
2 parents febb3df + c581e7a commit 296f118
Show file tree
Hide file tree
Showing 20 changed files with 671 additions and 512 deletions.
87 changes: 86 additions & 1 deletion src/compat/event_listeners.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,69 @@ export type IEventTargetLike = HTMLElement |
IEventEmitterLike |
IEventEmitter<unknown>;

/**
* Returns a function allowing to add event listeners for particular event(s)
* optionally automatically adding browser prefixes if needed.
* @param {Array.<string>} eventNames - The event(s) to listen to. If multiple
* events are set, the event listener will be triggered when any of them emits.
* @returns {Function} - Returns function allowing to easily add a callback to
* be triggered when that event is emitted on a given event target.
*/
function createCompatibleEventListener(
eventNames : string[]
) :
(
element : IEventTargetLike,
listener : (event? : unknown) => void,
cancelSignal: CancellationSignal
) => void
{
let mem : string|undefined;
const prefixedEvents = eventPrefixed(eventNames);

return (
element : IEventTargetLike,
listener: (event? : unknown) => void,
cancelSignal: CancellationSignal
) => {
if (cancelSignal.isCancelled) {
return;
}

// if the element is a HTMLElement we can detect
// the supported event, and memoize it in `mem`
if (element instanceof HTMLElement) {
if (typeof mem === "undefined") {
mem = findSupportedEvent(element, prefixedEvents);
}

if (isNonEmptyString(mem)) {
element.addEventListener(mem, listener);
cancelSignal.register(() => {
if (mem !== undefined) {
element.removeEventListener(mem, listener);
}
});
} else {
if (__ENVIRONMENT__.CURRENT_ENV === __ENVIRONMENT__.DEV as number) {
log.warn(`compat: element ${element.tagName}` +
" does not support any of these events: " +
prefixedEvents.join(", "));
}
return ;
}
}

prefixedEvents.forEach(eventName => {
(element as IEventEmitterLike).addEventListener(eventName, listener);
cancelSignal.register(() => {
(element as IEventEmitterLike).removeEventListener(eventName, listener);
});
});
};

}

/**
* @param {Array.<string>} eventNames
* @param {Array.<string>|undefined} prefixes
Expand Down Expand Up @@ -247,7 +310,8 @@ export interface IPictureInPictureEvent {

/**
* Emit when video enters and leaves Picture-In-Picture mode.
* @param {HTMLMediaElement} mediaElement
* @param {HTMLMediaElement} elt
* @param {Object} stopListening
* @returns {Observable}
*/
function getPictureOnPictureStateRef(
Expand Down Expand Up @@ -505,6 +569,24 @@ const onKeyError$ = compatibleListener(["keyerror", "error"]);
*/
const onKeyStatusesChange$ = compatibleListener(["keystatuseschange"]);

/**
* @param {HTMLMediaElement} mediaElement
* @returns {Observable}
*/
const onSeeking = createCompatibleEventListener(["seeking"]);

/**
* @param {HTMLMediaElement} mediaElement
* @returns {Observable}
*/
const onSeeked = createCompatibleEventListener(["seeked"]);

/**
* @param {HTMLMediaElement} mediaElement
* @returns {Observable}
*/
const onEnded = createCompatibleEventListener(["ended"]);

/**
* Utilitary function allowing to add an event listener and remove it
* automatically once the given `CancellationSignal` emits.
Expand Down Expand Up @@ -534,6 +616,7 @@ export {
getVideoVisibilityRef,
getVideoWidthRef,
onEncrypted$,
onEnded,
onEnded$,
onFullscreenChange$,
onKeyAdded$,
Expand All @@ -542,7 +625,9 @@ export {
onKeyStatusesChange$,
onLoadedMetadata$,
onRemoveSourceBuffers$,
onSeeked,
onSeeked$,
onSeeking,
onSeeking$,
onSourceClose$,
onSourceEnded$,
Expand Down
97 changes: 48 additions & 49 deletions src/compat/on_height_width_change.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,11 @@
* limitations under the License.
*/

import {
defer as observableDefer,
distinctUntilChanged,
interval as observableInterval,
map,
Observable,
Observer,
startWith,
} from "rxjs";
import log from "../log";
import createSharedReference, {
IReadOnlySharedReference,
} from "../utils/reference";
import { CancellationSignal } from "../utils/task_canceller";
import isNode from "./is_node";

export interface IResolution { width : number;
Expand Down Expand Up @@ -63,8 +58,8 @@ const _ResizeObserver : IResizeObserverConstructor |
/* eslint-enable @typescript-eslint/no-unsafe-assignment */

/**
* Emit the current height and width of the given `element` on subscribtion
* and each time it changes.
* Emit the current height and width of the given `element` each time it
* changes.
*
* On some browsers, we might not be able to rely on a native API to know when
* it changes, the `interval` argument allow us to provide us an inverval in
Expand All @@ -75,46 +70,50 @@ const _ResizeObserver : IResizeObserverConstructor |
*/
export default function onHeightWidthChange(
element : HTMLElement,
interval : number
) : Observable<IResolution> {
return observableDefer(() : Observable<IResolution> => {
if (_ResizeObserver !== undefined) {
let lastHeight : number = -1;
let lastWidth : number = -1;

return new Observable((obs : Observer<IResolution>) => {
const resizeObserver = new _ResizeObserver(entries => {
if (entries.length === 0) {
log.error("Compat: Resized but no observed element.");
return;
}
interval : number,
cancellationSignal : CancellationSignal
) : IReadOnlySharedReference<IResolution> {
const { height: initHeight, width: initWidth } = element.getBoundingClientRect();
const ref = createSharedReference<IResolution>({
height: initHeight,
width: initWidth,
});
let lastHeight : number = initHeight;
let lastWidth : number = initWidth;

const entry = entries[0];
const { height, width } = entry.contentRect;
if (_ResizeObserver !== undefined) {
const resizeObserver = new _ResizeObserver(entries => {
if (entries.length === 0) {
log.error("Compat: Resized but no observed element.");
return;
}

if (height !== lastHeight || width !== lastWidth) {
lastHeight = height;
lastWidth = width;
obs.next({ height, width });
}
});
const entry = entries[0];
const { height, width } = entry.contentRect;

resizeObserver.observe(element);
return () => {
resizeObserver.disconnect();
};
});
}
if (height !== lastHeight || width !== lastWidth) {
lastHeight = height;
lastWidth = width;
ref.setValue({ height, width });
}
});

return observableInterval(interval).pipe(
startWith(null),
map(() : IResolution => {
const { height, width } = element.getBoundingClientRect();
return { height, width };
}),
distinctUntilChanged((o, n) => {
return o.height === n.height && o.width === n.width;
})
);
});
resizeObserver.observe(element);
cancellationSignal.register(() => {
resizeObserver.disconnect();
});
} else {
const intervalId = setInterval(() => {
const { height, width } = element.getBoundingClientRect();
if (height !== lastHeight || width !== lastWidth) {
lastHeight = height;
lastWidth = width;
ref.setValue({ height, width });
}
}, interval);
cancellationSignal.register(() => {
clearInterval(intervalId);
});
}
return ref;
}
102 changes: 55 additions & 47 deletions src/core/segment_buffers/garbage_collector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,52 +14,66 @@
* limitations under the License.
*/

import {
combineLatest as observableCombineLatest,
concatAll,
EMPTY,
from as observableFrom,
ignoreElements,
mergeMap,
Observable,
of as observableOf,
} from "rxjs";
import log from "../../log";
import { getInnerAndOuterTimeRanges } from "../../utils/ranges";
import { IReadOnlySharedReference } from "../../utils/reference";
import { CancellationSignal } from "../../utils/task_canceller";
import { IReadOnlyPlaybackObserver } from "../api";
import { IStreamOrchestratorPlaybackObservation } from "../stream";
import { SegmentBuffer } from "./implementations";

export interface IGarbageCollectorArgument {
/** SegmentBuffer implementation */
segmentBuffer : SegmentBuffer;
/** Emit current position in seconds regularly */
currentTime$ : Observable<number>;
playbackObserver : IReadOnlyPlaybackObserver<
Pick<IStreamOrchestratorPlaybackObservation, "position">
>;
/** Maximum time to keep behind current time position, in seconds */
maxBufferBehind$ : Observable<number>;
maxBufferBehind : IReadOnlySharedReference<number>;
/** Minimum time to keep behind current time position, in seconds */
maxBufferAhead$ : Observable<number>;
maxBufferAhead : IReadOnlySharedReference<number>;
}

/**
* Perform cleaning of the buffer according to the values set by the user
* each time `currentTime$` emits and each times the
* each time `playbackObserver` emits and each times the
* maxBufferBehind/maxBufferAhead values change.
*
* Abort this operation when the `cancellationSignal` emits.
*
* @param {Object} opt
* @param {Object} cancellationSignal
* @returns {Observable}
*/
export default function BufferGarbageCollector({
segmentBuffer,
currentTime$,
maxBufferBehind$,
maxBufferAhead$,
} : IGarbageCollectorArgument) : Observable<never> {
return observableCombineLatest([currentTime$, maxBufferBehind$, maxBufferAhead$]).pipe(
mergeMap(([currentTime, maxBufferBehind, maxBufferAhead]) => {
return clearBuffer(segmentBuffer,
currentTime,
maxBufferBehind,
maxBufferAhead);
}));
export default function BufferGarbageCollector(
{ segmentBuffer,
playbackObserver,
maxBufferBehind,
maxBufferAhead } : IGarbageCollectorArgument,
cancellationSignal : CancellationSignal
) : void {
let lastPosition : number;
playbackObserver.listen((o) => {
lastPosition = o.position.pending ?? o.position.last;
clean();
}, { includeLastObservation: true, clearSignal: cancellationSignal });
function clean() {
clearBuffer(segmentBuffer,
lastPosition,
maxBufferBehind.getValue(),
maxBufferAhead.getValue(),
cancellationSignal)
.catch(e => {
const errMsg = e instanceof Error ?
e.message :
"Unknown error";
log.error("Could not run BufferGarbageCollector:", errMsg);
});
}
maxBufferBehind.onUpdate(clean, { clearSignal: cancellationSignal });
maxBufferAhead.onUpdate(clean, { clearSignal: cancellationSignal });
clean();
}

/**
Expand All @@ -76,16 +90,17 @@ export default function BufferGarbageCollector({
* @param {Number} position - The current position
* @param {Number} maxBufferBehind
* @param {Number} maxBufferAhead
* @returns {Observable}
* @returns {Promise}
*/
function clearBuffer(
async function clearBuffer(
segmentBuffer : SegmentBuffer,
position : number,
maxBufferBehind : number,
maxBufferAhead : number
) : Observable<never> {
maxBufferAhead : number,
cancellationSignal : CancellationSignal
) : Promise<void> {
if (!isFinite(maxBufferBehind) && !isFinite(maxBufferAhead)) {
return EMPTY;
return Promise.resolve();
}

const cleanedupRanges : Array<{ start : number;
Expand Down Expand Up @@ -150,21 +165,14 @@ function clearBuffer(

collectBufferBehind();
collectBufferAhead();
const clean$ = observableFrom(
cleanedupRanges.map((range) => {

for (const range of cleanedupRanges) {
if (range.start < range.end) {
log.debug("GC: cleaning range from SegmentBuffer", range.start, range.end);
if (range.start >= range.end) {
return observableOf(null);
if (cancellationSignal.cancellationError !== null) {
throw cancellationSignal.cancellationError;
}
return segmentBuffer.removeBuffer(range.start, range.end);
})
).pipe(concatAll(),
// NOTE As of now (RxJS 7.4.0), RxJS defines `ignoreElements` default
// first type parameter as `any` instead of the perfectly fine `unknown`,
// leading to linter issues, as it forbids the usage of `any`.
// This is why we're disabling the eslint rule.
/* eslint-disable-next-line @typescript-eslint/no-unsafe-argument */
ignoreElements());

return clean$;
await segmentBuffer.removeBuffer(range.start, range.end, cancellationSignal);
}
}
}
Loading

0 comments on commit 296f118

Please sign in to comment.