Skip to content

Commit

Permalink
[core-http] terminate connection when download stream is closed (#14015)
Browse files Browse the repository at this point in the history
The issue is that when the stream is closed explicitly by `stream.destroy()`
call, we don't ask the underlying transportation layer to cancel the request,
thus leaving network connection to service open which could cause resource
exhaustion.

Addresses #11850
  • Loading branch information
jeremymeng authored Oct 11, 2021
1 parent b92ae58 commit e5a2bf2
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 6 deletions.
16 changes: 11 additions & 5 deletions sdk/core/core-http/src/fetchHttpClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,10 @@ export abstract class FetchHttpClient implements HttpClient {
}
let downloadStreamDone = Promise.resolve();
if (isReadableStream(operationResponse?.readableStreamBody)) {
downloadStreamDone = isStreamComplete(operationResponse!.readableStreamBody);
downloadStreamDone = isStreamComplete(
operationResponse!.readableStreamBody,
abortController
);
}

Promise.all([uploadStreamDone, downloadStreamDone])
Expand All @@ -237,11 +240,14 @@ function isReadableStream(body: any): body is Readable {
return body && typeof body.pipe === "function";
}

function isStreamComplete(stream: Readable): Promise<void> {
function isStreamComplete(stream: Readable, aborter?: AbortController): Promise<void> {
return new Promise((resolve) => {
stream.on("close", resolve);
stream.on("end", resolve);
stream.on("error", resolve);
stream.once("close", () => {
aborter?.abort();
resolve();
});
stream.once("end", resolve);
stream.once("error", resolve);
});
}

Expand Down
48 changes: 47 additions & 1 deletion sdk/core/core-http/test/defaultHttpClientTests.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ import { createReadStream, ReadStream } from "fs";
import { DefaultHttpClient } from "../src/defaultHttpClient";
import { WebResource, TransferProgressEvent } from "../src/webResource";
import { getHttpMock, HttpMockFacade } from "./mockHttp";
import { PassThrough } from "stream";
import { PassThrough, Readable } from "stream";
import { ReportTransform, CommonResponse } from "../src/fetchHttpClient";
import { CompositeMapper, Serializer } from "../src/serializer";
import { OperationSpec } from "../src/operationSpec";
import { AbortController } from "@azure/abort-controller";

describe("defaultHttpClient (node)", function() {
let httpMock: HttpMockFacade;
Expand Down Expand Up @@ -427,6 +428,51 @@ describe("defaultHttpClient (node)", function() {
requestInit2.agent.proxyOptions.proxyAuth
);
});

it("should abort connection when download stream is closed", async function() {
const payload = new PassThrough();
const b = new PassThrough();
b.pipe(payload, { end: false });
b.write("hello");
const response = {
status: 200,
headers: [],
body: payload
};

let signal: AbortSignal | undefined;
const client = new DefaultHttpClient();
sinon.stub(client, "fetch").callsFake(async (_input, init) => {
assert.ok(init, "expecting valid request initialization");
signal = init!.signal;
return (response as unknown) as CommonResponse;
});

const ac = new AbortController();
const request = new WebResource(
"http://myhost/bigdownload",
"GET",
undefined,
undefined,
undefined,
true
);
request.abortSignal = ac.signal;
const promise = client.sendRequest(request);

const res = await promise;
assert.ok(res.readableStreamBody, "Expecting valid download stream");

assert.ok(signal, "Expecting valid signal");
const abortFiredPromise = new Promise<void>((resolve) => {
signal!.onabort = () => {
resolve();
};
});
const stream: Readable = res.readableStreamBody as any;
stream.destroy();
await abortFiredPromise; // 'abort' event fired
});
});

describe("ReportTransform", function() {
Expand Down

0 comments on commit e5a2bf2

Please sign in to comment.