diff --git a/lib/dispatcher/client-h2.js b/lib/dispatcher/client-h2.js index fa19f0e7c9f..857187a402c 100644 --- a/lib/dispatcher/client-h2.js +++ b/lib/dispatcher/client-h2.js @@ -27,7 +27,8 @@ const { kResume, kSize, kHTTPContext, - kClosed + kClosed, + kBodyTimeout } = require('../core/symbols.js') const kOpenStreams = Symbol('open streams') @@ -90,7 +91,11 @@ async function connectH2 (client, socket) { const session = http2.connect(client[kUrl], { createConnection: () => socket, - peerMaxConcurrentStreams: client[kMaxConcurrentStreams] + peerMaxConcurrentStreams: client[kMaxConcurrentStreams], + settings: { + // TODO(metcoder95): add support for PUSH + enablePush: false + } }) session[kOpenStreams] = 0 @@ -129,7 +134,6 @@ async function connectH2 (client, socket) { if (socket[kClosed]) { queueMicrotask(callback) } else { - // Destroying the socket will trigger the session close socket.destroy(err).on('close', callback) } }, @@ -281,6 +285,7 @@ function shouldSendContentLength (method) { } function writeH2 (client, request) { + const requestTimeout = request.bodyTimeout ?? client[kBodyTimeout] const session = client[kHTTP2Session] const { method, path, host, upgrade, expectContinue, signal, headers: reqHeaders } = request let { body } = request @@ -379,6 +384,7 @@ function writeH2 (client, request) { session[kOpenStreams] -= 1 if (session[kOpenStreams] === 0) session.unref() }) + stream.setTimeout(requestTimeout) return true } @@ -452,6 +458,7 @@ function writeH2 (client, request) { session.ref() + // TODO(metcoder95): add support for sending trailers const shouldEndStream = method === 'GET' || method === 'HEAD' || body === null if (expectContinue) { headers[HTTP2_HEADER_EXPECT] = '100-continue' @@ -469,6 +476,7 @@ function writeH2 (client, request) { // Increment counter as we have new streams open ++session[kOpenStreams] + stream.setTimeout(requestTimeout) stream.once('response', headers => { const { [HTTP2_HEADER_STATUS]: statusCode, ...realHeaders } = headers @@ -502,8 +510,9 @@ function writeH2 (client, request) { // Present specially when using pipeline or stream if (stream.state?.state == null || stream.state.state < 6) { // Do not complete the request if it was aborted - if (!request.aborted) { - request.onComplete([]) + // Not prone to happen for as safety net to avoid race conditions with 'trailers' + if (!request.aborted && !request.completed) { + request.onComplete({}) } client[kQueue][client[kRunningIdx]++] = null @@ -546,17 +555,25 @@ function writeH2 (client, request) { stream.removeAllListeners('data') }) - // stream.on('timeout', () => { - // // TODO(HTTP/2): Support timeout - // }) + stream.on('timeout', () => { + const err = new InformationalError(`HTTP/2: "stream timeout after ${requestTimeout}"`) + stream.removeAllListeners('data') + session[kOpenStreams] -= 1 + + if (session[kOpenStreams] === 0) { + session.unref() + } + + abort(err) + }) - // stream.on('push', headers => { - // // TODO(HTTP/2): Support push - // }) + stream.once('trailers', trailers => { + if (request.aborted || request.completed) { + return + } - // stream.on('trailers', headers => { - // // TODO(HTTP/2): Support trailers - // }) + request.onComplete(trailers) + }) return true diff --git a/test/http2.js b/test/http2.js index 51666f8f932..0b98ecd9870 100644 --- a/test/http2.js +++ b/test/http2.js @@ -1693,3 +1693,113 @@ test('#3803 - sending FormData bodies works', async (t) => { await assert.completed }) + +test('Should handle http2 stream timeout', async t => { + const server = createSecureServer(pem) + const stream = createReadStream(__filename) + + server.on('stream', async (stream, headers) => { + stream.respond({ + 'content-type': 'text/plain; charset=utf-8', + 'x-custom-h2': headers['x-my-header'], + ':status': 200 + }) + + setTimeout(() => { + stream.end('hello h2!') + }, 500) + }) + + t = tspl(t, { plan: 1 }) + + server.listen(0) + await once(server, 'listening') + + const client = new Client(`https://localhost:${server.address().port}`, { + connect: { + rejectUnauthorized: false + }, + allowH2: true, + bodyTimeout: 50 + }) + + after(() => server.close()) + after(() => client.close()) + + const res = await client.request({ + path: '/', + method: 'PUT', + headers: { + 'x-my-header': 'foo' + }, + body: stream + }) + + t.rejects(res.body.text(), { + message: 'HTTP/2: "stream timeout after 50"' + }) +}) + +test('Should handle http2 trailers', async t => { + const server = createSecureServer(pem) + + server.on('stream', async (stream, headers) => { + stream.respond({ + 'content-type': 'text/plain; charset=utf-8', + 'x-custom-h2': headers['x-my-header'], + ':status': 200 + }, + { + waitForTrailers: true + }) + + stream.on('wantTrailers', () => { + stream.sendTrailers({ + 'x-trailer': 'hello' + }) + }) + + stream.end('hello h2!') + }) + + t = tspl(t, { plan: 1 }) + + server.listen(0, '127.0.0.1') + await once(server, 'listening') + + const client = new Client(`https://${server.address().address}:${server.address().port}`, { + connect: { + rejectUnauthorized: false + }, + allowH2: true + }) + + after(async () => { + server.close() + }) + after(() => client.close()) + + client.dispatch({ + path: '/', + method: 'PUT', + body: 'hello' + }, { + onConnect () { + + }, + onHeaders () { + return true + }, + onData () { + return true + }, + onComplete (trailers) { + t.strictEqual(trailers['x-trailer'], 'hello') + }, + onError (err) { + t.ifError(err) + } + }) + + await t.completed +})