Skip to content

Commit

Permalink
stream: bypass legacy destroy for pipeline and async iteration
Browse files Browse the repository at this point in the history
PR-URL: #38505
Reviewed-By: Benjamin Gruenbaum <[email protected]>
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: James M Snell <[email protected]>
  • Loading branch information
ronag authored and dnlup committed Jun 15, 2021
1 parent c0becbc commit f4609bd
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 12 deletions.
2 changes: 2 additions & 0 deletions lib/_http_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ const {
prepareError,
} = require('_http_common');
const { OutgoingMessage } = require('_http_outgoing');
const { kDestroy } = require('internal/streams/destroy');
const Agent = require('_http_agent');
const { Buffer } = require('buffer');
const { defaultTriggerAsyncIdScope } = require('internal/async_hooks');
Expand Down Expand Up @@ -609,6 +610,7 @@ function parserOnIncomingClient(res, shouldKeepAlive) {
DTRACE_HTTP_CLIENT_RESPONSE(socket, req);
req.res = res;
res.req = req;
res[kDestroy] = null;

// Add our listener first, so that we guarantee socket cleanup
res.on('end', responseOnEnd);
Expand Down
10 changes: 8 additions & 2 deletions lib/_http_incoming.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const {
} = primordials;

const { Readable, finished } = require('stream');
const { kDestroy } = require('internal/streams/destroy');

const kHeaders = Symbol('kHeaders');
const kHeadersCount = Symbol('kHeadersCount');
Expand Down Expand Up @@ -188,13 +189,18 @@ IncomingMessage.prototype._destroy = function _destroy(err, cb) {
this.socket.destroy(err);
const cleanup = finished(this.socket, (e) => {
cleanup();
onError(this, e || err, cb);
process.nextTick(onError, this, e || err, cb);
});
} else {
onError(this, err, cb);
process.nextTick(onError, this, err, cb);
}
};

IncomingMessage.prototype[kDestroy] = function(err) {
this.socket = null;
this.destroy(err);
};

IncomingMessage.prototype._addHeaderLines = _addHeaderLines;
function _addHeaderLines(headers, n) {
if (headers && headers.length) {
Expand Down
12 changes: 6 additions & 6 deletions lib/_http_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -231,9 +231,7 @@ function onServerResponseClose() {
// where the ServerResponse object has already been deconstructed.
// Fortunately, that requires only a single if check. :-)
if (this._httpMessage) {
this._httpMessage.destroyed = true;
this._httpMessage._closed = true;
this._httpMessage.emit('close');
emitCloseNT(this._httpMessage);
}
}

Expand Down Expand Up @@ -837,9 +835,11 @@ function resOnFinish(req, res, socket, state, server) {
}

function emitCloseNT(self) {
self.destroyed = true;
self._closed = true;
self.emit('close');
if (!self.destroyed) {
self.destroyed = true;
self._closed = true;
self.emit('close');
}
}

// The following callback is issued after the headers have been read on a
Expand Down
59 changes: 55 additions & 4 deletions lib/internal/streams/destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const {
codes: {
ERR_MULTIPLE_CALLBACK,
},
AbortError,
} = require('internal/errors');
const {
Symbol,
Expand Down Expand Up @@ -363,15 +364,65 @@ function isRequest(stream) {
return stream && stream.setHeader && typeof stream.abort === 'function';
}

const kDestroyed = Symbol('kDestroyed');

function emitCloseLegacy(stream) {
stream.emit('close');
}

function emitErrorCloseLegacy(stream, err) {
stream.emit('error', err);
process.nextTick(emitCloseLegacy, stream);
}

function isDestroyed(stream) {
return stream.destroyed || stream[kDestroyed];
}

function isReadable(stream) {
return stream.readable && !stream.readableEnded && !isDestroyed(stream);
}

function isWritable(stream) {
return stream.writable && !stream.writableEnded && !isDestroyed(stream);
}

// Normalize destroy for legacy.
function destroyer(stream, err) {
if (isRequest(stream)) return stream.abort();
if (isRequest(stream.req)) return stream.req.abort();
if (typeof stream.destroy === 'function') return stream.destroy(err);
if (typeof stream.close === 'function') return stream.close();
if (isDestroyed(stream)) {
return;
}

if (!err && (isReadable(stream) || isWritable(stream))) {
err = new AbortError();
}

// TODO: Remove isRequest branches.
if (typeof stream[kDestroy] === 'function') {
stream[kDestroy](err);
} else if (isRequest(stream)) {
stream.abort();
} else if (isRequest(stream.req)) {
stream.req.abort();
} else if (typeof stream.destroy === 'function') {
stream.destroy(err);
} else if (typeof stream.close === 'function') {
// TODO: Don't lose err?
stream.close();
} else if (err) {
process.nextTick(emitErrorCloseLegacy, stream);
} else {
process.nextTick(emitCloseLegacy, stream);
}

if (!stream.destroyed) {
stream[kDestroyed] = true;
}
}

module.exports = {
kDestroy,
isDestroyed,
construct,
destroyer,
destroy,
Expand Down
2 changes: 2 additions & 0 deletions lib/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const {
} = require('internal/util');

const pipeline = require('internal/streams/pipeline');
const { destroyer } = require('internal/streams/destroy');
const eos = require('internal/streams/end-of-stream');
const internalBuffer = require('internal/buffer');

Expand All @@ -45,6 +46,7 @@ Stream.pipeline = pipeline;
const { addAbortSignal } = require('internal/streams/add-abort-signal');
Stream.addAbortSignal = addAbortSignal;
Stream.finished = eos;
Stream.destroy = destroyer;

ObjectDefineProperty(Stream, 'promises', {
configurable: true,
Expand Down
115 changes: 115 additions & 0 deletions test/parallel/test-stream-destroy.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
'use strict';

const common = require('../common');
const {
Writable,
Readable,
destroy
} = require('stream');
const assert = require('assert');
const http = require('http');

{
const r = new Readable({ read() {} });
destroy(r);
assert.strictEqual(r.destroyed, true);
r.on('error', common.mustCall((err) => {
assert.strictEqual(err.name, 'AbortError');
}));
r.on('close', common.mustCall());
}

{
const r = new Readable({ read() {} });
destroy(r, new Error('asd'));
assert.strictEqual(r.destroyed, true);
r.on('error', common.mustCall((err) => {
assert.strictEqual(err.message, 'asd');
}));
r.on('close', common.mustCall());
}

{
const w = new Writable({ write() {} });
destroy(w);
assert.strictEqual(w.destroyed, true);
w.on('error', common.mustCall((err) => {
assert.strictEqual(err.name, 'AbortError');
}));
w.on('close', common.mustCall());
}

{
const w = new Writable({ write() {} });
destroy(w, new Error('asd'));
assert.strictEqual(w.destroyed, true);
w.on('error', common.mustCall((err) => {
assert.strictEqual(err.message, 'asd');
}));
w.on('close', common.mustCall());
}

{
const server = http.createServer((req, res) => {
destroy(req);
req.on('error', common.mustCall((err) => {
assert.strictEqual(err.name, 'AbortError');
}));
req.on('close', common.mustCall(() => {
res.end('hello');
}));
});

server.listen(0, () => {
const req = http.request({
port: server.address().port
});

req.write('asd');
req.on('response', (res) => {
const buf = [];
res.on('data', (data) => buf.push(data));
res.on('end', common.mustCall(() => {
assert.deepStrictEqual(
Buffer.concat(buf),
Buffer.from('hello')
);
server.close();
}));
});
});
}

{
const server = http.createServer((req, res) => {
req
.resume()
.on('end', () => {
destroy(req);
})
.on('error', common.mustNotCall());

req.on('close', common.mustCall(() => {
res.end('hello');
}));
});

server.listen(0, () => {
const req = http.request({
port: server.address().port
});

req.write('asd');
req.on('response', (res) => {
const buf = [];
res.on('data', (data) => buf.push(data));
res.on('end', common.mustCall(() => {
assert.deepStrictEqual(
Buffer.concat(buf),
Buffer.from('hello')
);
server.close();
}));
});
});
}

0 comments on commit f4609bd

Please sign in to comment.