Skip to content

Commit

Permalink
fix: give send data a chance to complete before closing stream (#2399)
Browse files Browse the repository at this point in the history
When gracefully closing a stream, allow the stream to send it's final
data payload before aborting it.
  • Loading branch information
achingbrain authored Feb 7, 2024
1 parent 43bd578 commit 0c7bbbb
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 13 deletions.
27 changes: 15 additions & 12 deletions packages/utils/src/abstract-stream.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { CodeError } from '@libp2p/interface'
import { type Pushable, pushable } from 'it-pushable'
import defer, { type DeferredPromise } from 'p-defer'
import pDefer from 'p-defer'
import { raceSignal } from 'race-signal'
import { Uint8ArrayList } from 'uint8arraylist'
import { closeSource } from './close-source.js'
Expand Down Expand Up @@ -104,6 +105,7 @@ export abstract class AbstractStream implements Stream {
private readonly onReset?: () => void
private readonly onAbort?: (err: Error) => void
private readonly sendCloseWriteTimeout: number
private sendingData?: DeferredPromise<void>

constructor (init: AbstractStreamInit) {
this.sinkController = new AbortController()
Expand Down Expand Up @@ -180,8 +182,11 @@ export abstract class AbstractStream implements Stream {

const res = this.sendData(data, options)

if (isPromise(res)) { // eslint-disable-line max-depth
if (isPromise(res)) {
this.sendingData = pDefer()
await res
this.sendingData.resolve()
this.sendingData = undefined
}
}
} finally {
Expand Down Expand Up @@ -332,17 +337,15 @@ export abstract class AbstractStream implements Stream {
}

if (this.writeStatus === 'writing') {
// stop reading from the source passed to `.sink` in the microtask queue
// - this lets any data queued by the user in the current tick get read
// before we exit
await new Promise((resolve, reject) => {
queueMicrotask(() => {
this.log.trace('aborting source passed to .sink')
this.sinkController.abort()
raceSignal(this.sinkEnd.promise, options.signal)
.then(resolve, reject)
})
})
// try to let sending outgoing data succeed
if (this.sendingData != null) {
await raceSignal(this.sendingData.promise, options.signal)
}

// stop reading from the source passed to `.sink`
this.log.trace('aborting source passed to .sink')
this.sinkController.abort()
await raceSignal(this.sinkEnd.promise, options.signal)
}

this.writeStatus = 'closed'
Expand Down
14 changes: 13 additions & 1 deletion packages/utils/test/abstract-stream.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -201,12 +201,22 @@ describe('abstract stream', () => {
it('should wait for sending data to finish when closing gracefully', async () => {
const sendStarted = pDefer()
let timeFinished: number = 0
const wasAbortedBeforeSendingFinished = pDefer()
const wasAbortedAfterSendingFinished = pDefer()

// stub send method to simulate slow sending
stream.sendData = async () => {
stream.sendData = async (data, options) => {
sendStarted.resolve()
await delay(1000)
timeFinished = Date.now()

// slow send has finished, make sure we weren't aborted before we were
// done sending data
wasAbortedBeforeSendingFinished.resolve(options?.signal?.aborted)

// save a reference to the signal, should be aborted after
// `stream.close()` returns
wasAbortedAfterSendingFinished.resolve(options?.signal)
}
const data = [
Uint8Array.from([0, 1, 2, 3, 4])
Expand All @@ -222,6 +232,8 @@ describe('abstract stream', () => {

// should have waited for send to complete
expect(Date.now()).to.be.greaterThanOrEqual(timeFinished)
await expect(wasAbortedBeforeSendingFinished.promise).to.eventually.be.false()
await expect(wasAbortedAfterSendingFinished.promise).to.eventually.have.property('aborted', true)
})

it('should abort close due to timeout with slow sender', async () => {
Expand Down

0 comments on commit 0c7bbbb

Please sign in to comment.