Skip to content

Commit

Permalink
Add errors handling.
Browse files Browse the repository at this point in the history
  • Loading branch information
i-zolotarenko committed Dec 24, 2023
1 parent d665053 commit 4ec4ee2
Show file tree
Hide file tree
Showing 7 changed files with 179 additions and 84 deletions.
2 changes: 2 additions & 0 deletions packages/p2p-media-loader-core/src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ export class Core<TStream extends Stream = Stream> {
p2pNotReceivingBytesTimeoutMs: 1000,
p2pLoaderDestroyTimeoutMs: 30 * 1000,
httpNotReceivingBytesTimeoutMs: 1000,
maxHttpFailedDownloadAttempts: 3,
maxPeerNotReceivingBytesTimeoutErrors: 3,
};
private readonly bandwidthApproximator = new BandwidthApproximator();
private segmentStorage?: SegmentsMemoryStorage;
Expand Down
125 changes: 82 additions & 43 deletions packages/p2p-media-loader-core/src/hybrid-loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,8 @@ export class HybridLoader {
}
} else {
const request = this.requests.getOrCreateRequest(segment);
request.setOrResolveEngineCallbacks(callbacks);
request.setEngineCallbacks(callbacks);
}

this.requestProcessQueueMicrotask();
}

Expand All @@ -125,6 +124,73 @@ export class HybridLoader {
});
};

private processRequests(queueSegmentIds: Set<string>) {
const { stream } = this.lastRequestedSegment;
const { maxHttpFailedDownloadAttempts } = this.settings;
for (const request of this.requests.items()) {
const {
type,
status,
segment,
isCheckedByProcessQueue,
isSegmentRequestedByEngine,
} = request;

if (!type) continue;

switch (status) {
case "loading":
if (
!isSegmentRequestedByEngine &&
!queueSegmentIds.has(segment.localId)
) {
request.abortFromProcessQueue();
this.requests.remove(request);
}
break;

case "succeed":
if (!request.data) break;
if (type === "http") {
this.p2pLoaders.currentLoader.broadcastAnnouncement();
}
request.resolveEngineCallbacksSuccessfully();
void this.segmentStorage.storeSegment(request.segment, request.data);
this.eventHandlers?.onSegmentLoaded?.(request.data.byteLength, type);
this.requests.remove(request);
break;

case "failed":
if (type === "http" && !isCheckedByProcessQueue) {
this.p2pLoaders.currentLoader.broadcastAnnouncement();
}
if (
!isSegmentRequestedByEngine &&
!stream.segments.has(request.segment.localId)
) {
this.requests.remove(request);
}
if (
request.failedAttempts.httpAttemptsCount >=
maxHttpFailedDownloadAttempts &&
isSegmentRequestedByEngine
) {
request.resolveEngineCallbacksWithError();
}
break;

case "not-started":
this.requests.remove(request);
break;

case "aborted":
this.requests.remove(request);
break;
}
request.markCheckedByProcessQueue();
}
}

private processQueue() {
const { queue, queueSegmentIds } = QueueUtils.generateQueue({
lastRequestedSegment: this.lastRequestedSegment,
Expand All @@ -137,55 +203,28 @@ export class HybridLoader {
);
},
});
this.processRequests(queueSegmentIds);

for (const request of this.requests.items()) {
if (request.status === "loading") {
if (
!request.isSegmentRequestedByEngine &&
!queueSegmentIds.has(request.segment.localId)
) {
request.abortFromProcessQueue();
this.requests.remove(request);
}
continue;
}

if (request.status === "succeed") {
const { type, data } = request;
if (!type || !data) continue;
if (type === "http") {
this.p2pLoaders.currentLoader.broadcastAnnouncement();
}
void this.segmentStorage.storeSegment(request.segment, data);
this.eventHandlers?.onSegmentLoaded?.(data.byteLength, type);
this.requests.remove(request);
continue;
}

if (request.status === "failed") {
if (request.type === "http") {
this.p2pLoaders.currentLoader.broadcastAnnouncement();
}
continue;
}

if (
(request.status === "not-started" || request.status === "aborted") &&
!request.isSegmentRequestedByEngine
) {
this.requests.remove(request);
}
}

const { simultaneousHttpDownloads, simultaneousP2PDownloads } =
this.settings;
const {
simultaneousHttpDownloads,
simultaneousP2PDownloads,
maxHttpFailedDownloadAttempts,
} = this.settings;

for (const item of queue) {
const { statuses, segment } = item;
const request = this.requests.get(segment);

if (statuses.isHighDemand) {
if (request?.type === "http" && request.status === "loading") continue;
if (
request?.type === "http" &&
request.status === "failed" &&
request.failedAttempts.httpAttemptsCount >=
maxHttpFailedDownloadAttempts
) {
break;
}

if (this.requests.executingHttpCount < simultaneousHttpDownloads) {
void this.loadThroughHttp(segment);
Expand Down
6 changes: 4 additions & 2 deletions packages/p2p-media-loader-core/src/p2p/peer-interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import { Settings } from "../types";

export type PeerSettings = Pick<
Settings,
"p2pNotReceivingBytesTimeoutMs" | "webRtcMaxMessageSize"
| "p2pNotReceivingBytesTimeoutMs"
| "webRtcMaxMessageSize"
| "maxPeerNotReceivingBytesTimeoutErrors"
>;

export class PeerInterface {
Expand Down Expand Up @@ -36,6 +38,7 @@ export class PeerInterface {

private onPeerClosed = () => {
this.destroy();
this.eventHandlers.onDestroy();
};

private onConnectionError = (error: { code: string }) => {
Expand Down Expand Up @@ -121,7 +124,6 @@ export class PeerInterface {

destroy() {
this.connection.destroy();
this.eventHandlers.onDestroy();
}
}

Expand Down
32 changes: 22 additions & 10 deletions packages/p2p-media-loader-core/src/p2p/peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
RequestControls,
RequestError,
PeerRequestErrorType,
RequestInnerErrorType,
} from "../request";
import * as Command from "./commands";
import { Segment } from "../types";
Expand All @@ -31,6 +32,9 @@ export class Peer {
};
private loadedSegments = new Set<number>();
private httpLoadingSegments = new Set<number>();
private downloadingErrors: RequestError<
PeerRequestErrorType | RequestInnerErrorType
>[] = [];
private logger = debug("core:peer");

constructor(
Expand Down Expand Up @@ -106,6 +110,7 @@ export class Peer {
controls.completeOnSuccess();
this.downloadingContext = undefined;
} else if (request.loadedBytes > request.totalBytes) {
request.clearLoadedBytes();
this.cancelSegmentDownloading("peer-response-bytes-mismatch");
}
};
Expand All @@ -120,9 +125,23 @@ export class Peer {
controls: segmentRequest.start(
{ type: "p2p", peerId: this.id },
{
abort: this.abortSegmentDownloading,
notReceivingBytesTimeoutMs:
this.settings.p2pNotReceivingBytesTimeoutMs,
abort: (error) => {
if (!this.downloadingContext) return;
const { request } = this.downloadingContext;
this.sendCancelSegmentRequestCommand(request.segment);
this.downloadingContext = undefined;
this.downloadingErrors.push(error);

const timeoutErrors = this.downloadingErrors.filter(
(error) => error.type === "bytes-receiving-timeout"
);
const { maxPeerNotReceivingBytesTimeoutErrors } = this.settings;
if (timeoutErrors.length >= maxPeerNotReceivingBytesTimeoutErrors) {
this.peerInterface.destroy();
}
},
}
),
};
Expand All @@ -134,13 +153,6 @@ export class Peer {
this.peerInterface.sendCommand(command);
}

private abortSegmentDownloading = () => {
if (!this.downloadingContext) return;
const { request } = this.downloadingContext;
this.sendCancelSegmentRequestCommand(request.segment);
this.downloadingContext = undefined;
};

async uploadSegmentData(segmentExternalId: number, data: ArrayBuffer) {
this.logger(`send segment ${segmentExternalId} to ${this.id}`);
const command: Command.PeerSendSegmentCommand = {
Expand Down Expand Up @@ -170,6 +182,7 @@ export class Peer {
const error = new RequestError(type);
controls.abortOnError(error);
this.downloadingContext = undefined;
this.downloadingErrors.push(error);
}

sendSegmentsAnnouncementCommand(
Expand Down Expand Up @@ -199,10 +212,9 @@ export class Peer {
}

destroy = () => {
this.peerInterface.destroy();
this.cancelSegmentDownloading("peer-closed");
this.logger(`peer closed ${this.id}`);
this.eventHandlers.onPeerClosed(this);
this.logger(`peer closed ${this.id}`);
};

static getPeerIdFromConnection(connection: PeerConnection) {
Expand Down
1 change: 1 addition & 0 deletions packages/p2p-media-loader-core/src/request-container.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ export class RequestsContainer {
if (request.type === "p2p") yield request;
}
}

isHybridLoaderRequested(segment: Segment): boolean {
return !!this.requests.get(segment)?.type;
}
Expand Down
Loading

0 comments on commit 4ec4ee2

Please sign in to comment.