Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core-http] terminate connection when download stream is closed #14015

Merged
merged 11 commits into from
Oct 11, 2021
16 changes: 11 additions & 5 deletions sdk/core/core-http/src/fetchHttpClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,10 @@ export abstract class FetchHttpClient implements HttpClient {
}
let downloadStreamDone = Promise.resolve();
if (isReadableStream(operationResponse?.readableStreamBody)) {
downloadStreamDone = isStreamComplete(operationResponse!.readableStreamBody);
downloadStreamDone = isStreamComplete(
operationResponse!.readableStreamBody,
abortController
);
}

Promise.all([uploadStreamDone, downloadStreamDone])
Expand All @@ -237,11 +240,14 @@ function isReadableStream(body: any): body is Readable {
return body && typeof body.pipe === "function";
}

function isStreamComplete(stream: Readable): Promise<void> {
function isStreamComplete(stream: Readable, aborter?: AbortController): Promise<void> {
return new Promise((resolve) => {
stream.on("close", resolve);
stream.on("end", resolve);
stream.on("error", resolve);
stream.once("close", () => {
aborter?.abort();
resolve();
});
stream.once("end", resolve);
stream.once("error", resolve);
});
}

Expand Down
48 changes: 47 additions & 1 deletion sdk/core/core-http/test/defaultHttpClientTests.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ import { createReadStream, ReadStream } from "fs";
import { DefaultHttpClient } from "../src/defaultHttpClient";
import { WebResource, TransferProgressEvent } from "../src/webResource";
import { getHttpMock, HttpMockFacade } from "./mockHttp";
import { PassThrough } from "stream";
import { PassThrough, Readable } from "stream";
import { ReportTransform, CommonResponse } from "../src/fetchHttpClient";
import { CompositeMapper, Serializer } from "../src/serializer";
import { OperationSpec } from "../src/operationSpec";
import { AbortController } from "@azure/abort-controller";

describe("defaultHttpClient (node)", function() {
let httpMock: HttpMockFacade;
Expand Down Expand Up @@ -427,6 +428,51 @@ describe("defaultHttpClient (node)", function() {
requestInit2.agent.proxyOptions.proxyAuth
);
});

it("should abort connection when download stream is closed", async function() {
const payload = new PassThrough();
const b = new PassThrough();
b.pipe(payload, { end: false });
b.write("hello");
const response = {
status: 200,
headers: [],
body: payload
};

let signal: AbortSignal | undefined;
const client = new DefaultHttpClient();
sinon.stub(client, "fetch").callsFake(async (_input, init) => {
assert.ok(init, "expecting valid request initialization");
signal = init!.signal;
return (response as unknown) as CommonResponse;
});

const ac = new AbortController();
const request = new WebResource(
"http://myhost/bigdownload",
"GET",
undefined,
undefined,
undefined,
true
);
request.abortSignal = ac.signal;
const promise = client.sendRequest(request);

const res = await promise;
assert.ok(res.readableStreamBody, "Expecting valid download stream");

assert.ok(signal, "Expecting valid signal");
const abortFiredPromise = new Promise<void>((resolve) => {
signal!.onabort = () => {
resolve();
};
});
const stream: Readable = res.readableStreamBody as any;
stream.destroy();
await abortFiredPromise; // 'abort' event fired
});
});

describe("ReportTransform", function() {
Expand Down