From 6bc681fca7b44972665e3f468d7aac1e5b3b4fc1 Mon Sep 17 00:00:00 2001 From: xiaonlimsft Date: Fri, 22 May 2020 17:22:14 +0800 Subject: [PATCH] avor parser offset fix; Readable interface update (#9088) --- .../src/AvroReadableFromStream.ts | 29 +++++++++++++------ 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/sdk/storage/storage-internal-avro/src/AvroReadableFromStream.ts b/sdk/storage/storage-internal-avro/src/AvroReadableFromStream.ts index afc3352c859f..62b9b81e3d15 100644 --- a/sdk/storage/storage-internal-avro/src/AvroReadableFromStream.ts +++ b/sdk/storage/storage-internal-avro/src/AvroReadableFromStream.ts @@ -1,11 +1,18 @@ -import { Readable } from "stream"; -import { AvroReadable } from './AvroReadable'; +import { AvroReadable } from "./AvroReadable"; export class AvroReadableFromStream extends AvroReadable { private _position: number; - private _readable: Readable; + private _readable: NodeJS.ReadableStream; + + private toUint8Array(data: string | Buffer): Uint8Array { + if (typeof data === "string") { + return Buffer.from(data); + } + return data; + } + // private _stillReadable: boolean; - constructor(readable: Readable) { + constructor(readable: NodeJS.ReadableStream) { super(); this._readable = readable; this._position = 0; @@ -22,9 +29,14 @@ export class AvroReadableFromStream extends AvroReadable { return this._position; } public async read(size: number): Promise { - if (size <= 0) { + if (size < 0) { throw new Error(`size parameter should be positive: ${size}`); } + + if (size === 0) { + return new Uint8Array(); + } + // readable is true if it is safe to call readable.read(), which means the stream has not been destroyed or emitted 'error' or 'end'. // if (!this._stillReadable || this._readable.destroyed) { if (!this._readable.readable) { @@ -35,9 +47,8 @@ export class AvroReadableFromStream extends AvroReadable { if (chunk) { this._position += chunk.length; // chunk.lenght maybe less than desired size if the stream ends. - return chunk; - } - else { + return this.toUint8Array(chunk); + } else { // register callback to wait for enough data to read return new Promise((resolve, reject) => { const callback = () => { @@ -45,7 +56,7 @@ export class AvroReadableFromStream extends AvroReadable { if (chunk) { this._position += chunk.length; // chunk.lenght maybe less than desired size if the stream ends. - resolve(chunk); + resolve(this.toUint8Array(chunk)); this._readable.removeListener("readable", callback); } };