Skip to content

Commit

Permalink
[storage] wrapper stream destroy source when needed (Azure#12899)
Browse files Browse the repository at this point in the history
* RetriableReadableStream release source

* _destroy callback with error

* fix tests as in Node 8 "close" isn't emitted after calling readable.destroy()

* retry on "error" and do not listen on "close"

* remove event listener when needed

* remove abortSignal support in RetriableReadableStream as it's handled by core-http

* nit: increase timeout for syncUploadFromURL large content case

* nit: use parallel upload and increase timeout for syncUploadFromURL large content case

* changelog
  • Loading branch information
ljian3377 authored Jan 28, 2021
1 parent 06be1a1 commit 3468baa
Show file tree
Hide file tree
Showing 7 changed files with 257 additions and 102 deletions.
1 change: 1 addition & 0 deletions sdk/storage/storage-blob/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## 12.4.1 (Unreleased)

- Fixed a compile failure due to "Can't resolve 'crypto'" in Angular. [Issue #13267](https://github.com/Azure/azure-sdk-for-js/issues/13267).
- Fixed an issue that the download stream returned by `BlobClient.download` won't release underlying resources unless it's fully consumed. [Issue #11850](https://github.com/Azure/azure-sdk-for-js/issues/11850).
- Fixed an error when listing blob with a metadata key of `_`. [issue #9171](https://github.com/Azure/azure-sdk-for-js/issues/9171)
- The `"Unclosed root tag"` XML parser error is now retriable. [PR #13076](https://github.com/Azure/azure-sdk-for-js/pull/13076).

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion sdk/storage/storage-blob/src/Clients.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1543,7 +1543,6 @@ export class BlobClient extends StorageClient {
offset,
res.contentLength!,
{
abortSignal: options.abortSignal,
maxRetryRequests: options.maxRetryRequests,
onProgress: options.onProgress
}
Expand Down
184 changes: 89 additions & 95 deletions sdk/storage/storage-blob/src/utils/RetriableReadableStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,9 @@
import { TransferProgressEvent } from "@azure/core-http";
import { Readable } from "stream";

import { AbortSignal, AbortSignalLike, AbortError } from "@azure/abort-controller";

export type ReadableStreamGetter = (offset: number) => Promise<NodeJS.ReadableStream>;

export interface RetriableReadableStreamOptions {
/**
* An implementation of the `AbortSignalLike` interface to signal the request to cancel the operation.
* For example, use the &commat;azure/abort-controller to create an `AbortSignal`.
*
* @type {AbortSignalLike}
* @memberof RetriableReadableStreamOptions
*/
abortSignal?: AbortSignalLike;

/**
* Max retry count (>=0), undefined or invalid value means no retry
*
Expand Down Expand Up @@ -46,9 +35,15 @@ export interface RetriableReadableStreamOptions {
* @memberof RetriableReadableStreamOptions
*/
doInjectErrorOnce?: boolean;
}

const ABORT_ERROR = new AbortError("The operation was aborted.");
/**
* A threshold, not a limit. Dictates the amount of data that a stream buffers before it stops asking for more data.
*
* @type {number}
* @memberof RetriableReadableStreamOptions
*/
highWaterMark?: number;
}

/**
* ONLY AVAILABLE IN NODE.JS RUNTIME.
Expand All @@ -59,7 +54,6 @@ const ABORT_ERROR = new AbortError("The operation was aborted.");
* @extends {Readable}
*/
export class RetriableReadableStream extends Readable {
private aborter: AbortSignalLike;
private start: number;
private offset: number;
private end: number;
Expand All @@ -69,10 +63,6 @@ export class RetriableReadableStream extends Readable {
private maxRetryRequests: number;
private onProgress?: (progress: TransferProgressEvent) => void;
private options: RetriableReadableStreamOptions;
private abortHandler = () => {
this.source.pause();
this.emit("error", ABORT_ERROR);
};

/**
* Creates an instance of RetriableReadableStream.
Expand All @@ -92,8 +82,7 @@ export class RetriableReadableStream extends Readable {
count: number,
options: RetriableReadableStreamOptions = {}
) {
super();
this.aborter = options.abortSignal || AbortSignal.none;
super({ highWaterMark: options.highWaterMark });
this.getter = getter;
this.source = source;
this.start = offset;
Expand All @@ -104,96 +93,101 @@ export class RetriableReadableStream extends Readable {
this.onProgress = options.onProgress;
this.options = options;

this.aborter.addEventListener("abort", this.abortHandler);

this.setSourceDataHandler();
this.setSourceEndHandler();
this.setSourceErrorHandler();
this.setSourceEventHandlers();
}

public _read() {
if (!this.aborter.aborted) {
this.source.resume();
}
this.source.resume();
}

private setSourceDataHandler() {
this.source.on("data", (data: Buffer) => {
if (this.options.doInjectErrorOnce) {
this.options.doInjectErrorOnce = undefined;
this.source.pause();
this.source.removeAllListeners("data");
this.source.emit("end");
return;
}
private setSourceEventHandlers() {
  // Register the persistent source-stream listeners. The handlers are stored
  // as instance properties so removeSourceEventHandlers() can later detach
  // exactly the same function references.
  const source = this.source;
  source.on("data", this.sourceDataHandler);
  source.on("end", this.sourceErrorOrEndHandler);
  source.on("error", this.sourceErrorOrEndHandler);
}

// console.log(
// `Offset: ${this.offset}, Received ${data.length} from internal stream`
// );
this.offset += data.length;
if (this.onProgress) {
this.onProgress({ loadedBytes: this.offset - this.start });
}
if (!this.push(data)) {
this.source.pause();
}
});
private removeSourceEventHandlers() {
  // Detach the listeners added by setSourceEventHandlers so the underlying
  // stream can be swapped (retry) or released without leaking handlers.
  const source = this.source;
  source.removeListener("data", this.sourceDataHandler);
  source.removeListener("end", this.sourceErrorOrEndHandler);
  source.removeListener("error", this.sourceErrorOrEndHandler);
}

private setSourceEndHandler() {
this.source.on("end", () => {
// Forwards each chunk from the underlying source stream into this Readable,
// tracking the absolute offset so a retry can resume at the right position.
private sourceDataHandler = (data: Buffer) => {
// Test hook: simulate a premature "end" from the service exactly once so the
// retry path can be exercised. removeAllListeners("data") detaches this
// handler before the synthetic "end" is emitted.
if (this.options.doInjectErrorOnce) {
this.options.doInjectErrorOnce = undefined;
this.source.pause();
this.source.removeAllListeners("data");
this.source.emit("end");
return;
}

// console.log(
// `Offset: ${this.offset}, Received ${data.length} from internal stream`
// );
// Advance the running offset and report progress relative to the start.
this.offset += data.length;
if (this.onProgress) {
this.onProgress({ loadedBytes: this.offset - this.start });
}
// Respect backpressure: push() returning false means the internal buffer is
// full, so pause the source until _read() resumes it.
if (!this.push(data)) {
this.source.pause();
}
};

private sourceErrorOrEndHandler = (err?: Error) => {
if (err && err.name === "AbortError") {
this.destroy(err);
return;
}

// console.log(
// `Source stream emits end or error, offset: ${
// this.offset
// }, dest end : ${this.end}`
// );
this.removeSourceEventHandlers();
if (this.offset - 1 === this.end) {
this.push(null);
} else if (this.offset <= this.end) {
// console.log(
// `Source stream emits end, offset: ${
// this.offset
// }, dest end : ${this.end}`
// `retries: ${this.retries}, max retries: ${this.maxRetries}`
// );
if (this.offset - 1 === this.end) {
this.aborter.removeEventListener("abort", this.abortHandler);
this.push(null);
} else if (this.offset <= this.end) {
// console.log(
// `retries: ${this.retries}, max retries: ${this.maxRetries}`
// );
if (this.retries < this.maxRetryRequests) {
this.retries += 1;
this.getter(this.offset)
.then((newSource) => {
this.source = newSource;
this.setSourceDataHandler();
this.setSourceEndHandler();
this.setSourceErrorHandler();
})
.catch((error) => {
this.emit("error", error);
});
} else {
this.emit(
"error",
new Error(
// tslint:disable-next-line:max-line-length
`Data corruption failure: received less data than required and reached maxRetires limitation. Received data offset: ${this
.offset - 1}, data needed offset: ${this.end}, retries: ${
this.retries
}, max retries: ${this.maxRetryRequests}`
)
);
}
if (this.retries < this.maxRetryRequests) {
this.retries += 1;
this.getter(this.offset)
.then((newSource) => {
this.source = newSource;
this.setSourceEventHandlers();
})
.catch((error) => {
this.destroy(error);
});
} else {
this.emit(
"error",
this.destroy(
new Error(
`Data corruption failure: Received more data than original request, data needed offset is ${
this.end
}, received offset: ${this.offset - 1}`
// tslint:disable-next-line:max-line-length
`Data corruption failure: received less data than required and reached maxRetires limitation. Received data offset: ${this
.offset - 1}, data needed offset: ${this.end}, retries: ${
this.retries
}, max retries: ${this.maxRetryRequests}`
)
);
}
});
}
} else {
this.destroy(
new Error(
`Data corruption failure: Received more data than original request, data needed offset is ${
this.end
}, received offset: ${this.offset - 1}`
)
);
}
};

_destroy(error: Error | null, callback: (error?: Error) => void): void {
// remove listener from source and release source
this.removeSourceEventHandlers();
(this.source as Readable).destroy();

private setSourceErrorHandler() {
this.source.on("error", (error) => {
this.emit("error", error);
});
callback(error === null ? undefined : error);
}
}
11 changes: 7 additions & 4 deletions sdk/storage/storage-blob/test/node/blockblobclient.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -376,23 +376,26 @@ describe("syncUploadFromURL", () => {
undefined,
"recording file too large, exceeds GitHub's file size limit of 100.00 MB"
);
await sourceBlob.upload(largeContent, largeContent.byteLength);
await sourceBlob.uploadData(largeContent);
await blockBlobClient.syncUploadFromURL(sourceBlobURLWithSAS);
});
}).timeout(10 * 60 * 1000);

it("large content with timeout", async () => {
recorder.skip(
undefined,
"recording file too large, exceeds GitHub's file size limit of 100.00 MB"
);
await sourceBlob.upload(largeContent, largeContent.byteLength);
await sourceBlob.uploadData(largeContent);

let exceptionCaught = false;
try {
await blockBlobClient.syncUploadFromURL(sourceBlobURLWithSAS, {
timeoutInSeconds: 1
});
} catch (err) {
assert.deepStrictEqual(err.code, "OperationTimedOut");
exceptionCaught = true;
}
});
assert.ok(exceptionCaught);
}).timeout(10 * 60 * 1000);
});
22 changes: 22 additions & 0 deletions sdk/storage/storage-blob/test/node/highlevel.node.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { readStreamToLocalFileWithLogs } from "../utils/testutils.node";
import { BLOCK_BLOB_MAX_STAGE_BLOCK_BYTES } from "../../src/utils/constants";
import { Test_CPK_INFO } from "../utils/constants";
import { streamToBuffer2 } from "../../src/utils/utils.node";
import { delay } from "../../src/utils/utils.common";

// tslint:disable:no-empty
describe("Highlevel", () => {
Expand Down Expand Up @@ -706,6 +707,27 @@ describe("Highlevel", () => {
fs.unlinkSync(downloadedFile);
});

// Regression test: aborting mid-download must surface an AbortError on the
// returned stream instead of leaving the body request hanging.
it("download abort should work when still fetching body", async () => {
recorder.skip("node", "Temp file - recorder doesn't support saving the file");
await blockBlobClient.uploadFile(tempFileSmall, {
blockSize: 4 * 1024 * 1024,
concurrency: 20
});

// Start the download, then abort while the body is still being fetched.
const aborter = new AbortController();
const res = await blobClient.download(0, undefined, { abortSignal: aborter.signal });

let exceptionCaught = false;
res.readableStreamBody!.on("error", (err) => {
assert.equal(err.name, "AbortError");
exceptionCaught = true;
});

aborter.abort();
// NOTE(review): the fixed 10 ms delay assumes the "error" event fires promptly
// after abort; an event-based wait would be more robust — confirm on CI.
await delay(10);
assert.ok(exceptionCaught);
});

it("downloadToFile should success", async () => {
recorder.skip("node", "Temp file - recorder doesn't support saving the file");
const downloadedFilePath = recorder.getUniqueName("downloadedtofile.");
Expand Down
Loading

0 comments on commit 3468baa

Please sign in to comment.