Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[storage] avro parser fix #9154

Merged
merged 4 commits into from
Jun 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions sdk/storage/storage-internal-avro/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"sideEffect": false,
"private": true,
"author": "Microsoft Corporation",
"version": "1.0.0-preview.1",
"version": "1.0.0",
"description": "internal avro parser",
"license": "MIT",
"repository": "github:Azure/azure-sdk-for-js",
Expand Down Expand Up @@ -68,7 +68,7 @@
"puppeteer": "^2.0.0",
"rimraf": "^3.0.0",
"rollup": "^1.16.3",
"@rollup/plugin-commonjs": "^11.0.1",
"@rollup/plugin-commonjs": "11.0.2",
jeremymeng marked this conversation as resolved.
Show resolved Hide resolved
"@rollup/plugin-node-resolve": "^7.0.0",
"rollup-plugin-shim": "^1.0.0",
"rollup-plugin-sourcemaps": "^0.4.2",
Expand Down
42 changes: 22 additions & 20 deletions sdk/storage/storage-internal-avro/src/AvroParser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,32 +28,34 @@ export class AvroParser {
return buf[0];
}

// int and long are stored in variable-length zig-zag coding.
// variable-length: https://lucene.apache.org/core/3_5_0/fileformats.html#VInt
// zig-zag: https://developers.google.com/protocol-buffers/docs/encoding?csw=1#types
private static async readZigZagLong(stream: AvroReadable): Promise<number> {
// copied from https://github.com/apache/avro/blob/master/lang/js/lib/utils.js#L321
let n = 0;
let k = 0;
let b, h, f, fk;
let zigZagEncoded = 0;
let significanceInBit = 0;
let byte, haveMoreByte, significanceInFloat;

do {
b = await AvroParser.readByte(stream);
h = b & 0x80;
n |= (b & 0x7f) << k;
k += 7;
} while (h && k < 28);

if (h) {
// Switch to float arithmetic, otherwise we might overflow.
f = n;
fk = 268435456; // 2 ** 28.
byte = await AvroParser.readByte(stream);
haveMoreByte = byte & 0x80;
zigZagEncoded |= (byte & 0x7f) << significanceInBit;
significanceInBit += 7;
} while (haveMoreByte && significanceInBit < 28); // bitwise operation only works for 32-bit integers

if (haveMoreByte) {
// Switch to float arithmetic
// FIXME: this only works when zigZagEncoded is no more than Number.MAX_SAFE_INTEGER (2**53 - 1)
significanceInFloat = 268435456; // 2 ** 28.
do {
b = await AvroParser.readByte(stream);
f += (b & 0x7f) * fk;
fk *= 128;
} while (b & 0x80);
return (f % 2 ? -(f + 1) : f) / 2;
byte = await AvroParser.readByte(stream);
zigZagEncoded += (byte & 0x7f) * significanceInFloat;
significanceInFloat *= 128; // 2 ** 7
} while (byte & 0x80);
return (zigZagEncoded % 2 ? -(zigZagEncoded + 1) : zigZagEncoded) / 2;
}

return (n >> 1) ^ -(n & 1);
return (zigZagEncoded >> 1) ^ -(zigZagEncoded & 1);
}

public static async readLong(stream: AvroReadable): Promise<number> {
Expand Down
30 changes: 21 additions & 9 deletions sdk/storage/storage-internal-avro/src/AvroReadableFromStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export class AvroReadableFromStream extends AvroReadable {
super();
this._readable = readable;
this._position = 0;
// workaround due to Readable.readable only availabe after Node.js v11.4
// workaround due to Readable.readable only available after Node.js v11.4
// this._stillReadable = true;
// this._readable.on("end", () => {
// this._stillReadable = false;
Expand Down Expand Up @@ -46,24 +46,36 @@ export class AvroReadableFromStream extends AvroReadable {
let chunk = this._readable.read(size);
if (chunk) {
this._position += chunk.length;
// chunk.lenght maybe less than desired size if the stream ends.
// chunk.length maybe less than desired size if the stream ends.
return this.toUint8Array(chunk);
} else {
// register callback to wait for enough data to read
return new Promise((resolve, reject) => {
const callback = () => {
const readableCallback = () => {
let chunk = this._readable.read(size);
if (chunk) {
this._position += chunk.length;
// chunk.lenght maybe less than desired size if the stream ends.
// chunk.length maybe less than desired size if the stream ends.
resolve(this.toUint8Array(chunk));
this._readable.removeListener("readable", callback);
this._readable.removeListener("readable", readableCallback);
this._readable.removeListener("error", reject);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://nodejs.org/api/events.html#events_emitter_once_eventname_listener

according to the doc one-time listener will be removed so we shouldn't need to remove by ourselves?

Copy link
Member Author

@ljian3377 ljian3377 Jun 2, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The next time eventName is triggered, this listener is removed and then invoked.

But these listener will only be removed if triggered.
Adding this because getting "max listener limit exceeded" warning.

Also, do you know how to guarantee the input stream (not fully consumed) to be properly garbage-collected after the AvroReadableFromStream is unreachable. I assume it will as long as there is no more data consumer.

this._readable.removeListener("end", reject);
this._readable.removeListener("close", reject);
}
};
this._readable.on("readable", callback);
this._readable.once("error", reject);
this._readable.once("end", reject);
this._readable.once("close", reject);

const rejectCallback = () => {
this._readable.removeListener("readable", readableCallback);
this._readable.removeListener("error", rejectCallback);
this._readable.removeListener("end", rejectCallback);
this._readable.removeListener("close", rejectCallback);
reject();
};

this._readable.on("readable", readableCallback);
this._readable.once("error", rejectCallback);
this._readable.once("end", rejectCallback);
this._readable.once("close", rejectCallback);
});
}
}
Expand Down