Skip to content

Commit

Permalink
avor parser offset fix; Readable interface update (#9088)
Browse files Browse the repository at this point in the history
  • Loading branch information
XiaoningLiu authored May 22, 2020
1 parent 8fe8c37 commit 6bc681f
Showing 1 changed file with 20 additions and 9 deletions.
29 changes: 20 additions & 9 deletions sdk/storage/storage-internal-avro/src/AvroReadableFromStream.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
import { Readable } from "stream";
import { AvroReadable } from './AvroReadable';
import { AvroReadable } from "./AvroReadable";

export class AvroReadableFromStream extends AvroReadable {
private _position: number;
private _readable: Readable;
private _readable: NodeJS.ReadableStream;

private toUint8Array(data: string | Buffer): Uint8Array {
if (typeof data === "string") {
return Buffer.from(data);
}
return data;
}

// private _stillReadable: boolean;
constructor(readable: Readable) {
constructor(readable: NodeJS.ReadableStream) {
super();
this._readable = readable;
this._position = 0;
Expand All @@ -22,9 +29,14 @@ export class AvroReadableFromStream extends AvroReadable {
return this._position;
}
public async read(size: number): Promise<Uint8Array> {
if (size <= 0) {
if (size < 0) {
throw new Error(`size parameter should be positive: ${size}`);
}

if (size === 0) {
return new Uint8Array();
}

// readable is true if it is safe to call readable.read(), which means the stream has not been destroyed or emitted 'error' or 'end'.
// if (!this._stillReadable || this._readable.destroyed) {
if (!this._readable.readable) {
Expand All @@ -35,17 +47,16 @@ export class AvroReadableFromStream extends AvroReadable {
if (chunk) {
this._position += chunk.length;
// chunk.lenght maybe less than desired size if the stream ends.
return chunk;
}
else {
return this.toUint8Array(chunk);
} else {
// register callback to wait for enough data to read
return new Promise((resolve, reject) => {
const callback = () => {
let chunk = this._readable.read(size);
if (chunk) {
this._position += chunk.length;
// chunk.lenght maybe less than desired size if the stream ends.
resolve(chunk);
resolve(this.toUint8Array(chunk));
this._readable.removeListener("readable", callback);
}
};
Expand Down

0 comments on commit 6bc681f

Please sign in to comment.