Skip to content

Commit

Permalink
[storage] avro parser fix (#9154)
Browse files Browse the repository at this point in the history
* fix

* resolve PR comments: remove useless assignment

* AvroReadableFromStream: removeListener when read succeeds

* remove listener when error callback triggered

Co-authored-by: Lin Jian <[email protected]>
  • Loading branch information
ljian3377 and Lin Jian authored Jun 7, 2020
1 parent d404205 commit 031fdce
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 31 deletions.
4 changes: 2 additions & 2 deletions sdk/storage/storage-internal-avro/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"sideEffect": false,
"private": true,
"author": "Microsoft Corporation",
"version": "1.0.0-preview.1",
"version": "1.0.0",
"description": "internal avro parser",
"license": "MIT",
"repository": "github:Azure/azure-sdk-for-js",
Expand Down Expand Up @@ -68,7 +68,7 @@
"puppeteer": "^2.0.0",
"rimraf": "^3.0.0",
"rollup": "^1.16.3",
"@rollup/plugin-commonjs": "^11.0.1",
"@rollup/plugin-commonjs": "11.0.2",
"@rollup/plugin-node-resolve": "^7.0.0",
"rollup-plugin-shim": "^1.0.0",
"rollup-plugin-sourcemaps": "^0.4.2",
Expand Down
42 changes: 22 additions & 20 deletions sdk/storage/storage-internal-avro/src/AvroParser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,32 +28,34 @@ export class AvroParser {
return buf[0];
}

// int and long are stored in variable-length zig-zag coding.
// variable-length: https://lucene.apache.org/core/3_5_0/fileformats.html#VInt
// zig-zag: https://developers.google.com/protocol-buffers/docs/encoding?csw=1#types
private static async readZigZagLong(stream: AvroReadable): Promise<number> {
// copied from https://github.com/apache/avro/blob/master/lang/js/lib/utils.js#L321
let n = 0;
let k = 0;
let b, h, f, fk;
let zigZagEncoded = 0;
let significanceInBit = 0;
let byte, haveMoreByte, significanceInFloat;

do {
b = await AvroParser.readByte(stream);
h = b & 0x80;
n |= (b & 0x7f) << k;
k += 7;
} while (h && k < 28);

if (h) {
// Switch to float arithmetic, otherwise we might overflow.
f = n;
fk = 268435456; // 2 ** 28.
byte = await AvroParser.readByte(stream);
haveMoreByte = byte & 0x80;
zigZagEncoded |= (byte & 0x7f) << significanceInBit;
significanceInBit += 7;
} while (haveMoreByte && significanceInBit < 28); // bitwise operation only works for 32-bit integers

if (haveMoreByte) {
// Switch to float arithmetic
// FIXME: this only works when zigZagEncoded is no more than Number.MAX_SAFE_INTEGER (2**53 - 1)
significanceInFloat = 268435456; // 2 ** 28.
do {
b = await AvroParser.readByte(stream);
f += (b & 0x7f) * fk;
fk *= 128;
} while (b & 0x80);
return (f % 2 ? -(f + 1) : f) / 2;
byte = await AvroParser.readByte(stream);
zigZagEncoded += (byte & 0x7f) * significanceInFloat;
significanceInFloat *= 128; // 2 ** 7
} while (byte & 0x80);
return (zigZagEncoded % 2 ? -(zigZagEncoded + 1) : zigZagEncoded) / 2;
}

return (n >> 1) ^ -(n & 1);
return (zigZagEncoded >> 1) ^ -(zigZagEncoded & 1);
}

public static async readLong(stream: AvroReadable): Promise<number> {
Expand Down
30 changes: 21 additions & 9 deletions sdk/storage/storage-internal-avro/src/AvroReadableFromStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export class AvroReadableFromStream extends AvroReadable {
super();
this._readable = readable;
this._position = 0;
// workaround due to Readable.readable only availabe after Node.js v11.4
// workaround due to Readable.readable only available after Node.js v11.4
// this._stillReadable = true;
// this._readable.on("end", () => {
// this._stillReadable = false;
Expand Down Expand Up @@ -46,24 +46,36 @@ export class AvroReadableFromStream extends AvroReadable {
let chunk = this._readable.read(size);
if (chunk) {
this._position += chunk.length;
// chunk.lenght maybe less than desired size if the stream ends.
// chunk.length maybe less than desired size if the stream ends.
return this.toUint8Array(chunk);
} else {
// register callback to wait for enough data to read
return new Promise((resolve, reject) => {
const callback = () => {
const readableCallback = () => {
let chunk = this._readable.read(size);
if (chunk) {
this._position += chunk.length;
// chunk.lenght maybe less than desired size if the stream ends.
// chunk.length maybe less than desired size if the stream ends.
resolve(this.toUint8Array(chunk));
this._readable.removeListener("readable", callback);
this._readable.removeListener("readable", readableCallback);
this._readable.removeListener("error", reject);
this._readable.removeListener("end", reject);
this._readable.removeListener("close", reject);
}
};
this._readable.on("readable", callback);
this._readable.once("error", reject);
this._readable.once("end", reject);
this._readable.once("close", reject);

const rejectCallback = () => {
this._readable.removeListener("readable", readableCallback);
this._readable.removeListener("error", rejectCallback);
this._readable.removeListener("end", rejectCallback);
this._readable.removeListener("close", rejectCallback);
reject();
};

this._readable.on("readable", readableCallback);
this._readable.once("error", rejectCallback);
this._readable.once("end", rejectCallback);
this._readable.once("close", rejectCallback);
});
}
}
Expand Down

0 comments on commit 031fdce

Please sign in to comment.