Skip to content

Commit

Permalink
Fix: segment abort issue & added requestProcessQueueCallback in peer (#…
Browse files Browse the repository at this point in the history
…365)

* improvements

* Fix segment abort issue

* Refactor requestProcessQueueCallback

* Refactor segment event details types

* Small improvements

* Fixed onSegmentAnnouncement

* Update hybrid-loader.ts
  • Loading branch information
DimaDemchenko authored May 29, 2024
1 parent f8e4c67 commit 59a6840
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 47 deletions.
1 change: 1 addition & 0 deletions packages/p2p-media-loader-core/src/hybrid-loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ export class HybridLoader {
this.segmentStorage,
this.config,
this.eventTarget,
this.requestProcessQueueMicrotask,
);

this.logger = debug(`p2pml-core:hybrid-loader-${activeStream.type}`);
Expand Down
8 changes: 5 additions & 3 deletions packages/p2p-media-loader-core/src/p2p/loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ export class P2PLoader {
private readonly requests: RequestsContainer,
private readonly segmentStorage: SegmentsMemoryStorage,
private readonly config: CoreConfig,
eventTarget: EventTarget<CoreEventMap>,
private readonly eventTarget: EventTarget<CoreEventMap>,
private readonly onSegmentAnnouncement: () => void,
) {
const streamExternalId = StreamUtils.getStreamExternalId(
this.config.swarmId ?? this.streamManifestUrl,
Expand All @@ -36,9 +37,10 @@ export class P2PLoader {
onPeerConnected: this.onPeerConnected,
// eslint-disable-next-line @typescript-eslint/no-misused-promises
onSegmentRequested: this.onSegmentRequested,
onSegmentsAnnouncement: this.onSegmentAnnouncement,
},
this.config,
eventTarget,
this.eventTarget,
);

this.segmentStorage.subscribeOnUpdate(
Expand Down Expand Up @@ -135,7 +137,7 @@ export class P2PLoader {
peer.sendSegmentAbsentCommand(segmentExternalId);
return;
}
void peer.uploadSegmentData(
await peer.uploadSegmentData(
segment,
byteFrom !== undefined ? segmentData.slice(byteFrom) : segmentData,
);
Expand Down
6 changes: 6 additions & 0 deletions packages/p2p-media-loader-core/src/p2p/loaders-container.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ export class P2PLoadersContainer {
private readonly segmentStorage: SegmentsMemoryStorage,
private readonly config: CoreConfig,
private readonly eventTarget: EventTarget<CoreEventMap>,
private onSegmentAnnouncement: () => void,
) {
this.changeCurrentLoader(stream);
}
Expand All @@ -40,6 +41,11 @@ export class P2PLoadersContainer {
this.segmentStorage,
this.config,
this.eventTarget,
() => {
if (this._currentLoaderItem.loader === loader) {
this.onSegmentAnnouncement();
}
},
);
const loggerInfo = LoggerUtils.getStreamString(stream);
this.logger(`created new loader: ${loggerInfo}`);
Expand Down
21 changes: 14 additions & 7 deletions packages/p2p-media-loader-core/src/p2p/peer-protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,23 +67,29 @@ export class PeerProtocol {
const { promise, resolve, reject } = Utils.getControlledPromise<void>();

let isUploadingSegmentData = false;
this.uploadingContext = {

const uploadingContext = {
stopUploading: () => {
isUploadingSegmentData = false;
},
};

this.uploadingContext = uploadingContext;

const sendChunk = () => {
if (!isUploadingSegmentData) {
reject();
return;
}

while (channel.bufferedAmount <= channel.bufferedAmountLowThreshold) {
const chunk = chunks.next().value;

if (!chunk) {
resolve();
break;
}
if (chunk && !isUploadingSegmentData) {
reject();
break;
}

this.connection.send(chunk);
this.onChunkUploaded(chunk.byteLength, this.connection.idUtf8);
}
Expand All @@ -94,10 +100,11 @@ export class PeerProtocol {
isUploadingSegmentData = true;
sendChunk();
await promise;
return promise;
} finally {
channel.removeEventListener("bufferedamountlow", sendChunk);
this.uploadingContext = undefined;
if (this.uploadingContext === uploadingContext) {
this.uploadingContext = undefined;
}
}
}

Expand Down
3 changes: 3 additions & 0 deletions packages/p2p-media-loader-core/src/p2p/peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type PeerEventHandlers = {
segmentId: number,
byteFrom?: number,
) => void;
onSegmentsAnnouncement: () => void;
};

export class Peer {
Expand Down Expand Up @@ -82,6 +83,7 @@ export class Peer {
case PeerCommandType.SegmentsAnnouncement:
this.loadedSegments = new Set(command.l);
this.httpLoadingSegments = new Set(command.p);
this.eventHandlers.onSegmentsAnnouncement();
break;

case PeerCommandType.SegmentRequest:
Expand All @@ -92,6 +94,7 @@ export class Peer {
case PeerCommandType.SegmentData:
{
if (!this.downloadingContext) break;
if (this.downloadingContext.isSegmentDataCommandReceived) break;

const { request, controls } = this.downloadingContext;
if (request.segment.externalId !== command.i) break;
Expand Down
2 changes: 2 additions & 0 deletions packages/p2p-media-loader-core/src/p2p/tracker-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type PeerItem = {
type P2PTrackerClientEventHandlers = {
onPeerConnected: (peer: Peer) => void;
onSegmentRequested: (peer: Peer, segmentExternalId: number) => void;
onSegmentsAnnouncement: () => void;
};

export class P2PTrackerClient {
Expand Down Expand Up @@ -95,6 +96,7 @@ export class P2PTrackerClient {
{
onPeerClosed: this.onPeerClosed,
onSegmentRequested: this.eventHandlers.onSegmentRequested,
onSegmentsAnnouncement: this.eventHandlers.onSegmentsAnnouncement,
},
this.config,
this.eventTarget,
Expand Down
Loading

0 comments on commit 59a6840

Please sign in to comment.