diff --git a/packages/utils/src/abstract-stream.ts b/packages/utils/src/abstract-stream.ts index 0594bdc9cd..9585342d86 100644 --- a/packages/utils/src/abstract-stream.ts +++ b/packages/utils/src/abstract-stream.ts @@ -1,6 +1,7 @@ import { CodeError } from '@libp2p/interface' import { type Pushable, pushable } from 'it-pushable' import defer, { type DeferredPromise } from 'p-defer' +import pDefer from 'p-defer' import { raceSignal } from 'race-signal' import { Uint8ArrayList } from 'uint8arraylist' import { closeSource } from './close-source.js' @@ -104,6 +105,7 @@ export abstract class AbstractStream implements Stream { private readonly onReset?: () => void private readonly onAbort?: (err: Error) => void private readonly sendCloseWriteTimeout: number + private sendingData?: DeferredPromise constructor (init: AbstractStreamInit) { this.sinkController = new AbortController() @@ -180,8 +182,11 @@ export abstract class AbstractStream implements Stream { const res = this.sendData(data, options) - if (isPromise(res)) { // eslint-disable-line max-depth + if (isPromise(res)) { + this.sendingData = pDefer() await res + this.sendingData.resolve() + this.sendingData = undefined } } } finally { @@ -332,17 +337,15 @@ export abstract class AbstractStream implements Stream { } if (this.writeStatus === 'writing') { - // stop reading from the source passed to `.sink` in the microtask queue - // - this lets any data queued by the user in the current tick get read - // before we exit - await new Promise((resolve, reject) => { - queueMicrotask(() => { - this.log.trace('aborting source passed to .sink') - this.sinkController.abort() - raceSignal(this.sinkEnd.promise, options.signal) - .then(resolve, reject) - }) - }) + // try to let sending outgoing data succeed + if (this.sendingData != null) { + await raceSignal(this.sendingData.promise, options.signal) + } + + // stop reading from the source passed to `.sink` + this.log.trace('aborting source passed to .sink') + this.sinkController.abort() + await raceSignal(this.sinkEnd.promise, options.signal) } this.writeStatus = 'closed' diff --git a/packages/utils/test/abstract-stream.spec.ts b/packages/utils/test/abstract-stream.spec.ts index 4ff53673cc..5cf860dcb2 100644 --- a/packages/utils/test/abstract-stream.spec.ts +++ b/packages/utils/test/abstract-stream.spec.ts @@ -201,12 +201,22 @@ describe('abstract stream', () => { it('should wait for sending data to finish when closing gracefully', async () => { const sendStarted = pDefer() let timeFinished: number = 0 + const wasAbortedBeforeSendingFinished = pDefer() + const wasAbortedAfterSendingFinished = pDefer() // stub send method to simulate slow sending - stream.sendData = async () => { + stream.sendData = async (data, options) => { sendStarted.resolve() await delay(1000) timeFinished = Date.now() + + // slow send has finished, make sure we weren't aborted before we were + // done sending data + wasAbortedBeforeSendingFinished.resolve(options?.signal?.aborted) + + // save a reference to the signal, should be aborted after + // `stream.close()` returns + wasAbortedAfterSendingFinished.resolve(options?.signal) } const data = [ Uint8Array.from([0, 1, 2, 3, 4]) @@ -222,6 +232,8 @@ describe('abstract stream', () => { // should have waited for send to complete expect(Date.now()).to.be.greaterThanOrEqual(timeFinished) + await expect(wasAbortedBeforeSendingFinished.promise).to.eventually.be.false() + await expect(wasAbortedAfterSendingFinished.promise).to.eventually.have.property('aborted', true) }) it('should abort close due to timeout with slow sender', async () => {