Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: http2 queueing #3761

Merged
merged 14 commits into from
Oct 31, 2024
2 changes: 1 addition & 1 deletion lib/api/api-request.js
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ class RequestHandler extends AsyncResource {
this.res = null
// Ensure all queued handlers are invoked before destroying res.
queueMicrotask(() => {
util.destroy(res, err)
util.destroy(res.on('error', noop), err)
})
}

Expand Down
45 changes: 27 additions & 18 deletions lib/dispatcher/client-h2.js
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ function resumeH2 (client) {
const socket = client[kSocket]

if (socket?.destroyed === false) {
if (client[kSize] === 0 && client[kMaxConcurrentStreams] === 0) {
if (client[kSize] === 0 || client[kMaxConcurrentStreams] === 0) {
socket.unref()
client[kHTTP2Session].unref()
} else {
Expand Down Expand Up @@ -192,6 +192,7 @@ function onHttp2SessionGoAway (errorCode) {
client[kHTTPContext] = null

if (this[kHTTP2Session] !== null) {
this[kHTTP2Session].close()
this[kHTTP2Session].destroy(err)
this[kHTTP2Session] = null
}
Expand All @@ -218,7 +219,8 @@ function onHttp2SessionClose () {

const err = this[kSocket][kError] || this[kError] || new SocketError('closed', util.getSocketInfo(socket))

client[kHTTP2Session] = null
client[kSocket] = null
client[kHTTPContext] = null

if (client.destroyed) {
assert(client[kPending] === 0)
Expand All @@ -238,6 +240,7 @@ function onHttp2SocketClose () {
const client = this[kHTTP2Session][kClient]

client[kSocket] = null
client[kHTTPContext] = null

if (this[kHTTP2Session] !== null) {
this[kHTTP2Session].destroy(err)
Expand Down Expand Up @@ -301,7 +304,7 @@ function writeH2 (client, request) {
}

/** @type {import('node:http2').ClientHttp2Stream} */
let stream
let stream = null

const { hostname, port } = client[kUrl]

Expand All @@ -318,14 +321,19 @@ function writeH2 (client, request) {
util.errorRequest(client, request, err)

if (stream != null) {
util.destroy(stream, err)
// On Abort, we close the stream to send RST_STREAM frame
stream.close()
// We delay the destroy to allow the stream to send the RST_STREAM frame
queueMicrotask(() => util.destroy(stream, err))
metcoder95 marked this conversation as resolved.
Show resolved Hide resolved
// We move the running index to the next request
client[kQueue][client[kRunningIdx]++] = null
client[kPendingIdx] = client[kRunningIdx]
client[kResume]()
}

// We do not destroy the socket as we can continue using the session
// the stream gets destroyed and the session remains to create new streams
util.destroy(body, err)
client[kQueue][client[kRunningIdx]++] = null
client[kResume]()
}

try {
Expand Down Expand Up @@ -438,6 +446,7 @@ function writeH2 (client, request) {
endStream: shouldEndStream,
signal
})

writeBodyH2()
}

Expand All @@ -454,9 +463,6 @@ function writeH2 (client, request) {
// for those scenarios, best effort is to destroy the stream immediately
// as there's no value to keep it open.
if (request.aborted) {
const err = new RequestAbortedError()
util.errorRequest(client, request, err)
util.destroy(stream, err)
return
}

Expand All @@ -471,26 +477,29 @@ function writeH2 (client, request) {
})
})

stream.once('end', () => {
stream.once('end', (err) => {
// When state is null, it means we haven't consumed body and the stream still do not have
// a state.
// Present specially when using pipeline or stream
if (stream.state?.state == null || stream.state.state < 6) {
request.onComplete([])
}

if (session[kOpenStreams] === 0) {
client[kQueue][client[kRunningIdx]++] = null
client[kResume]()
} else {
// Stream is closed or half-closed-remote (6), decrement counter and cleanup
// It does not have sense to continue working with the stream as we do not
// have yet RST_STREAM support on client-side
--session[kOpenStreams]
if (session[kOpenStreams] === 0) {
session.unref()
}

session.unref()
abort(err ?? new InformationalError('HTTP/2: stream half-closed (remote)'))
client[kQueue][client[kRunningIdx]++] = null
client[kPendingIdx] = client[kRunningIdx]
client[kResume]()
}

abort(new InformationalError('HTTP/2: stream half-closed (remote)'))
client[kQueue][client[kRunningIdx]++] = null
client[kPendingIdx] = client[kRunningIdx]
client[kResume]()
})

stream.once('close', () => {
Expand Down
201 changes: 196 additions & 5 deletions test/http2.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
'use strict'

Check failure on line 1 in test/http2.js

View workflow job for this annotation

GitHub Actions / test (22, ubuntu-latest) / Test with Node.js 22 on ubuntu-latest

test/http2.js

[Error [ERR_TEST_FAILURE]: test timed out after 30000ms] { code: 'ERR_TEST_FAILURE', failureType: 'testTimeoutFailure', cause: 'test timed out after 30000ms' }

const { tspl } = require('@matteo.collina/tspl')
const { test, after } = require('node:test')
Expand Down Expand Up @@ -1218,7 +1218,7 @@
allowH2: true
})

t = tspl(t, { plan: 2 })
t = tspl(t, { plan: 4 })
after(async () => {
server.close()
await client.close()
Expand All @@ -1233,14 +1233,21 @@
t.strictEqual(err.message, 'HTTP/2: stream half-closed (remote)')
t.strictEqual(err.code, 'UND_ERR_INFO')
})
await client
.request({
path: '/',
method: 'GET'
})
.catch(err => {
t.strictEqual(err.message, 'HTTP/2: stream half-closed (remote)')
t.strictEqual(err.code, 'UND_ERR_INFO')
})
})

test('#2364 - Concurrent aborts', async t => {
const server = createSecureServer(pem)

server.on('stream', (stream, headers, _flags, rawHeaders) => {
t.strictEqual(headers['x-my-header'], 'foo')
t.strictEqual(headers[':method'], 'GET')
setTimeout(() => {
stream.respond({
'content-type': 'text/plain; charset=utf-8',
Expand All @@ -1261,10 +1268,128 @@
allowH2: true
})

t = tspl(t, { plan: 14 })
t = tspl(t, { plan: 10 })
after(() => server.close())
after(() => client.close())
const signal = AbortSignal.timeout(100)

client.request(
{
path: '/1',
method: 'GET',
headers: {
'x-my-header': 'foo'
}
},
(err, response) => {
t.ifError(err)
t.strictEqual(
response.headers['content-type'],
'text/plain; charset=utf-8'
)
t.strictEqual(response.headers['x-custom-h2'], 'hello')
t.strictEqual(response.statusCode, 200)
}
)

client.request(
{
path: '/2',
method: 'GET',
headers: {
'x-my-header': 'foo'
},
signal
},
(err, response) => {
t.strictEqual(err.name, 'TimeoutError')
}
)

client.request(
{
path: '/3',
method: 'GET',
headers: {
'x-my-header': 'foo'
}
},
(err, response) => {
t.ifError(err)
t.strictEqual(
response.headers['content-type'],
'text/plain; charset=utf-8'
)
t.strictEqual(response.headers['x-custom-h2'], 'hello')
t.strictEqual(response.statusCode, 200)
}
)

client.request(
{
path: '/4',
method: 'GET',
headers: {
'x-my-header': 'foo'
},
signal
},
(err, response) => {
t.strictEqual(err.name, 'TimeoutError')
}
)

await t.completed
})

test('#2364 - Concurrent aborts (2nd variant)', async t => {
const server = createSecureServer(pem)
let counter = 0

server.on('stream', (stream, headers, _flags, rawHeaders) => {
counter++

if (counter % 2 === 0) {
setTimeout(() => {
if (stream.destroyed) {
return
}

stream.respond({
'content-type': 'text/plain; charset=utf-8',
'x-custom-h2': 'hello',
':status': 200
})

stream.end('hello h2!')
}, 400)

return
}

stream.respond({
'content-type': 'text/plain; charset=utf-8',
'x-custom-h2': 'hello',
':status': 200
})

stream.end('hello h2!')
})

server.listen(0)
await once(server, 'listening')

const client = new Client(`https://localhost:${server.address().port}`, {
connect: {
rejectUnauthorized: false
},
allowH2: true
})

t = tspl(t, { plan: 10 })
after(() => server.close())
after(() => client.close())
const signal = AbortSignal.timeout(50)
const signal = AbortSignal.timeout(300)

client.request(
{
Expand Down Expand Up @@ -1442,3 +1567,69 @@

await t.completed
})

test('#3753 - Handle GOAWAY Gracefully', async (t) => {

Check failure on line 1571 in test/http2.js

View workflow job for this annotation

GitHub Actions / test (22, ubuntu-latest) / Test with Node.js 22 on ubuntu-latest

#3753 - Handle GOAWAY Gracefully

Error [ERR_TEST_FAILURE]: The expression evaluated to a falsy value: assert(!this.completed) at process.emit (node:events:518:28) { code: 'ERR_TEST_FAILURE', failureType: 'uncaughtException', cause: AssertionError [ERR_ASSERTION]: The expression evaluated to a falsy value: assert(!this.completed) at Request.onData (/home/runner/work/undici/undici/lib/core/request.js:254:5) at ClientHttp2Stream.<anonymous> (/home/runner/work/undici/undici/lib/dispatcher/client-h2.js:474:19) at ClientHttp2Stream.emit (node:events:518:28) at addChunk (node:internal/streams/readable:561:12) at readableAddChunkPushByteMode (node:internal/streams/readable:512:3) at Readable.push (node:internal/streams/readable:392:5) at Http2Stream.onStreamRead (node:internal/stream_base_commons:189:23) { generatedMessage: true, code: 'ERR_ASSERTION', actual: false, expected: true, operator: '==' } }
const server = createSecureServer(pem)
let counter = 0
let session = null

server.on('session', s => {
session = s
})

server.on('stream', (stream) => {
counter++

// Due to the nature of the test, we need to ignore the error
// that is thrown when the session is destroyed and stream
// is in-flight
stream.on('error', () => {})
if (counter === 9 && session != null) {
session.goaway()
stream.end()
} else {
stream.respond({
'content-type': 'text/plain',
':status': 200
})
setTimeout(() => {
stream.end('hello world')
}, 150)
}
})

server.listen(0)
await once(server, 'listening')

const client = new Client(`https://localhost:${server.address().port}`, {
connect: {
rejectUnauthorized: false
},
pipelining: 2,
allowH2: true
})

t = tspl(t, { plan: 30 })
after(() => client.close())
after(() => server.close())

for (let i = 0; i < 15; i++) {
client.request({
path: '/',
method: 'GET',
headers: {
'x-my-header': 'foo'
}
}, (err, response) => {
if (i === 9 || i === 8) {
t.strictEqual(err?.message, 'HTTP/2: "GOAWAY" frame received with code 0')
t.strictEqual(err?.code, 'UND_ERR_SOCKET')
} else {
t.ifError(err)
t.strictEqual(response.statusCode, 200)
}
})
}

await t.completed
})
Loading