Skip to content

Commit

Permalink
chore(H2): onboard H2 into Undici queueing system (#3707) (#3724)
Browse files Browse the repository at this point in the history
(cherry picked from commit d6c44f3)

Co-authored-by: Carlos Fuentes <[email protected]>
  • Loading branch information
Uzlopak and metcoder95 authored Oct 12, 2024
1 parent 39c5974 commit a699105
Show file tree
Hide file tree
Showing 4 changed files with 224 additions and 181 deletions.
75 changes: 56 additions & 19 deletions lib/dispatcher/client-h2.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ const {
kOnError,
kMaxConcurrentStreams,
kHTTP2Session,
kResume
kResume,
kSize,
kHTTPContext
} = require('../core/symbols.js')

const kOpenStreams = Symbol('open streams')
Expand Down Expand Up @@ -160,11 +162,10 @@ async function connectH2 (client, socket) {
version: 'h2',
defaultPipelining: Infinity,
write (...args) {
// TODO (fix): return
writeH2(client, ...args)
return writeH2(client, ...args)
},
resume () {

resumeH2(client)
},
destroy (err, callback) {
if (closed) {
Expand All @@ -183,6 +184,20 @@ async function connectH2 (client, socket) {
}
}

function resumeH2 (client) {
const socket = client[kSocket]

if (socket?.destroyed === false) {
if (client[kSize] === 0 && client[kMaxConcurrentStreams] === 0) {
socket.unref()
client[kHTTP2Session].unref()
} else {
socket.ref()
client[kHTTP2Session].ref()
}
}
}

function onHttp2SessionError (err) {
assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID')

Expand Down Expand Up @@ -210,17 +225,32 @@ function onHttp2SessionEnd () {
* along with the socket right away
*/
function onHTTP2GoAway (code) {
const err = new RequestAbortedError(`HTTP/2: "GOAWAY" frame received with code ${code}`)
// We cannot recover, so best to close the session and the socket
const err = this[kError] || new SocketError(`HTTP/2: "GOAWAY" frame received with code ${code}`, util.getSocketInfo(this))
const client = this[kClient]

// We need to trigger the close cycle right away
// We need to destroy the session and the socket
// Requests should be failed with the error after the current one is handled
this[kSocket][kError] = err
this[kClient][kOnError](err)
client[kSocket] = null
client[kHTTPContext] = null

this.unref()
if (this[kHTTP2Session] != null) {
this[kHTTP2Session].destroy(err)
this[kHTTP2Session] = null
}

util.destroy(this[kSocket], err)

// Fail head of pipeline.
const request = client[kQueue][client[kRunningIdx]]
client[kQueue][client[kRunningIdx]++] = null
util.errorRequest(client, request, err)

client[kPendingIdx] = client[kRunningIdx]

assert(client[kRunning] === 0)

client.emit('disconnect', client[kUrl], [client], err)

client[kResume]()
}

// https://www.rfc-editor.org/rfc/rfc7230#section-3.3.2
Expand All @@ -237,10 +267,6 @@ function writeH2 (client, request) {
return false
}

if (request.aborted) {
return false
}

const headers = {}
for (let n = 0; n < reqHeaders.length; n += 2) {
const key = reqHeaders[n + 0]
Expand Down Expand Up @@ -283,6 +309,8 @@ function writeH2 (client, request) {
// We do not destroy the socket as we can continue using the session
// the stream get's destroyed and the session remains to create new streams
util.destroy(body, err)
client[kQueue][client[kRunningIdx]++] = null
client[kResume]()
}

try {
Expand All @@ -293,6 +321,10 @@ function writeH2 (client, request) {
util.errorRequest(client, request, err)
}

if (request.aborted) {
return false
}

if (method === 'CONNECT') {
session.ref()
// We are already connected, streams are pending, first request
Expand All @@ -304,10 +336,12 @@ function writeH2 (client, request) {
if (stream.id && !stream.pending) {
request.onUpgrade(null, null, stream)
++session[kOpenStreams]
client[kQueue][client[kRunningIdx]++] = null
} else {
stream.once('ready', () => {
request.onUpgrade(null, null, stream)
++session[kOpenStreams]
client[kQueue][client[kRunningIdx]++] = null
})
}

Expand Down Expand Up @@ -428,17 +462,20 @@ function writeH2 (client, request) {
// Present specially when using pipeline or stream
if (stream.state?.state == null || stream.state.state < 6) {
request.onComplete([])
return
}

// Stream is closed or half-closed-remote (6), decrement counter and cleanup
// It does not have sense to continue working with the stream as we do not
// have yet RST_STREAM support on client-side
if (session[kOpenStreams] === 0) {
// Stream is closed or half-closed-remote (6), decrement counter and cleanup
// It does not have sense to continue working with the stream as we do not
// have yet RST_STREAM support on client-side

session.unref()
}

abort(new InformationalError('HTTP/2: stream half-closed (remote)'))
client[kQueue][client[kRunningIdx]++] = null
client[kPendingIdx] = client[kRunningIdx]
client[kResume]()
})

stream.once('close', () => {
Expand Down
3 changes: 3 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@
"test:fuzzing": "node test/fuzzing/fuzzing.test.js",
"test:fetch": "npm run build:node && npm run test:fetch:nobuild",
"test:fetch:nobuild": "borp --timeout 180000 --expose-gc --concurrency 1 -p \"test/fetch/*.js\" && npm run test:webidl && npm run test:busboy",
"test:h2": "npm run test:h2:core && npm run test:h2:fetch",
"test:h2:core": "borp -p \"test/http2*.js\"",
"test:h2:fetch": "npm run build:node && borp -p \"test/fetch/http2*.js\"",
"test:interceptors": "borp -p \"test/interceptors/*.js\"",
"test:jest": "cross-env NODE_V8_COVERAGE= jest",
"test:unit": "borp --expose-gc -p \"test/*.js\"",
Expand Down
Loading

0 comments on commit a699105

Please sign in to comment.