From 15ac134bc2eb06c2e777512fc07b5b0b95c0cb2a Mon Sep 17 00:00:00 2001 From: Jeremy Meng Date: Fri, 26 Feb 2021 19:05:18 +0000 Subject: [PATCH 1/9] [core-http] terminate connection when upload/download stream is closed The issue is that when the stream is closed explicitly by `stream.destroy()` call, we don't ask the underlying transportation layer to cancel the request, thus leaving network connection to service open which could cause resource exhaustion. Addresses #11850 --- sdk/core/core-http/src/fetchHttpClient.ts | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/sdk/core/core-http/src/fetchHttpClient.ts b/sdk/core/core-http/src/fetchHttpClient.ts index 203489f08edc..44790b5b83c6 100644 --- a/sdk/core/core-http/src/fetchHttpClient.ts +++ b/sdk/core/core-http/src/fetchHttpClient.ts @@ -209,11 +209,14 @@ export abstract class FetchHttpClient implements HttpClient { if (httpRequest.abortSignal && abortListener) { let uploadStreamDone = Promise.resolve(); if (isReadableStream(body)) { - uploadStreamDone = isStreamComplete(body); + uploadStreamDone = isStreamComplete(body, abortController); } let downloadStreamDone = Promise.resolve(); if (isReadableStream(operationResponse?.readableStreamBody)) { - downloadStreamDone = isStreamComplete(operationResponse!.readableStreamBody); + downloadStreamDone = isStreamComplete( + operationResponse!.readableStreamBody, + abortController + ); } Promise.all([uploadStreamDone, downloadStreamDone]) @@ -237,9 +240,12 @@ function isReadableStream(body: any): body is Readable { return body && typeof body.pipe === "function"; } -function isStreamComplete(stream: Readable): Promise { +function isStreamComplete(stream: Readable, aborter: AbortController): Promise { return new Promise((resolve) => { - stream.on("close", resolve); + stream.on("close", () => { + aborter.abort(); + resolve(); + }); stream.on("end", resolve); stream.on("error", resolve); }); From 3c4b0d87d396be2a8e6b36cd3d4288efcb0caca6 Mon Sep 17 00:00:00 2001 From: Jeremy Meng Date: Thu, 9 Sep 2021 16:18:45 +0000 Subject: [PATCH 2/9] Don't abort upload stream on stream close event as the uploading might still be in progress. --- sdk/core/core-http/src/fetchHttpClient.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/core/core-http/src/fetchHttpClient.ts b/sdk/core/core-http/src/fetchHttpClient.ts index 44790b5b83c6..04f26dfa5b04 100644 --- a/sdk/core/core-http/src/fetchHttpClient.ts +++ b/sdk/core/core-http/src/fetchHttpClient.ts @@ -209,7 +209,7 @@ export abstract class FetchHttpClient implements HttpClient { if (httpRequest.abortSignal && abortListener) { let uploadStreamDone = Promise.resolve(); if (isReadableStream(body)) { - uploadStreamDone = isStreamComplete(body, abortController); + uploadStreamDone = isStreamComplete(body); } let downloadStreamDone = Promise.resolve(); if (isReadableStream(operationResponse?.readableStreamBody)) { @@ -240,10 +240,10 @@ function isReadableStream(body: any): body is Readable { return body && typeof body.pipe === "function"; } -function isStreamComplete(stream: Readable, aborter: AbortController): Promise { +function isStreamComplete(stream: Readable, aborter?: AbortController): Promise { return new Promise((resolve) => { stream.on("close", () => { - aborter.abort(); + aborter?.abort(); resolve(); }); stream.on("end", resolve); From 507d33fadef58b90e3923b887a8bc14e45887943 Mon Sep 17 00:00:00 2001 From: Jeremy Meng Date: Mon, 13 Sep 2021 23:12:06 +0000 Subject: [PATCH 3/9] use `once` instead of `on` --- sdk/core/core-http/src/fetchHttpClient.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/core/core-http/src/fetchHttpClient.ts b/sdk/core/core-http/src/fetchHttpClient.ts index 04f26dfa5b04..59cce702e44b 100644 --- a/sdk/core/core-http/src/fetchHttpClient.ts +++ b/sdk/core/core-http/src/fetchHttpClient.ts @@ -242,12 +242,12 @@ function isReadableStream(body: any): body is Readable { function isStreamComplete(stream: Readable, aborter?: AbortController): Promise { return new Promise((resolve) => { - stream.on("close", () => { + stream.once("close", () => { aborter?.abort(); resolve(); }); - stream.on("end", resolve); - stream.on("error", resolve); + stream.once("end", resolve); + stream.once("error", resolve); }); } From c5e463c208c2f5a8641ed1d2dd80d22757dcadfa Mon Sep 17 00:00:00 2001 From: Jeremy Meng Date: Tue, 28 Sep 2021 20:57:07 +0000 Subject: [PATCH 4/9] Add a test --- .../test/defaultHttpClientTests.node.ts | 49 ++++++++++++++++++- 1 file changed, 48 insertions(+), 1 deletion(-) diff --git a/sdk/core/core-http/test/defaultHttpClientTests.node.ts b/sdk/core/core-http/test/defaultHttpClientTests.node.ts index f312e53e808d..2362d8d2cbb8 100644 --- a/sdk/core/core-http/test/defaultHttpClientTests.node.ts +++ b/sdk/core/core-http/test/defaultHttpClientTests.node.ts @@ -12,10 +12,11 @@ import { createReadStream, ReadStream } from "fs"; import { DefaultHttpClient } from "../src/defaultHttpClient"; import { WebResource, TransferProgressEvent } from "../src/webResource"; import { getHttpMock, HttpMockFacade } from "./mockHttp"; -import { PassThrough } from "stream"; +import { PassThrough, Readable, pipeline } from "stream"; import { ReportTransform, CommonResponse } from "../src/fetchHttpClient"; import { CompositeMapper, Serializer } from "../src/serializer"; import { OperationSpec } from "../src/operationSpec"; +import { AbortController } from "@azure/abort-controller"; describe("defaultHttpClient (node)", function() { let httpMock: HttpMockFacade; @@ -427,6 +428,52 @@ describe("defaultHttpClient (node)", function() { requestInit2.agent.proxyOptions.proxyAuth ); }); + + it("should abort connection when download stream is closed", async function() { + const payload = new PassThrough(); + const b = new PassThrough(); + b.pipe(payload, { end: false }); + b.write("hello"); + const localPort = 32293; + const errorHandler = function(err: any) { + if (err) { + assert.strictEqual(err.message, "Premature close"); + } + }; + + const localServer = http + .createServer(function(_req, res) { + pipeline(payload, res, errorHandler); + res.writeHead(200, { "Content-Type": "text/html" }); + }) + .listen(localPort); + + httpMock.passThrough(); + const ac = new AbortController(); + const request = new WebResource( + `http://127.0.0.1:${localPort}`, + "GET", + undefined, + undefined, + undefined, + true /* streaming response */ + ); + request.abortSignal = ac.signal; + const httpClient = new DefaultHttpClient(); + + const response = await httpClient.sendRequest(request); + + try { + const stream = response.readableStreamBody as Readable; + stream.destroy(); + } catch (e) { + console.log(e as Error); + } finally { + // Clean up + localServer.close(); + httpMock.teardown(); + } + }); }); describe("ReportTransform", function() { From 3ac9c9c49055301f68bd7edfd89b98f0314f4ad8 Mon Sep 17 00:00:00 2001 From: Jeremy Meng Date: Tue, 28 Sep 2021 20:59:48 +0000 Subject: [PATCH 5/9] Remove catch clause that does nothing --- sdk/core/core-http/test/defaultHttpClientTests.node.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/sdk/core/core-http/test/defaultHttpClientTests.node.ts b/sdk/core/core-http/test/defaultHttpClientTests.node.ts index 2362d8d2cbb8..60e5081f5052 100644 --- a/sdk/core/core-http/test/defaultHttpClientTests.node.ts +++ b/sdk/core/core-http/test/defaultHttpClientTests.node.ts @@ -466,8 +466,6 @@ describe("defaultHttpClient (node)", function() { try { const stream = response.readableStreamBody as Readable; stream.destroy(); - } catch (e) { - console.log(e as Error); } finally { // Clean up localServer.close(); From 9a7979976045f6dff02ef1a44ff5d85b5c94c61f Mon Sep 17 00:00:00 2001 From: Jeremy Meng Date: Fri, 8 Oct 2021 19:23:15 +0000 Subject: [PATCH 6/9] Test using mocked HttpClient.fetch() --- .../test/defaultHttpClientTests.node.ts | 50 +++++++------------ 1 file changed, 17 insertions(+), 33 deletions(-) diff --git a/sdk/core/core-http/test/defaultHttpClientTests.node.ts b/sdk/core/core-http/test/defaultHttpClientTests.node.ts index 60e5081f5052..0ea3a0a27b5f 100644 --- a/sdk/core/core-http/test/defaultHttpClientTests.node.ts +++ b/sdk/core/core-http/test/defaultHttpClientTests.node.ts @@ -12,7 +12,7 @@ import { createReadStream, ReadStream } from "fs"; import { DefaultHttpClient } from "../src/defaultHttpClient"; import { WebResource, TransferProgressEvent } from "../src/webResource"; import { getHttpMock, HttpMockFacade } from "./mockHttp"; -import { PassThrough, Readable, pipeline } from "stream"; +import { PassThrough, Readable } from "stream"; import { ReportTransform, CommonResponse } from "../src/fetchHttpClient"; import { CompositeMapper, Serializer } from "../src/serializer"; import { OperationSpec } from "../src/operationSpec"; @@ -434,43 +434,27 @@ describe("defaultHttpClient (node)", function() { const b = new PassThrough(); b.pipe(payload, { end: false }); b.write("hello"); - const localPort = 32293; - const errorHandler = function(err: any) { - if (err) { - assert.strictEqual(err.message, "Premature close"); - } - }; + const response = { + status: 200, + headers: [], + body: payload + } - const localServer = http - .createServer(function(_req, res) { - pipeline(payload, res, errorHandler); - res.writeHead(200, { "Content-Type": "text/html" }); - }) - .listen(localPort); + const client = new DefaultHttpClient(); + sinon.stub(client, "fetch").callsFake(async (_input, _init) => { + return (response as unknown) as CommonResponse; + }); - httpMock.passThrough(); const ac = new AbortController(); - const request = new WebResource( - `http://127.0.0.1:${localPort}`, - "GET", - undefined, - undefined, - undefined, - true /* streaming response */ - ); + const request = new WebResource("http://myhost/bigdownload", "GET", undefined, undefined, undefined, true); request.abortSignal = ac.signal; - const httpClient = new DefaultHttpClient(); - - const response = await httpClient.sendRequest(request); + const promise = client.sendRequest(request); - try { - const stream = response.readableStreamBody as Readable; - stream.destroy(); - } finally { - // Clean up - localServer.close(); - httpMock.teardown(); - } + const res = await promise; + assert.ok(res.readableStreamBody, "Expecting valid download stream"); + console.log("destroying download stream..."); + const stream: Readable = res.readableStreamBody as any; + stream.destroy(); }); }); From d9935b47f3a5358d56fa94b09137933f59d206bb Mon Sep 17 00:00:00 2001 From: Jeremy Meng Date: Fri, 8 Oct 2021 19:48:43 +0000 Subject: [PATCH 7/9] formatting --- .../core-http/test/defaultHttpClientTests.node.ts | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/sdk/core/core-http/test/defaultHttpClientTests.node.ts b/sdk/core/core-http/test/defaultHttpClientTests.node.ts index 0ea3a0a27b5f..d1da5ce7e538 100644 --- a/sdk/core/core-http/test/defaultHttpClientTests.node.ts +++ b/sdk/core/core-http/test/defaultHttpClientTests.node.ts @@ -438,7 +438,7 @@ describe("defaultHttpClient (node)", function() { status: 200, headers: [], body: payload - } + }; const client = new DefaultHttpClient(); sinon.stub(client, "fetch").callsFake(async (_input, _init) => { @@ -446,7 +446,14 @@ describe("defaultHttpClient (node)", function() { }); const ac = new AbortController(); - const request = new WebResource("http://myhost/bigdownload", "GET", undefined, undefined, undefined, true); + const request = new WebResource( + "http://myhost/bigdownload", + "GET", + undefined, + undefined, + undefined, + true + ); request.abortSignal = ac.signal; const promise = client.sendRequest(request); From 63a87d5d291c20f8291430ef28d6c1d85615cf10 Mon Sep 17 00:00:00 2001 From: Jeremy Meng Date: Fri, 8 Oct 2021 20:11:42 +0000 Subject: [PATCH 8/9] Remove console.log call --- sdk/core/core-http/test/defaultHttpClientTests.node.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/sdk/core/core-http/test/defaultHttpClientTests.node.ts b/sdk/core/core-http/test/defaultHttpClientTests.node.ts index d1da5ce7e538..b2cff2fe466b 100644 --- a/sdk/core/core-http/test/defaultHttpClientTests.node.ts +++ b/sdk/core/core-http/test/defaultHttpClientTests.node.ts @@ -459,7 +459,6 @@ describe("defaultHttpClient (node)", function() { const res = await promise; assert.ok(res.readableStreamBody, "Expecting valid download stream"); - console.log("destroying download stream..."); const stream: Readable = res.readableStreamBody as any; stream.destroy(); }); From 97e4642bece0f10d9e84a003841e85a4c3df76e5 Mon Sep 17 00:00:00 2001 From: Jeremy Meng Date: Fri, 8 Oct 2021 23:19:09 +0000 Subject: [PATCH 9/9] Verify that abort event is fired --- .../core-http/test/defaultHttpClientTests.node.ts | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/sdk/core/core-http/test/defaultHttpClientTests.node.ts b/sdk/core/core-http/test/defaultHttpClientTests.node.ts index b2cff2fe466b..ae635d825982 100644 --- a/sdk/core/core-http/test/defaultHttpClientTests.node.ts +++ b/sdk/core/core-http/test/defaultHttpClientTests.node.ts @@ -440,8 +440,11 @@ describe("defaultHttpClient (node)", function() { body: payload }; + let signal: AbortSignal | undefined; const client = new DefaultHttpClient(); - sinon.stub(client, "fetch").callsFake(async (_input, _init) => { + sinon.stub(client, "fetch").callsFake(async (_input, init) => { + assert.ok(init, "expecting valid request initialization"); + signal = init!.signal; return (response as unknown) as CommonResponse; }); @@ -459,8 +462,16 @@ describe("defaultHttpClient (node)", function() { const res = await promise; assert.ok(res.readableStreamBody, "Expecting valid download stream"); + + assert.ok(signal, "Expecting valid signal"); + const abortFiredPromise = new Promise((resolve) => { + signal!.onabort = () => { + resolve(); + }; + }); const stream: Readable = res.readableStreamBody as any; stream.destroy(); + await abortFiredPromise; // 'abort' event fired }); });