From b7ff3682863048e62adf75b77e203242acee89ef Mon Sep 17 00:00:00 2001 From: Tim Perry <1526883+pimterry@users.noreply.github.com> Date: Mon, 8 Apr 2024 11:53:18 +0200 Subject: [PATCH] http2: fix h2-over-h2 connection proxying PR-URL: https://github.com/nodejs/node/pull/52368 Reviewed-By: Matteo Collina Reviewed-By: Paolo Insogna Reviewed-By: Marco Ippolito --- lib/internal/http2/core.js | 19 +++---- lib/internal/js_stream_socket.js | 10 ++-- lib/internal/stream_base_commons.js | 2 + .../test-http2-client-proxy-over-http2.js | 50 +++++++++++++++++++ 4 files changed, 67 insertions(+), 14 deletions(-) create mode 100644 test/parallel/test-http2-client-proxy-over-http2.js diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index 296968688e6802..97d59ae5520ca8 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -177,6 +177,7 @@ const { kUpdateTimer, kHandle, kSession, + kBoundSession, setStreamTimeout, } = require('internal/stream_base_commons'); const { kTimeout } = require('internal/timers'); @@ -1121,7 +1122,7 @@ function cleanupSession(session) { if (handle) handle.ondone = null; if (socket) { - socket[kSession] = undefined; + socket[kBoundSession] = undefined; socket[kServer] = undefined; } } @@ -1235,10 +1236,10 @@ class Http2Session extends EventEmitter { // If the session property already exists on the socket, // then it has already been bound to an Http2Session instance // and cannot be attached again. - if (socket[kSession] !== undefined) + if (socket[kBoundSession] !== undefined) throw new ERR_HTTP2_SOCKET_BOUND(); - socket[kSession] = this; + socket[kBoundSession] = this; if (!socket._handle || !socket._handle.isStreamBase) { socket = new JSStreamSocket(socket); @@ -1617,7 +1618,7 @@ class Http2Session extends EventEmitter { } _onTimeout() { - callTimeout(this); + callTimeout(this, this); } ref() { @@ -2093,7 +2094,7 @@ class Http2Stream extends Duplex { } _onTimeout() { - callTimeout(this, kSession); + callTimeout(this, this[kSession]); } // True if the HEADERS frame has been sent @@ -2419,7 +2420,7 @@ class Http2Stream extends Duplex { } } -function callTimeout(self, kSession) { +function callTimeout(self, session) { // If the session is destroyed, this should never actually be invoked, // but just in case... if (self.destroyed) @@ -2430,7 +2431,7 @@ function callTimeout(self, kSession) { // happens, meaning that if a write is ongoing it should never equal the // newly fetched, updated value. if (self[kState].writeQueueSize > 0) { - const handle = kSession ? self[kSession][kHandle] : self[kHandle]; + const handle = session[kHandle]; const chunksSentSinceLastWrite = handle !== undefined ? handle.chunksSentSinceLastWrite : null; if (chunksSentSinceLastWrite !== null && @@ -3017,7 +3018,7 @@ ObjectDefineProperty(Http2Session.prototype, 'setTimeout', setTimeoutValue); // When the socket emits an error, destroy the associated Http2Session and // forward it the same error. function socketOnError(error) { - const session = this[kSession]; + const session = this[kBoundSession]; if (session !== undefined) { // We can ignore ECONNRESET after GOAWAY was received as there's nothing // we can do and the other side is fully within its rights to do so. @@ -3300,7 +3301,7 @@ function setupCompat(ev) { } function socketOnClose() { - const session = this[kSession]; + const session = this[kBoundSession]; if (session !== undefined) { debugSessionObj(session, 'socket closed'); const err = session.connecting ? new ERR_SOCKET_CLOSED() : null; diff --git a/lib/internal/js_stream_socket.js b/lib/internal/js_stream_socket.js index 3e01327202be1a..0f74626f3f6a32 100644 --- a/lib/internal/js_stream_socket.js +++ b/lib/internal/js_stream_socket.js @@ -17,7 +17,7 @@ let debug = require('internal/util/debuglog').debuglog( ); const { owner_symbol } = require('internal/async_hooks').symbols; const { ERR_STREAM_WRAP } = require('internal/errors').codes; -const { kSession } = require('internal/stream_base_commons'); +const { kBoundSession } = require('internal/stream_base_commons'); const kCurrentWriteRequest = Symbol('kCurrentWriteRequest'); const kCurrentShutdownRequest = Symbol('kCurrentShutdownRequest'); @@ -265,12 +265,12 @@ class JSStreamSocket extends Socket { }); } - get [kSession]() { - return this.stream[kSession]; + get [kBoundSession]() { + return this.stream[kBoundSession]; } - set [kSession](session) { - this.stream[kSession] = session; + set [kBoundSession](session) { + this.stream[kBoundSession] = session; } } diff --git a/lib/internal/stream_base_commons.js b/lib/internal/stream_base_commons.js index b692e3c30c3869..f6c3c5a8cf1063 100644 --- a/lib/internal/stream_base_commons.js +++ b/lib/internal/stream_base_commons.js @@ -33,6 +33,7 @@ const kMaybeDestroy = Symbol('kMaybeDestroy'); const kUpdateTimer = Symbol('kUpdateTimer'); const kAfterAsyncWrite = Symbol('kAfterAsyncWrite'); const kHandle = Symbol('kHandle'); +const kBoundSession = Symbol('kBoundSession'); const kSession = Symbol('kSession'); let debug = require('internal/util/debuglog').debuglog('stream', (fn) => { @@ -255,6 +256,7 @@ function setStreamTimeout(msecs, callback) { } else { this[kTimeout] = setUnrefTimeout(this._onTimeout.bind(this), msecs); if (this[kSession]) this[kSession][kUpdateTimer](); + if (this[kBoundSession]) this[kBoundSession][kUpdateTimer](); if (callback !== undefined) { validateFunction(callback, 'callback'); diff --git a/test/parallel/test-http2-client-proxy-over-http2.js b/test/parallel/test-http2-client-proxy-over-http2.js new file mode 100644 index 00000000000000..e749c1e4268e9d --- /dev/null +++ b/test/parallel/test-http2-client-proxy-over-http2.js @@ -0,0 +1,50 @@ +'use strict'; + +const common = require('../common'); +if (!common.hasCrypto) + common.skip('missing crypto'); +const assert = require('assert'); +const h2 = require('http2'); + +const server = h2.createServer(); + +server.listen(0, common.mustCall(function() { + const proxyClient = h2.connect(`http://localhost:${server.address().port}`); + + const request = proxyClient.request({ + ':method': 'CONNECT', + ':authority': 'example.com:80' + }); + + request.on('response', common.mustCall((connectResponse) => { + assert.strictEqual(connectResponse[':status'], 200); + + const proxiedClient = h2.connect('http://example.com', { + createConnection: () => request // Tunnel via first request stream + }); + + const proxiedRequest = proxiedClient.request(); + proxiedRequest.on('response', common.mustCall((proxiedResponse) => { + assert.strictEqual(proxiedResponse[':status'], 204); + + proxiedClient.close(); + proxyClient.close(); + server.close(); + })); + })); +})); + +server.once('connect', common.mustCall((req, res) => { + assert.strictEqual(req.headers[':method'], 'CONNECT'); + res.writeHead(200); // Accept the CONNECT tunnel + + // Handle this stream as a new 'proxied' connection (pretend to forward + // but actually just unwrap the tunnel ourselves): + server.emit('connection', res.stream); +})); + +// Handle the 'proxied' request itself: +server.once('request', common.mustCall((req, res) => { + res.writeHead(204); + res.end(); +}));