Skip to content

Commit

Permalink
feat: add support for Timeout and Trailers (#3854)
Browse files Browse the repository at this point in the history
* feat: add support for Timeout and Trailers

* chore: meta

* test: ensure localhost

* test: remove unneded statement

* test: windows?

* test: isolate suite

* ci: isolate windows

* test: add logs

* Revert "ci: isolate windows"

This reverts commit da7e78b.

* ci: isolate windows

* fix: close order

* test: explicit closeup

* test: fixup

* fix: fixup

* fix: fixup

* fix: fixup

* fix: fixup

* fix: revert

* test: remove debug

* test: last try

* test: remove bits

* Revert "ci: isolate windows"

This reverts commit 64a02fa.
  • Loading branch information
metcoder95 authored Nov 27, 2024
1 parent ed1da80 commit 513e213
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 14 deletions.
45 changes: 31 additions & 14 deletions lib/dispatcher/client-h2.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ const {
kResume,
kSize,
kHTTPContext,
kClosed
kClosed,
kBodyTimeout
} = require('../core/symbols.js')

const kOpenStreams = Symbol('open streams')
Expand Down Expand Up @@ -90,7 +91,11 @@ async function connectH2 (client, socket) {

const session = http2.connect(client[kUrl], {
createConnection: () => socket,
peerMaxConcurrentStreams: client[kMaxConcurrentStreams]
peerMaxConcurrentStreams: client[kMaxConcurrentStreams],
settings: {
// TODO(metcoder95): add support for PUSH
enablePush: false
}
})

session[kOpenStreams] = 0
Expand Down Expand Up @@ -129,7 +134,6 @@ async function connectH2 (client, socket) {
if (socket[kClosed]) {
queueMicrotask(callback)
} else {
// Destroying the socket will trigger the session close
socket.destroy(err).on('close', callback)
}
},
Expand Down Expand Up @@ -281,6 +285,7 @@ function shouldSendContentLength (method) {
}

function writeH2 (client, request) {
const requestTimeout = request.bodyTimeout ?? client[kBodyTimeout]
const session = client[kHTTP2Session]
const { method, path, host, upgrade, expectContinue, signal, headers: reqHeaders } = request
let { body } = request
Expand Down Expand Up @@ -379,6 +384,7 @@ function writeH2 (client, request) {
session[kOpenStreams] -= 1
if (session[kOpenStreams] === 0) session.unref()
})
stream.setTimeout(requestTimeout)

return true
}
Expand Down Expand Up @@ -452,6 +458,7 @@ function writeH2 (client, request) {

session.ref()

// TODO(metcoder95): add support for sending trailers
const shouldEndStream = method === 'GET' || method === 'HEAD' || body === null
if (expectContinue) {
headers[HTTP2_HEADER_EXPECT] = '100-continue'
Expand All @@ -469,6 +476,7 @@ function writeH2 (client, request) {

// Increment counter as we have new streams open
++session[kOpenStreams]
stream.setTimeout(requestTimeout)

stream.once('response', headers => {
const { [HTTP2_HEADER_STATUS]: statusCode, ...realHeaders } = headers
Expand Down Expand Up @@ -502,8 +510,9 @@ function writeH2 (client, request) {
// Present specially when using pipeline or stream
if (stream.state?.state == null || stream.state.state < 6) {
// Do not complete the request if it was aborted
if (!request.aborted) {
request.onComplete([])
// Not prone to happen for as safety net to avoid race conditions with 'trailers'
if (!request.aborted && !request.completed) {
request.onComplete({})
}

client[kQueue][client[kRunningIdx]++] = null
Expand Down Expand Up @@ -546,17 +555,25 @@ function writeH2 (client, request) {
stream.removeAllListeners('data')
})

// stream.on('timeout', () => {
// // TODO(HTTP/2): Support timeout
// })
stream.on('timeout', () => {
const err = new InformationalError(`HTTP/2: "stream timeout after ${requestTimeout}"`)
stream.removeAllListeners('data')
session[kOpenStreams] -= 1

if (session[kOpenStreams] === 0) {
session.unref()
}

abort(err)
})

// stream.on('push', headers => {
// // TODO(HTTP/2): Support push
// })
stream.once('trailers', trailers => {
if (request.aborted || request.completed) {
return
}

// stream.on('trailers', headers => {
// // TODO(HTTP/2): Support trailers
// })
request.onComplete(trailers)
})

return true

Expand Down
110 changes: 110 additions & 0 deletions test/http2.js
Original file line number Diff line number Diff line change
Expand Up @@ -1693,3 +1693,113 @@ test('#3803 - sending FormData bodies works', async (t) => {

await assert.completed
})

test('Should handle http2 stream timeout', async t => {
const server = createSecureServer(pem)
const stream = createReadStream(__filename)

server.on('stream', async (stream, headers) => {
stream.respond({
'content-type': 'text/plain; charset=utf-8',
'x-custom-h2': headers['x-my-header'],
':status': 200
})

setTimeout(() => {
stream.end('hello h2!')
}, 500)
})

t = tspl(t, { plan: 1 })

server.listen(0)
await once(server, 'listening')

const client = new Client(`https://localhost:${server.address().port}`, {
connect: {
rejectUnauthorized: false
},
allowH2: true,
bodyTimeout: 50
})

after(() => server.close())
after(() => client.close())

const res = await client.request({
path: '/',
method: 'PUT',
headers: {
'x-my-header': 'foo'
},
body: stream
})

t.rejects(res.body.text(), {
message: 'HTTP/2: "stream timeout after 50"'
})
})

test('Should handle http2 trailers', async t => {
const server = createSecureServer(pem)

server.on('stream', async (stream, headers) => {
stream.respond({
'content-type': 'text/plain; charset=utf-8',
'x-custom-h2': headers['x-my-header'],
':status': 200
},
{
waitForTrailers: true
})

stream.on('wantTrailers', () => {
stream.sendTrailers({
'x-trailer': 'hello'
})
})

stream.end('hello h2!')
})

t = tspl(t, { plan: 1 })

server.listen(0, '127.0.0.1')
await once(server, 'listening')

const client = new Client(`https://${server.address().address}:${server.address().port}`, {
connect: {
rejectUnauthorized: false
},
allowH2: true
})

after(async () => {
server.close()
})
after(() => client.close())

client.dispatch({
path: '/',
method: 'PUT',
body: 'hello'
}, {
onConnect () {

},
onHeaders () {
return true
},
onData () {
return true
},
onComplete (trailers) {
t.strictEqual(trailers['x-trailer'], 'hello')
},
onError (err) {
t.ifError(err)
}
})

await t.completed
})

0 comments on commit 513e213

Please sign in to comment.