From 62e15a793a5695b383c2940c5193472993e5400d Mon Sep 17 00:00:00 2001 From: Robert Nagy <ronagy@icloud.com> Date: Thu, 8 Aug 2019 21:12:41 +0200 Subject: [PATCH] http: outgoing cork PR-URL: https://github.com/nodejs/node/pull/29053 Reviewed-By: Anna Henningsen <anna@addaleax.net> --- doc/api/http.md | 16 +++++++++ lib/_http_outgoing.js | 45 ++++++++++++++++++++---- lib/internal/http2/compat.js | 12 +++++++ test/parallel/test-http-response-cork.js | 33 +++++++++++++++++ 4 files changed, 100 insertions(+), 6 deletions(-) create mode 100644 test/parallel/test-http-response-cork.js diff --git a/doc/api/http.md b/doc/api/http.md index 46450d806733f1..cab01f3e30c58d 100644 --- a/doc/api/http.md +++ b/doc/api/http.md @@ -1231,6 +1231,13 @@ deprecated: v13.0.0 See [`response.socket`][]. +### response.cork() +<!-- YAML +added: REPLACEME +--> + +See [`writable.cork()`][]. + ### response.end(\[data\[, encoding\]\]\[, callback\]) <!-- YAML added: v0.1.90 @@ -1516,6 +1523,13 @@ response.statusMessage = 'Not found'; After response header was sent to the client, this property indicates the status message which was sent out. +### response.uncork() +<!-- YAML +added: REPLACEME +--> + +See [`writable.uncork()`][]. + ### response.writableEnded <!-- YAML added: v12.9.0 @@ -2358,3 +2372,5 @@ not abort the request or do anything besides add a `'timeout'` event. [`socket.unref()`]: net.html#net_socket_unref [`url.parse()`]: url.html#url_url_parse_urlstring_parsequerystring_slashesdenotehost [`HPE_HEADER_OVERFLOW`]: errors.html#errors_hpe_header_overflow +[`writable.cork()`]: stream.html#stream_writable_cork +[`writable.uncork()`]: stream.html#stream_writable_uncork diff --git a/lib/_http_outgoing.js b/lib/_http_outgoing.js index 5345f2d979a164..0dc13f19fe918c 100644 --- a/lib/_http_outgoing.js +++ b/lib/_http_outgoing.js @@ -56,6 +56,8 @@ const { validateString } = require('internal/validators'); const HIGH_WATER_MARK = getDefaultHighWaterMark(); const { CRLF, debug } = common; +const kCorked = Symbol('corked'); + const RE_CONN_CLOSE = /(?:^|\W)close(?:$|\W)/i; const RE_TE_CHUNKED = common.chunkExpression; @@ -99,6 +101,7 @@ function OutgoingMessage() { this.finished = false; this._headerSent = false; + this[kCorked] = 0; this.socket = null; this._header = null; @@ -137,6 +140,13 @@ Object.defineProperty(OutgoingMessage.prototype, 'writableHighWaterMark', { } }); +Object.defineProperty(OutgoingMessage.prototype, 'writableCorked', { + get() { + const corked = this.socket ? this.socket.writableCorked : 0; + return corked + this[kCorked]; + } +}); + Object.defineProperty(OutgoingMessage.prototype, '_headers', { get: internalUtil.deprecate(function() { return this.getHeaders(); @@ -213,6 +223,21 @@ OutgoingMessage.prototype._renderHeaders = function _renderHeaders() { return headers; }; +OutgoingMessage.prototype.cork = function() { + if (this.socket) { + this.socket.cork(); + } else { + this[kCorked]++; + } +}; + +OutgoingMessage.prototype.uncork = function() { + if (this.socket) { + this.socket.uncork(); + } else if (this[kCorked]) { + this[kCorked]--; + } +}; OutgoingMessage.prototype.setTimeout = function setTimeout(msecs, callback) { @@ -710,7 +735,10 @@ OutgoingMessage.prototype.end = function end(chunk, encoding, callback) { return this; } - var uncork; + if (this.socket) { + this.socket.cork(); + } + if (chunk) { if (typeof chunk !== 'string' && !(chunk instanceof Buffer)) { throw new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer'], chunk); @@ -721,10 +749,6 @@ OutgoingMessage.prototype.end = function end(chunk, encoding, callback) { else this._contentLength = chunk.length; } - if (this.socket) { - this.socket.cork(); - uncork = true; - } write_(this, chunk, encoding, null, true); } else if (!this._header) { this._contentLength = 0; @@ -743,8 +767,12 @@ OutgoingMessage.prototype.end = function end(chunk, encoding, callback) { this._send('', 'latin1', finish); } - if (uncork) + if (this.socket) { + // Fully uncork connection on end(). + this.socket._writableState.corked = 1; this.socket.uncork(); + } + this[kCorked] = 0; this.finished = true; @@ -805,6 +833,11 @@ OutgoingMessage.prototype._flush = function _flush() { }; OutgoingMessage.prototype._flushOutput = function _flushOutput(socket) { + while (this[kCorked]) { + this[kCorked]--; + socket.cork(); + } + const outputLength = this.outputData.length; if (outputLength <= 0) return undefined; diff --git a/lib/internal/http2/compat.js b/lib/internal/http2/compat.js index 2d6ed47d74e29f..5bc64504cd3d74 100644 --- a/lib/internal/http2/compat.js +++ b/lib/internal/http2/compat.js @@ -503,6 +503,10 @@ class Http2ServerResponse extends Stream { return this[kState].statusCode; } + get writableCorked() { + return this[kStream].writableCorked; + } + set statusCode(code) { code |= 0; if (code >= 100 && code < 200) @@ -627,6 +631,14 @@ class Http2ServerResponse extends Stream { return this; } + cork() { + this[kStream].cork(); + } + + uncork() { + this[kStream].uncork(); + } + write(chunk, encoding, cb) { if (typeof encoding === 'function') { cb = encoding; diff --git a/test/parallel/test-http-response-cork.js b/test/parallel/test-http-response-cork.js new file mode 100644 index 00000000000000..4c85412c7bfcec --- /dev/null +++ b/test/parallel/test-http-response-cork.js @@ -0,0 +1,33 @@ +'use strict'; +const common = require('../common'); +const http = require('http'); +const assert = require('assert'); + +const server = http.createServer((req, res) => { + let corked = false; + const originalWrite = res.socket.write; + res.socket.write = common.mustCall((...args) => { + assert.strictEqual(corked, false); + return originalWrite.call(res.socket, ...args); + }, 5); + corked = true; + res.cork(); + assert.strictEqual(res.writableCorked, res.socket.writableCorked); + res.cork(); + assert.strictEqual(res.writableCorked, res.socket.writableCorked); + res.writeHead(200, { 'a-header': 'a-header-value' }); + res.uncork(); + assert.strictEqual(res.writableCorked, res.socket.writableCorked); + corked = false; + res.end('asd'); + assert.strictEqual(res.writableCorked, res.socket.writableCorked); +}); + +server.listen(0, () => { + http.get({ port: server.address().port }, (res) => { + res.on('data', common.mustCall()); + res.on('end', common.mustCall(() => { + server.close(); + })); + }); +});