From 031fdceba16d194cb0ccf243a9a74d811a29ca11 Mon Sep 17 00:00:00 2001 From: Lin Jian <1215122919@qq.com> Date: Sun, 7 Jun 2020 14:27:04 +0800 Subject: [PATCH] [storage] avro parser fix (#9154) * fix * resolve PR comments: remove useless assignment * AvroReadableFromStream: removeListener when read succeeds * remove listener when error callback triggered Co-authored-by: Lin Jian --- .../storage-internal-avro/package.json | 4 +- .../storage-internal-avro/src/AvroParser.ts | 42 ++++++++++--------- .../src/AvroReadableFromStream.ts | 30 +++++++++---- 3 files changed, 45 insertions(+), 31 deletions(-) diff --git a/sdk/storage/storage-internal-avro/package.json b/sdk/storage/storage-internal-avro/package.json index 9e628455afe0..5ac366124c24 100644 --- a/sdk/storage/storage-internal-avro/package.json +++ b/sdk/storage/storage-internal-avro/package.json @@ -3,7 +3,7 @@ "sideEffect": false, "private": true, "author": "Microsoft Corporation", - "version": "1.0.0-preview.1", + "version": "1.0.0", "description": "internal avro parser", "license": "MIT", "repository": "github:Azure/azure-sdk-for-js", @@ -68,7 +68,7 @@ "puppeteer": "^2.0.0", "rimraf": "^3.0.0", "rollup": "^1.16.3", - "@rollup/plugin-commonjs": "^11.0.1", + "@rollup/plugin-commonjs": "11.0.2", "@rollup/plugin-node-resolve": "^7.0.0", "rollup-plugin-shim": "^1.0.0", "rollup-plugin-sourcemaps": "^0.4.2", diff --git a/sdk/storage/storage-internal-avro/src/AvroParser.ts b/sdk/storage/storage-internal-avro/src/AvroParser.ts index b63a347b4375..4bb310068953 100644 --- a/sdk/storage/storage-internal-avro/src/AvroParser.ts +++ b/sdk/storage/storage-internal-avro/src/AvroParser.ts @@ -28,32 +28,34 @@ export class AvroParser { return buf[0]; } + // int and long are stored in variable-length zig-zag coding. 
+ // variable-length: https://lucene.apache.org/core/3_5_0/fileformats.html#VInt + // zig-zag: https://developers.google.com/protocol-buffers/docs/encoding?csw=1#types private static async readZigZagLong(stream: AvroReadable): Promise<number> { - // copied from https://github.com/apache/avro/blob/master/lang/js/lib/utils.js#L321 - let n = 0; - let k = 0; - let b, h, f, fk; + let zigZagEncoded = 0; + let significanceInBit = 0; + let byte, haveMoreByte, significanceInFloat; do { - b = await AvroParser.readByte(stream); - h = b & 0x80; - n |= (b & 0x7f) << k; - k += 7; - } while (h && k < 28); - - if (h) { - // Switch to float arithmetic, otherwise we might overflow. - f = n; - fk = 268435456; // 2 ** 28. + byte = await AvroParser.readByte(stream); + haveMoreByte = byte & 0x80; + zigZagEncoded |= (byte & 0x7f) << significanceInBit; + significanceInBit += 7; + } while (haveMoreByte && significanceInBit < 28); // bitwise operation only works for 32-bit integers + + if (haveMoreByte) { + // Switch to float arithmetic + // FIXME: this only works when zigZagEncoded is no more than Number.MAX_SAFE_INTEGER (2**53 - 1) + significanceInFloat = 268435456; // 2 ** 28. do { - b = await AvroParser.readByte(stream); - f += (b & 0x7f) * fk; - fk *= 128; - } while (b & 0x80); - return (f % 2 ? -(f + 1) : f) / 2; + byte = await AvroParser.readByte(stream); + zigZagEncoded += (byte & 0x7f) * significanceInFloat; + significanceInFloat *= 128; // 2 ** 7 + } while (byte & 0x80); + return (zigZagEncoded % 2 ? 
-(zigZagEncoded + 1) : zigZagEncoded) / 2; } - return (n >> 1) ^ -(n & 1); + return (zigZagEncoded >> 1) ^ -(zigZagEncoded & 1); } public static async readLong(stream: AvroReadable): Promise { diff --git a/sdk/storage/storage-internal-avro/src/AvroReadableFromStream.ts b/sdk/storage/storage-internal-avro/src/AvroReadableFromStream.ts index 62b9b81e3d15..ab8dfea45a49 100644 --- a/sdk/storage/storage-internal-avro/src/AvroReadableFromStream.ts +++ b/sdk/storage/storage-internal-avro/src/AvroReadableFromStream.ts @@ -16,7 +16,7 @@ export class AvroReadableFromStream extends AvroReadable { super(); this._readable = readable; this._position = 0; - // workaround due to Readable.readable only availabe after Node.js v11.4 + // workaround due to Readable.readable only available after Node.js v11.4 // this._stillReadable = true; // this._readable.on("end", () => { // this._stillReadable = false; @@ -46,24 +46,36 @@ export class AvroReadableFromStream extends AvroReadable { let chunk = this._readable.read(size); if (chunk) { this._position += chunk.length; - // chunk.lenght maybe less than desired size if the stream ends. + // chunk.length maybe less than desired size if the stream ends. return this.toUint8Array(chunk); } else { // register callback to wait for enough data to read return new Promise((resolve, reject) => { - const callback = () => { + const readableCallback = () => { let chunk = this._readable.read(size); if (chunk) { this._position += chunk.length; - // chunk.lenght maybe less than desired size if the stream ends. + // chunk.length maybe less than desired size if the stream ends. 
resolve(this.toUint8Array(chunk)); - this._readable.removeListener("readable", callback); + this._readable.removeListener("readable", readableCallback); + this._readable.removeListener("error", reject); + this._readable.removeListener("end", reject); + this._readable.removeListener("close", reject); } }; - this._readable.on("readable", callback); - this._readable.once("error", reject); - this._readable.once("end", reject); - this._readable.once("close", reject); + + const rejectCallback = () => { + this._readable.removeListener("readable", readableCallback); + this._readable.removeListener("error", rejectCallback); + this._readable.removeListener("end", rejectCallback); + this._readable.removeListener("close", rejectCallback); + reject(); + }; + + this._readable.on("readable", readableCallback); + this._readable.once("error", rejectCallback); + this._readable.once("end", rejectCallback); + this._readable.once("close", rejectCallback); }); } }