diff --git a/packages/p2p-media-loader-core/src/p2p/peer-protocol.ts b/packages/p2p-media-loader-core/src/p2p/peer-protocol.ts index 3527ead8..9b245ee7 100644 --- a/packages/p2p-media-loader-core/src/p2p/peer-protocol.ts +++ b/packages/p2p-media-loader-core/src/p2p/peer-protocol.ts @@ -1,11 +1,14 @@ import { PeerConnection } from "bittorrent-tracker"; -import * as Command from "./commands"; -import * as Utils from "../utils/utils"; import { Settings } from "../types"; +import * as Utils from "../utils/utils"; +import * as Command from "./commands"; export type PeerSettings = Pick< Settings, - "p2pNotReceivingBytesTimeoutMs" | "webRtcMaxMessageSize" | "p2pErrorRetries" + | "p2pNotReceivingBytesTimeoutMs" + | "webRtcMaxMessageSize" + | "p2pErrorRetries" + | "validateP2PSegment" >; export class PeerProtocol { diff --git a/packages/p2p-media-loader-core/src/p2p/peer.ts b/packages/p2p-media-loader-core/src/p2p/peer.ts index 698afc96..ccc268c4 100644 --- a/packages/p2p-media-loader-core/src/p2p/peer.ts +++ b/packages/p2p-media-loader-core/src/p2p/peer.ts @@ -1,16 +1,16 @@ import { PeerConnection } from "bittorrent-tracker"; -import { PeerProtocol, PeerSettings } from "./peer-protocol"; +import debug from "debug"; import { + PeerRequestErrorType, Request, RequestControls, RequestError, - PeerRequestErrorType, RequestInnerErrorType, } from "../requests/request"; -import * as Command from "./commands"; import { Segment } from "../types"; import * as Utils from "../utils/utils"; -import debug from "debug"; +import * as Command from "./commands"; +import { PeerProtocol, PeerSettings } from "./peer-protocol"; const { PeerCommandType } = Command; type PeerEventHandlers = { @@ -45,6 +45,7 @@ export class Peer { this.id = Peer.getPeerIdFromConnection(connection); this.peerProtocol = new PeerProtocol(connection, settings, { onSegmentChunkReceived: this.onSegmentChunkReceived, + // eslint-disable-next-line @typescript-eslint/no-misused-promises onCommandReceived: this.onCommandReceived, }); @@ -62,7 +63,7 @@ export class Peer { if (this.httpLoadingSegments.has(externalId)) return "http-loading"; } - private onCommandReceived = (command: Command.PeerCommand) => { + private onCommandReceived = async (command: Command.PeerCommand) => { switch (command.c) { case PeerCommandType.SegmentsAnnouncement: this.loadedSegments = new Set(command.l); @@ -85,19 +86,24 @@ export class Peer { request.setTotalBytes(command.s); } else if (request.totalBytes - request.loadedBytes !== command.s) { request.clearLoadedBytes(); - this.cancelSegmentDownloading("peer-response-bytes-mismatch"); + this.sendCancelSegmentRequestCommand(request.segment); + this.cancelSegmentDownloading( + "peer-response-bytes-length-mismatch", + ); this.destroy(); } } break; case PeerCommandType.SegmentDataSendingCompleted: { - if (!this.downloadingContext?.isSegmentDataCommandReceived) return; + const downloadingContext = this.downloadingContext; + + if (!downloadingContext?.isSegmentDataCommandReceived) return; - const { request, controls } = this.downloadingContext; + const { request, controls } = downloadingContext; const isWrongSegment = - this.downloadingContext.request.segment.externalId !== command.i; + downloadingContext.request.segment.externalId !== command.i; if (isWrongSegment) { request.clearLoadedBytes(); @@ -110,7 +116,22 @@ export class Peer { if (isWrongBytes) { request.clearLoadedBytes(); - this.cancelSegmentDownloading("peer-response-bytes-mismatch"); + this.cancelSegmentDownloading("peer-response-bytes-length-mismatch"); + this.destroy(); + return; + } + + const isValid = + (await this.settings.validateP2PSegment?.( + request.segment.url, + request.segment.byteRange, + )) ?? true; + + if (this.downloadingContext !== downloadingContext) return; + + if (!isValid) { + request.clearLoadedBytes(); + this.cancelSegmentDownloading("p2p-segment-validation-failed"); this.destroy(); return; } @@ -145,7 +166,7 @@ export class Peer { if (isOverflow) { request.clearLoadedBytes(); - this.cancelSegmentDownloading("peer-response-bytes-mismatch"); + this.cancelSegmentDownloading("peer-response-bytes-length-mismatch"); this.destroy(); return; } @@ -168,9 +189,10 @@ export class Peer { abort: (error) => { if (!this.downloadingContext) return; const { request } = this.downloadingContext; + this.sendCancelSegmentRequestCommand(request.segment); - this.downloadingContext = undefined; this.downloadingErrors.push(error); + this.downloadingContext = undefined; const timeoutErrors = this.downloadingErrors.filter( (error) => error.type === "bytes-receiving-timeout", diff --git a/packages/p2p-media-loader-core/src/requests/request.ts b/packages/p2p-media-loader-core/src/requests/request.ts index 1931b297..9b88784b 100644 --- a/packages/p2p-media-loader-core/src/requests/request.ts +++ b/packages/p2p-media-loader-core/src/requests/request.ts @@ -1,8 +1,8 @@ -import { Segment, Playback, BandwidthCalculators } from "../types"; +import debug from "debug"; +import { BandwidthCalculators, Playback, Segment } from "../types"; +import * as LoggerUtils from "../utils/logger"; import * as StreamUtils from "../utils/stream"; import * as Utils from "../utils/utils"; -import * as LoggerUtils from "../utils/logger"; -import debug from "debug"; export type LoadProgress = { startTimestamp: number; @@ -323,10 +323,11 @@ export type HttpRequestErrorType = | "http-unexpected-status-code"; export type PeerRequestErrorType = - | "peer-response-bytes-mismatch" + | "peer-response-bytes-length-mismatch" | "peer-protocol-violation" | "peer-segment-absent" - | "peer-closed"; + | "peer-closed" + | "p2p-segment-validation-failed"; type RequestErrorType = | RequestInnerErrorType diff --git a/packages/p2p-media-loader-core/src/types.d.ts b/packages/p2p-media-loader-core/src/types.d.ts index 67b3ec8e..e7661f86 100644 --- a/packages/p2p-media-loader-core/src/types.d.ts +++ b/packages/p2p-media-loader-core/src/types.d.ts @@ -1,5 +1,5 @@ -import { RequestAttempt } from "./requests/request"; import { BandwidthCalculator } from "./bandwidth-calculator"; +import { RequestAttempt } from "./requests/request"; export type StreamType = "main" | "secondary"; @@ -53,6 +53,7 @@ export type Settings = { httpNotReceivingBytesTimeoutMs: number; httpErrorRetries: number; p2pErrorRetries: number; + validateP2PSegment?: (url: string, byteRange?: ByteRange) => Promise; }; export type CoreEventHandlers = {