diff --git a/sdk/storage/storage-internal-avro/package.json b/sdk/storage/storage-internal-avro/package.json
index d9a2ab5b43e3..9562511887ec 100644
--- a/sdk/storage/storage-internal-avro/package.json
+++ b/sdk/storage/storage-internal-avro/package.json
@@ -1,16 +1,18 @@
 {
   "name": "storage-internal-avro",
-  "version": "1.0.0",
+  "author": "Microsoft Corporation",
+  "version": "1.0.0-preview.1",
   "description": "internal avro parser",
-  "main": "index.js",
-  "directories": {
-    "test": "tests"
+  "license": "MIT",
+  "main": "./src/index.ts",
+  "module": "dist-esm/index.js",
+  "types": "./types/index.d.ts",
+  "engines": {
+    "node": ">=8.0.0"
   },
   "scripts": {
-    "test": "echo \"Error: no test specified\" && exit 1"
+    "build:es6": "tsc -p tsconfig.json"
   },
-  "author": "Microsoft Corporation",
-  "license": "MIT",
   "dependencies": {
     "tslib": "^1.10.0"
   },
diff --git a/sdk/storage/storage-internal-avro/src/AvroConstants.ts b/sdk/storage/storage-internal-avro/src/AvroConstants.ts
new file mode 100644
index 000000000000..43a56d3c9b8c
--- /dev/null
+++ b/sdk/storage/storage-internal-avro/src/AvroConstants.ts
@@ -0,0 +1,50 @@
+export const AvroConstants = {
+  SYNC_MARKER_SIZE: 16,
+
+  // 'O', 'b', 'j', 1
+  INIT_BYTES: new Uint8Array([79, 98, 106, 1]),
+
+  CODEC_KEY: "avro.codec",
+
+  SCHEMA_KEY: "avro.schema",
+
+  NULL: "null",
+
+  BOOLEAN: "boolean",
+
+  INT: "int",
+
+  LONG: "long",
+
+  FLOAT: "float",
+
+  DOUBLE: "double",
+
+  BYTES: "bytes",
+
+  STRING: "string",
+
+  RECORD: "record",
+
+  ENUM: "enum",
+
+  MAP: "map",
+
+  ARRAY: "array",
+
+  UNION: "union",
+
+  FIXED: "fixed",
+
+  ALIASES: "aliases",
+
+  NAME: "name",
+
+  FIELDS: "fields",
+
+  TYPE: "type",
+
+  SYMBOLS: "symbols",
+
+  VALUES: "values",
+};
diff --git a/sdk/storage/storage-internal-avro/src/AvroParser.ts b/sdk/storage/storage-internal-avro/src/AvroParser.ts
new file mode 100644
index 000000000000..ef3216f15efa
--- /dev/null
+++ b/sdk/storage/storage-internal-avro/src/AvroParser.ts
@@ -0,0 +1,362 @@
+import { IReadable } from "./IReadable";
+import { Dictionary, KeyValuePair } from "./utils/utils.common";
+import { AvroConstants } from "./AvroConstants";
+
+// Static helpers reference AvroParser explicitly (never `this`) so they still work when passed as bare callbacks, e.g. readMap(stream, AvroParser.readString).
+export class AvroParser {
+  /**
+   * Reads a fixed number of bytes from the stream.
+   *
+   * @static
+   * @param stream Readable source to consume from.
+   * @param length Exact number of bytes required; throws if the stream returns a different count.
+   */
+  public static async readFixedBytes(stream: IReadable, length: number): Promise<Uint8Array> {
+    const bytes = await stream.read(length);
+    if (bytes.length !== length) {
+      throw new Error("Hit stream end.");
+    }
+    return bytes;
+  }
+
+  /**
+   * Reads a single byte from the stream.
+   *
+   * @static
+   * @param stream Readable source to consume from.
+   */
+  private static async readByte(stream: IReadable): Promise<number> {
+    const buf = await AvroParser.readFixedBytes(stream, 1);
+    return buf[0];
+  }
+
+  private static async readZigZagLong(stream: IReadable): Promise<number> {
+    // copied from https://github.com/apache/avro/blob/master/lang/js/lib/utils.js#L321
+    let n = 0;
+    let k = 0;
+    let b, h, f, fk;
+
+    do {
+      b = await AvroParser.readByte(stream);
+      h = b & 0x80;
+      n |= (b & 0x7f) << k;
+      k += 7;
+    } while (h && k < 28);
+
+    if (h) {
+      // Switch to float arithmetic, otherwise we might overflow.
+      f = n;
+      fk = 268435456; // 2 ** 28.
+      do {
+        b = await AvroParser.readByte(stream);
+        f += (b & 0x7f) * fk;
+        fk *= 128;
+      } while (b & 0x80);
+      return (f % 2 ? -(f + 1) : f) / 2;
+    }
+
+    return (n >> 1) ^ -(n & 1);
+  }
+
+  public static async readLong(stream: IReadable): Promise<number> {
+    return AvroParser.readZigZagLong(stream);
+  }
+
+  public static async readInt(stream: IReadable): Promise<number> {
+    return AvroParser.readZigZagLong(stream);
+  }
+
+  public static async readNull(): Promise<null> {
+    return null;
+  }
+
+  public static async readBoolean(stream: IReadable): Promise<boolean> {
+    const b = await AvroParser.readByte(stream);
+    if (b === 1) {
+      return true;
+    } else if (b === 0) {
+      return false;
+    } else {
+      throw new Error("Byte was not a boolean.");
+    }
+  }
+
+  public static async readFloat(stream: IReadable): Promise<number> {
+    const u8arr = await AvroParser.readFixedBytes(stream, 4);
+    // Honour byteOffset/byteLength: the chunk may be a view into a larger backing buffer.
+    const view = new DataView(u8arr.buffer, u8arr.byteOffset, u8arr.byteLength);
+    return view.getFloat32(0, true); // littleEndian = true
+  }
+
+  public static async readDouble(stream: IReadable): Promise<number> {
+    const u8arr = await AvroParser.readFixedBytes(stream, 8);
+    // Honour byteOffset/byteLength: the chunk may be a view into a larger backing buffer.
+    const view = new DataView(u8arr.buffer, u8arr.byteOffset, u8arr.byteLength);
+    return view.getFloat64(0, true); // littleEndian = true
+  }
+
+  public static async readBytes(stream: IReadable): Promise<Uint8Array> {
+    const size = await AvroParser.readLong(stream);
+    if (size < 0) {
+      throw new Error("Bytes size was negative.");
+    }
+
+    // Zero-length values are legal, but ReadableFromStream.read() rejects size <= 0; a short read is reported by readFixedBytes.
+    return size === 0 ? new Uint8Array(0) : await AvroParser.readFixedBytes(stream, size);
+  }
+
+  public static async readString(stream: IReadable): Promise<string> {
+    const u8arr = await AvroParser.readBytes(stream);
+
+    // FIXME: need TextDecoder polyfill for IE
+    const utf8decoder = new TextDecoder();
+    return utf8decoder.decode(u8arr);
+  }
+
+  private static async readMapPair<T>(
+    stream: IReadable,
+    readItemMethod: (s: IReadable) => Promise<T>
+  ): Promise<KeyValuePair<T>> {
+    const key = await AvroParser.readString(stream);
+    // FIXME: what about readFixed which need a length as parameter.
+    const value = await readItemMethod(stream);
+    return { key, value };
+  }
+
+  public static async readMap<T>(
+    stream: IReadable,
+    readItemMethod: (s: IReadable) => Promise<T>
+  ): Promise<Dictionary<T>> {
+    const readPairMethod = async (s: IReadable): Promise<KeyValuePair<T>> => {
+      return await AvroParser.readMapPair(s, readItemMethod);
+    };
+
+    const pairs: KeyValuePair<T>[] = await AvroParser.readArray(stream, readPairMethod);
+    const dict: Dictionary<T> = {};
+    for (const pair of pairs) {
+      dict[pair.key] = pair.value;
+    }
+    return dict;
+  }
+
+  private static async readArray<T>(
+    stream: IReadable,
+    readItemMethod: (s: IReadable) => Promise<T>
+  ): Promise<T[]> {
+    const items: T[] = [];
+    for (let count = await AvroParser.readLong(stream);
+      count !== 0;
+      count = await AvroParser.readLong(stream)) {
+      if (count < 0) {
+        // Ignore block sizes
+        await AvroParser.readLong(stream);
+        count = -count;
+      }
+
+      while (count--) {
+        const item: T = await readItemMethod(stream);
+        items.push(item);
+      }
+    }
+    return items;
+  }
+}
+
+
+interface RecordField {
+  name: string;
+  type: string | ObjectSchema | (string | ObjectSchema)[]; // Unions may not immediately contain other unions.
+}
+
+interface ObjectSchema {
+  type: 'record' | 'enum' | 'array' | 'map' | 'fixed';
+  name?: string;
+  aliases?: string[];
+  fields?: RecordField[];
+  symbols?: string[];
+  values?: string;
+  size?: number;
+}
+
+export abstract class AvroType {
+  /**
+   * Reads an object from the stream.
+   *
+   * @param stream Readable source to decode the next value from.
+   */
+  public abstract read(stream: IReadable): Promise<Object | null>;
+
+  /**
+   * Determines the AvroType from the Avro Schema.
+   */
+  public static fromSchema(schema: string | Object): AvroType {
+    if (typeof schema === 'string') {
+      return AvroType.fromStringSchema(schema);
+    } else if (Array.isArray(schema)) {
+      return AvroType.fromArraySchema(schema);
+    } else {
+      return AvroType.fromObjectSchema(schema as ObjectSchema);
+    }
+  }
+
+  private static fromStringSchema(schema: string): AvroType {
+    // FIXME: simpler way to tell if schema is of type AvroPrimitive?
+    switch (schema) {
+      case AvroConstants.NULL:
+      case AvroConstants.BOOLEAN:
+      case AvroConstants.INT:
+      case AvroConstants.LONG:
+      case AvroConstants.FLOAT:
+      case AvroConstants.DOUBLE:
+      case AvroConstants.BYTES:
+      case AvroConstants.STRING: return new AvroPrimitiveType(schema as AvroPrimitive);
+      default: throw new Error(`Unexpected Avro type ${schema}`);
+    }
+  }
+
+  private static fromArraySchema(schema: any[]): AvroType {
+    // Wrap in an arrow: passing the static method itself to map() would invoke it without a receiver.
+    return new AvroUnionType(schema.map((s) => AvroType.fromSchema(s)));
+  }
+
+  private static fromObjectSchema(schema: ObjectSchema): AvroType {
+    const type = schema.type;
+    // Primitives can be defined as strings or objects
+    try {
+      return AvroType.fromStringSchema(type);
+    } catch (err) { /* not a primitive name; fall through to the complex types below */ }
+
+    switch (type) {
+      case AvroConstants.RECORD:
+        if (schema.aliases) {
+          throw new Error(`aliases currently is not supported, schema: ${JSON.stringify(schema)}`);
+        }
+        if (!schema.name) {
+          throw new Error(`Required attribute 'name' doesn't exist on schema: ${JSON.stringify(schema)}`);
+        }
+
+        const fields: Dictionary<AvroType> = {};
+        if (!schema.fields) {
+          throw new Error(`Required attribute 'fields' doesn't exist on schema: ${JSON.stringify(schema)}`);
+        }
+        for (const field of schema.fields) {
+          fields[field.name] = AvroType.fromSchema(field.type);
+        }
+        return new AvroRecordType(fields, schema.name);
+      case AvroConstants.ENUM:
+        if (schema.aliases) {
+          throw new Error(`aliases currently is not supported, schema: ${JSON.stringify(schema)}`);
+        }
+        if (!schema.symbols) {
+          throw new Error(`Required attribute 'symbols' doesn't exist on schema: ${JSON.stringify(schema)}`);
+        }
+        return new AvroEnumType(schema.symbols);
+      case AvroConstants.MAP:
+        if (!schema.values) {
+          throw new Error(`Required attribute 'values' doesn't exist on schema: ${JSON.stringify(schema)}`);
+        }
+        return new AvroMapType(AvroType.fromSchema(schema.values));
+      case AvroConstants.ARRAY: // Unused today
+      case AvroConstants.UNION: // Unused today
+      case AvroConstants.FIXED: // Unused today
+      default:
+        throw new Error(`Unexpected Avro type ${type} in ${JSON.stringify(schema)}`);
+    }
+  }
+}
+
+
+type AvroPrimitive = 'null' | 'boolean' | 'int' | 'long' | 'float' | 'double' | 'bytes' | 'string';
+
+class AvroPrimitiveType extends AvroType {
+  private readonly _primitive: AvroPrimitive;
+
+  constructor(primitive: AvroPrimitive) {
+    super();
+    this._primitive = primitive;
+  }
+
+  public async read(stream: IReadable): Promise<Object | null> {
+    switch (this._primitive) {
+      case AvroConstants.NULL: return await AvroParser.readNull();
+      case AvroConstants.BOOLEAN: return await AvroParser.readBoolean(stream);
+      case AvroConstants.INT: return await AvroParser.readInt(stream);
+      case AvroConstants.LONG: return await AvroParser.readLong(stream);
+      case AvroConstants.FLOAT: return await AvroParser.readFloat(stream);
+      case AvroConstants.DOUBLE: return await AvroParser.readDouble(stream);
+      case AvroConstants.BYTES: return await AvroParser.readBytes(stream);
+      case AvroConstants.STRING: return await AvroParser.readString(stream);
+      default: throw new Error("Unknown Avro Primitive");
+    }
+  }
+}
+
+class AvroEnumType extends AvroType {
+  private readonly _symbols: string[];
+
+  constructor(symbols: string[]) {
+    super();
+    this._symbols = symbols;
+  }
+
+  public async read(stream: IReadable): Promise<Object | null> {
+    const value = await AvroParser.readInt(stream);
+    return this._symbols[value];
+  }
+}
+
+
+class AvroUnionType extends AvroType {
+  private readonly _types: AvroType[];
+
+  constructor(types: AvroType[]) {
+    super();
+    this._types = types;
+  }
+
+  public async read(stream: IReadable): Promise<Object | null> {
+    const typeIndex = await AvroParser.readInt(stream);
+    return await this._types[typeIndex].read(stream);
+  }
+}
+
+
+class AvroMapType extends AvroType {
+  private readonly _itemType: AvroType;
+
+  constructor(itemType: AvroType) {
+    super();
+    this._itemType = itemType;
+  }
+
+  public async read(stream: IReadable): Promise<Object | null> {
+    const readItemMethod = async (s: IReadable): Promise<Object | null> => {
+      return await this._itemType.read(s);
+    };
+    return await AvroParser.readMap(stream, readItemMethod);
+  }
+}
+
+
+class AvroRecordType extends AvroType {
+  private readonly _name: string;
+  private readonly _fields: Dictionary<AvroType>;
+
+  constructor(fields: Dictionary<AvroType>, name: string) {
+    super();
+    this._fields = fields;
+    this._name = name;
+  }
+
+  public async read(stream: IReadable): Promise<Object | null> {
+    const record: Dictionary<Object | null> = {};
+    // FIXME: what for?
+    record["$schema"] = this._name;
+    for (const key in this._fields) {
+      if (this._fields.hasOwnProperty(key)) {
+        record[key] = await this._fields[key].read(stream);
+      }
+    }
+    return record;
+  }
+}
diff --git a/sdk/storage/storage-internal-avro/src/AvroReader.ts b/sdk/storage/storage-internal-avro/src/AvroReader.ts
index 160f0ee320a4..42a99e1cf4c2 100644
--- a/sdk/storage/storage-internal-avro/src/AvroReader.ts
+++ b/sdk/storage/storage-internal-avro/src/AvroReader.ts
@@ -1,11 +1,7 @@
 import { IReadable } from "./IReadable";
-
-interface Metadata {
-  /**
-   * A name-value pair.
-   */
-  [propertyName: string]: string;
-}
+import { AvroConstants } from "./AvroConstants";
+import { Metadata, arraysEqual } from "./utils/utils.common";
+import { AvroType, AvroParser } from "./AvroParser";
 
 export class AvroReader {
   private readonly _dataStream: IReadable;
@@ -33,19 +29,19 @@ export class AvroReader {
   private _initialized: boolean;
 
   constructor(
-    dataStream: Readable
+    dataStream: IReadable
   );
 
   constructor(
-    dataStream: Readable,
-    headerStream: Readable,
+    dataStream: IReadable,
+    headerStream: IReadable,
     currentBlockOffset: number,
     indexWithinCurrentBlock: number
   );
 
   constructor(
-    dataStream: Readable,
-    headerStream?: Readable,
+    dataStream: IReadable,
+    headerStream?: IReadable,
     currentBlockOffset?: number,
     indexWithinCurrentBlock?: number
   ) {
@@ -56,10 +52,10 @@ export class AvroReader {
     this._objectIndex = indexWithinCurrentBlock || 0;
   }
 
-  // TODO: cancellation / aborter?
+  // FUTURE: cancellation / aborter?
   private async initialize() {
-    const header = await AvroParser.readFixedBytes(this._headerStream, AvroConstants.InitBytes.Length);
-    if (header == AvroConstants.InitBytes) {
+    const header = await AvroParser.readFixedBytes(this._headerStream, AvroConstants.INIT_BYTES.length);
+    if (!arraysEqual(header, AvroConstants.INIT_BYTES)) {
       throw new Error("Stream is not an Avro file.");
     }
 
@@ -68,33 +64,30 @@ export class AvroReader {
     this._metadata = await AvroParser.readMap(this._headerStream, AvroParser.readString);
 
     // Validate codec
-    const codec = this._metadata![AvroConstants.CodecKey];
+    const codec = this._metadata![AvroConstants.CODEC_KEY];
     if (!(codec == undefined || codec == "null")) {
       throw new Error("Codecs are not supported");
     }
 
     // The 16-byte, randomly-generated sync marker for this file.
-    this._syncMarker = await AvroParser.readFixedBytes(this._headerStream, AvroConstants.SyncMarkerSize);
+    this._syncMarker = await AvroParser.readFixedBytes(this._headerStream, AvroConstants.SYNC_MARKER_SIZE);
 
     // Parse the schema
-    const schema = JSON.parse(this._metadata![AvroConstants.SchemaKey]);
+    const schema = JSON.parse(this._metadata![AvroConstants.SCHEMA_KEY]);
     this._itemType = AvroType.fromSchema(schema.RootElement);
 
     if (this._blockOffset == 0) {
-      this._blockOffset = this._dataStream.
+      this._blockOffset = this._dataStream.position;
     }
-
     // Populate _itemsRemainingInCurrentBlock
     this._itemsRemainingInBlock = await AvroParser.readLong(this._dataStream);
-
     // skip block length
     await AvroParser.readLong(this._dataStream);
 
     this._initialized = true;
-
     if (this._objectIndex && this._objectIndex > 0) {
       for (let i = 0; i < this._objectIndex; i++) {
-        await this._itemType.Read(this._dataStream);
+        await this._itemType.read(this._dataStream);
         this._itemsRemainingInBlock!--;
       }
     }
@@ -104,13 +97,13 @@ export class AvroReader {
     return !this._initialized || this._itemsRemainingInBlock! > 0;
   }
 
-  public async *parseObjects(): AsyncIterableIterator {
+  public async *parseObjects(): AsyncIterableIterator<Object | null> {
     if (!this._initialized) {
       await this.initialize();
     }
 
     while (this.hasNext) {
-      const result = await this._itemType.read(this._dataStream);
+      const result = await this._itemType!.read(this._dataStream);
       this._itemsRemainingInBlock!--;
       this._objectIndex!++;
 
@@ -118,10 +111,10 @@ export class AvroReader {
 
       if (this._itemsRemainingInBlock == 0) {
         const marker = await AvroParser.readFixedBytes(this._dataStream, 16);
-        BlockOffset = _dataStream.Position;
+        this._blockOffset = this._dataStream.position;
         this._objectIndex = 0;
 
-        if (!this._syncMarker == marker) {
+        if (!arraysEqual(this._syncMarker!, marker)) {
           throw new Error("Stream is not a valid Avro file.");
         }
 
diff --git a/sdk/storage/storage-internal-avro/src/IReadable.ts b/sdk/storage/storage-internal-avro/src/IReadable.ts
index 5f76a9016387..d54e323bf88e 100644
--- a/sdk/storage/storage-internal-avro/src/IReadable.ts
+++ b/sdk/storage/storage-internal-avro/src/IReadable.ts
@@ -30,10 +30,14 @@ export class ReadableFromStream extends IReadable {
   }
 
   public async read(size: number): Promise {
+    if (size <= 0) {
+      throw new Error(`size parameter should be positive: ${size}`);
+    }
+
     // readable is true if it is safe to call readable.read(), which means the stream has not been destroyed or emitted 'error' or 'end'.
     // if (!this._stillReadable || this._readable.destroyed) {
     if (!this._readable.readable) {
-      throw Error("Stream no longer readable.");
+      throw new Error("Stream no longer readable.");
     }
 
     // See if there is already enough data, note that "Only after readable.read() returns null, 'readable' will be emitted."
@@ -69,14 +73,15 @@ import * as fs from "fs";
 
 async function main() {
   let rs = fs.createReadStream("README.md");
+  console.log(rs.read(0));
   let rfs = new ReadableFromStream(rs);
   console.log(rfs.position);
+  console.log(rs.readable);
 
   const buf = await rfs.read(10);
   console.log(buf.toString());
 
   console.log(rs.readable);
-  console.log(rs.readableLength);
 
   const buf2 = await rfs.read(100000);
   console.log(buf2.toString());
diff --git a/sdk/storage/storage-internal-avro/src/index.ts b/sdk/storage/storage-internal-avro/src/index.ts
new file mode 100644
index 000000000000..71e41f76c2ed
--- /dev/null
+++ b/sdk/storage/storage-internal-avro/src/index.ts
@@ -0,0 +1,3 @@
+import { AvroReader } from "./AvroReader";
+import { IReadable } from "./IReadable";
+export { AvroReader, IReadable };
diff --git a/sdk/storage/storage-internal-avro/src/utils/utils.common.ts b/sdk/storage/storage-internal-avro/src/utils/utils.common.ts
new file mode 100644
index 000000000000..d561e54e7166
--- /dev/null
+++ b/sdk/storage/storage-internal-avro/src/utils/utils.common.ts
@@ -0,0 +1,33 @@
+export interface Metadata {
+  /**
+   * A name-value pair.
+   */
+  [propertyName: string]: string;
+}
+
+export interface Dictionary<T> {
+  [key: string]: T;
+}
+
+export interface KeyValuePair<T> {
+  key: string;
+  value: T;
+}
+
+/**
+ * Compares two byte arrays for equality.
+ *
+ * @param a First array.
+ * @param b Second array.
+ * @returns true if both are the same reference, or have equal length and identical bytes; false otherwise, including when either is null or undefined.
+ */
+export function arraysEqual(a: Uint8Array, b: Uint8Array): boolean {
+  if (a === b) return true;
+  if (a == null || b == null) return false;
+  if (a.length !== b.length) return false;
+
+  for (let i = 0; i < a.length; ++i) {
+    if (a[i] !== b[i]) return false;
+  }
+  return true;
+}
\ No newline at end of file