From 62cd3644ab15871514de5582b7118dbdf9def281 Mon Sep 17 00:00:00 2001 From: Sean McCullough Date: Tue, 21 Apr 2020 14:55:16 -0700 Subject: [PATCH 1/4] Added Avro parser --- .../Azure.Storage.Internal.Avro/README.md | 20 + .../src/AssemblyInfo.cs | 12 + .../src/AvroConstants.cs | 24 + .../src/AvroParser.cs | 434 ++++++++++++++++++ .../src/AvroReader.cs | 196 ++++++++ .../src/Azure.Storage.Internal.Avro.csproj | 21 + .../Azure.Storage.Internal.Avro/src/Record.cs | 13 + .../tests/AvroReaderTests.cs | 72 +++ .../Azure.Storage.Internal.Avro.Tests.csproj | 17 + .../tests/Resources/test_null_0.avro | Bin 0 -> 75 bytes .../tests/Resources/test_null_1.avro | Bin 0 -> 88 bytes .../tests/Resources/test_null_10.avro | Bin 0 -> 153 bytes .../tests/Resources/test_null_11.avro | Bin 0 -> 213 bytes .../tests/Resources/test_null_12.avro | Bin 0 -> 105 bytes .../tests/Resources/test_null_13.avro | Bin 0 -> 157 bytes .../tests/Resources/test_null_14.avro | Bin 0 -> 358 bytes .../tests/Resources/test_null_2.avro | Bin 0 -> 308 bytes .../tests/Resources/test_null_3.avro | Bin 0 -> 177 bytes .../tests/Resources/test_null_4.avro | Bin 0 -> 94 bytes .../tests/Resources/test_null_5.avro | Bin 0 -> 95 bytes .../tests/Resources/test_null_6.avro | Bin 0 -> 116 bytes .../tests/Resources/test_null_7.avro | Bin 0 -> 158 bytes .../tests/Resources/test_null_8.avro | Bin 0 -> 123 bytes .../tests/Resources/test_null_9.avro | Bin 0 -> 134 bytes sdk/storage/Azure.Storage.sln | 12 + 25 files changed, 821 insertions(+) create mode 100644 sdk/storage/Azure.Storage.Internal.Avro/README.md create mode 100644 sdk/storage/Azure.Storage.Internal.Avro/src/AssemblyInfo.cs create mode 100644 sdk/storage/Azure.Storage.Internal.Avro/src/AvroConstants.cs create mode 100644 sdk/storage/Azure.Storage.Internal.Avro/src/AvroParser.cs create mode 100644 sdk/storage/Azure.Storage.Internal.Avro/src/AvroReader.cs create mode 100644 sdk/storage/Azure.Storage.Internal.Avro/src/Azure.Storage.Internal.Avro.csproj create mode 100644 sdk/storage/Azure.Storage.Internal.Avro/src/Record.cs create mode 100644 sdk/storage/Azure.Storage.Internal.Avro/tests/AvroReaderTests.cs create mode 100644 sdk/storage/Azure.Storage.Internal.Avro/tests/Azure.Storage.Internal.Avro.Tests.csproj create mode 100644 sdk/storage/Azure.Storage.Internal.Avro/tests/Resources/test_null_0.avro create mode 100644 sdk/storage/Azure.Storage.Internal.Avro/tests/Resources/test_null_1.avro create mode 100644 sdk/storage/Azure.Storage.Internal.Avro/tests/Resources/test_null_10.avro create mode 100644 sdk/storage/Azure.Storage.Internal.Avro/tests/Resources/test_null_11.avro create mode 100644 sdk/storage/Azure.Storage.Internal.Avro/tests/Resources/test_null_12.avro create mode 100644 sdk/storage/Azure.Storage.Internal.Avro/tests/Resources/test_null_13.avro create mode 100644 sdk/storage/Azure.Storage.Internal.Avro/tests/Resources/test_null_14.avro create mode 100644 sdk/storage/Azure.Storage.Internal.Avro/tests/Resources/test_null_2.avro create mode 100644 sdk/storage/Azure.Storage.Internal.Avro/tests/Resources/test_null_3.avro create mode 100644 sdk/storage/Azure.Storage.Internal.Avro/tests/Resources/test_null_4.avro create mode 100644 sdk/storage/Azure.Storage.Internal.Avro/tests/Resources/test_null_5.avro create mode 100644 sdk/storage/Azure.Storage.Internal.Avro/tests/Resources/test_null_6.avro create mode 100644 sdk/storage/Azure.Storage.Internal.Avro/tests/Resources/test_null_7.avro create mode 100644 sdk/storage/Azure.Storage.Internal.Avro/tests/Resources/test_null_8.avro create mode 100644 sdk/storage/Azure.Storage.Internal.Avro/tests/Resources/test_null_9.avro diff --git a/sdk/storage/Azure.Storage.Internal.Avro/README.md b/sdk/storage/Azure.Storage.Internal.Avro/README.md new file mode 100644 index 0000000000000..b12a11b21c46e --- /dev/null +++ b/sdk/storage/Azure.Storage.Internal.Avro/README.md @@ -0,0 +1,20 @@ +# Azure Storage Internal Avro client library for .NET +- For internal use only. + +## Getting started +- For internal use only. + +## Key concepts +- For internal use only. + +## Examples +- For internal use only. + +## Troubleshooting +- For internal use only. + +## Next steps +- For internal use only. + +## Contributing +- For internal use only. \ No newline at end of file diff --git a/sdk/storage/Azure.Storage.Internal.Avro/src/AssemblyInfo.cs b/sdk/storage/Azure.Storage.Internal.Avro/src/AssemblyInfo.cs new file mode 100644 index 0000000000000..114053a636f6c --- /dev/null +++ b/sdk/storage/Azure.Storage.Internal.Avro/src/AssemblyInfo.cs @@ -0,0 +1,12 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System.Runtime.CompilerServices; + +[assembly: InternalsVisibleTo("Azure.Storage.Internal.Avro.Tests, PublicKey=" + + "0024000004800000940000000602000000240000525341310004000001000100d15ddcb2968829" + + "5338af4b7686603fe614abd555e09efba8fb88ee09e1f7b1ccaeed2e8f823fa9eef3fdd60217fc" + + "012ea67d2479751a0b8c087a4185541b851bd8b16f8d91b840e51b1cb0ba6fe647997e57429265" + + "e85ef62d565db50a69ae1647d54d7bd855e4db3d8a91510e5bcbd0edfbbecaa20a7bd9ae74593d" + + "aa7b11b4")] +[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2, PublicKey=0024000004800000940000000602000000240000525341310004000001000100c547cac37abd99c8db225ef2f6c8a3602f3b3606cc9891605d02baa56104f4cfc0734aa39b93bf7852f7d9266654753cc297e7d2edfe0bac1cdcf9f717241550e0a7b191195b7667bb4f64bcb8e2121380fd1d9d46ad2d92d2d15605093924cceaf74c4861eff62abf69b9291ed0a340e113be11e6a7d3113e92484cf7045cc7")] diff --git a/sdk/storage/Azure.Storage.Internal.Avro/src/AvroConstants.cs b/sdk/storage/Azure.Storage.Internal.Avro/src/AvroConstants.cs new file mode 100644 index 0000000000000..ca3d23a201ca3 --- /dev/null +++ b/sdk/storage/Azure.Storage.Internal.Avro/src/AvroConstants.cs @@ -0,0 +1,24 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; +using System.Text; + +namespace Azure.Storage.Internal.Avro +{ + internal class AvroConstants + { + public const int SyncMarkerSize = 16; + public static byte[] InitBytes = + { + (byte)'O', + (byte)'b', + (byte)'j', + (byte)1 + }; + public const string CodecKey = "avro.codec"; + public const string SchemaKey = "avro.schema"; + public const string DeflateCodec = "deflate"; + } +} diff --git a/sdk/storage/Azure.Storage.Internal.Avro/src/AvroParser.cs b/sdk/storage/Azure.Storage.Internal.Avro/src/AvroParser.cs new file mode 100644 index 0000000000000..c8d28213eef66 --- /dev/null +++ b/sdk/storage/Azure.Storage.Internal.Avro/src/AvroParser.cs @@ -0,0 +1,434 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO; +using System.Text; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using Azure.Core.Pipeline; + +#pragma warning disable SA1402 // File may only contain a single type + +namespace Azure.Storage.Internal.Avro +{ + internal static class AvroParser + { + public static List Parse(Stream stream, CancellationToken cancellationToken = default) => + ReadObjectContainerFileAsync(stream, async: false, cancellationToken).EnsureCompleted(); + + public static async Task> ParseAsync(Stream stream, CancellationToken cancellationToken = default) => + await ReadObjectContainerFileAsync(stream, async: true, cancellationToken).ConfigureAwait(false); + + private static async Task> ReadObjectContainerFileAsync( + Stream stream, + bool async, + CancellationToken cancellationToken = default) + { + // Four bytes, ASCII 'O', 'b', 'j', followed by 1. + byte[] header = await ReadFixedBytesAsync(stream, AvroConstants.InitBytes.Length, async, cancellationToken).ConfigureAwait(false); + Debug.Assert(header[0] == AvroConstants.InitBytes[0]); + Debug.Assert(header[1] == AvroConstants.InitBytes[1]); + Debug.Assert(header[2] == AvroConstants.InitBytes[2]); + Debug.Assert(header[3] == AvroConstants.InitBytes[3]); + + // File metadata is written as if defined by the following map schema: + // { "type": "map", "values": "bytes"} + Dictionary metadata = await ReadMapAsync(stream, ReadStringAsync, async, cancellationToken).ConfigureAwait(false); + Debug.Assert(metadata[AvroConstants.CodecKey] == "null"); + + // The 16-byte, randomly-generated sync marker for this file. + byte[] syncMarker = await ReadFixedBytesAsync(stream, AvroConstants.SyncMarkerSize, async, cancellationToken).ConfigureAwait(false); + + // Parse the schema + using JsonDocument schema = JsonDocument.Parse(metadata[AvroConstants.SchemaKey]); + AvroType itemType = AvroType.FromSchema(schema.RootElement); + + // File data blocks + var data = new List(); + while (stream.Position < stream.Length) + { + long length = await ReadLongAsync(stream, async, cancellationToken).ConfigureAwait(false); + await ReadLongAsync(stream, async, cancellationToken).ConfigureAwait(false); // Ignore the block size + while (length-- > 0) + { + object value = await itemType.ReadAsync(stream, async, cancellationToken).ConfigureAwait(false); + data.Add(value); + } + await ReadFixedBytesAsync(stream, AvroConstants.SyncMarkerSize, async, cancellationToken).ConfigureAwait(false); // Ignore the sync check + } + return data; + } + + public static async Task ReadFixedBytesAsync( + Stream stream, + int length, + bool async, + CancellationToken cancellationToken) + { + byte[] data = new byte[length]; + int start = 0; + while (length > 0) + { + int n = async ? + await stream.ReadAsync(data, start, length, cancellationToken).ConfigureAwait(false) : + stream.Read(data, start, length); + start += n; + length -= n; + + // We hit the end of the stream + if (n <= 0) + return data; + } + return data; + } + + private static async Task ReadByteAsync( + Stream stream, + bool async, + CancellationToken cancellationToken) + { + byte[] bytes = await ReadFixedBytesAsync(stream, 1, async, cancellationToken).ConfigureAwait(false); + return bytes[0]; + } + + // Stolen because the linked references in the Avro spec were subpar... + private static async Task ReadZigZagLongAsync( + Stream stream, + bool async, + CancellationToken cancellationToken) + { + byte b = await ReadByteAsync(stream, async, cancellationToken).ConfigureAwait(false); + ulong next = b & 0x7FUL; + int shift = 7; + while ((b & 0x80) != 0) + { + b = await ReadByteAsync(stream, async, cancellationToken).ConfigureAwait(false); + next |= (b & 0x7FUL) << shift; + shift += 7; + } + long value = (long)next; + return (-(value & 0x01L)) ^ ((value >> 1) & 0x7fffffffffffffffL); + } + + public static Task ReadNullAsync() => Task.FromResult(null); + + public static async Task ReadBoolAsync( + Stream stream, + bool async, + CancellationToken cancellationToken) + { + byte b = await ReadByteAsync(stream, async, cancellationToken).ConfigureAwait(false); + return b != 0; + } + + public static async Task ReadLongAsync( + Stream stream, + bool async, + CancellationToken cancellationToken) => + await ReadZigZagLongAsync(stream, async, cancellationToken).ConfigureAwait(false); + + public static async Task ReadIntAsync( + Stream stream, + bool async, + CancellationToken cancellationToken) => + (int)(await ReadLongAsync(stream, async, cancellationToken).ConfigureAwait(false)); + + public static async Task ReadFloatAsync( + Stream stream, + bool async, + CancellationToken cancellationToken) + { + byte[] bytes = await ReadFixedBytesAsync(stream, 4, async, cancellationToken).ConfigureAwait(false); + return BitConverter.ToSingle(bytes, 0); + } + + public static async Task ReadDoubleAsync( + Stream stream, + bool async, + CancellationToken cancellationToken) + { + byte[] bytes = await ReadFixedBytesAsync(stream, 8, async, cancellationToken).ConfigureAwait(false); + return BitConverter.ToDouble(bytes, 0); + } + + public static async Task ReadBytesAsync( + Stream stream, + bool async, + CancellationToken cancellationToken) + { + int size = await ReadIntAsync(stream, async, cancellationToken).ConfigureAwait(false); + return await ReadFixedBytesAsync(stream, size, async, cancellationToken).ConfigureAwait(false); + } + + public static async Task ReadStringAsync( + Stream stream, + bool async, + CancellationToken cancellationToken) + { + byte[] bytes = await ReadBytesAsync(stream, async, cancellationToken).ConfigureAwait(false); + return Encoding.UTF8.GetString(bytes); + } + + private static async Task> ReadMapPairAsync( + Stream stream, + Func> parseItemAsync, + bool async, + CancellationToken cancellationToken) + { + string key = await ReadStringAsync(stream, async, cancellationToken).ConfigureAwait(false); + #pragma warning disable AZC0110 // DO NOT use await keyword in possibly synchronous scope. + T value = await parseItemAsync(stream, async, cancellationToken).ConfigureAwait(false); + #pragma warning restore AZC0110 // DO NOT use await keyword in possibly synchronous scope. + return new KeyValuePair(key, value); + } + + public static async Task> ReadMapAsync( + Stream stream, + Func> parseItemAsync, + bool async, + CancellationToken cancellationToken) + { + #pragma warning disable AZC0110 // DO NOT use await keyword in possibly synchronous scope. + #pragma warning disable AZC0108 // Incorrect 'async' parameter value. + Func>> parsePair = + async (s, a, c) => await ReadMapPairAsync(s, parseItemAsync, a, c).ConfigureAwait(false); + #pragma warning restore AZC0108 // Incorrect 'async' parameter value. + #pragma warning restore AZC0110 // DO NOT use await keyword in possibly synchronous scope. + IEnumerable> entries = + await ReadArrayAsync(stream, parsePair, async, cancellationToken).ConfigureAwait(false); + return entries.ToDictionary(); + } + + private static async Task> ReadArrayAsync( + Stream stream, + Func> parseItemAsync, + bool async, + CancellationToken cancellationToken) + { + // TODO: This is unpleasant, but I don't want to switch everything to IAsyncEnumerable for every array + List items = new List(); + for (long length = await ReadLongAsync(stream, async, cancellationToken).ConfigureAwait(false); + length != 0; + length = await ReadLongAsync(stream, async, cancellationToken).ConfigureAwait(false)) + { + // Ignore block sizes because we're not skipping anything + if (length < 0) + { + await ReadLongAsync(stream, async, cancellationToken).ConfigureAwait(false); + length = -length; + } + while (length-- > 0) + { + #pragma warning disable AZC0110 // DO NOT use await keyword in possibly synchronous scope. + T item = await parseItemAsync(stream, async, cancellationToken).ConfigureAwait(false); + #pragma warning restore AZC0110 // DO NOT use await keyword in possibly synchronous scope. + items.Add(item); + }; + } + return items; + } + + internal static List Map(this JsonElement array, Func selector) + { + var values = new List(); + foreach (JsonElement element in array.EnumerateArray()) + { + values.Add(selector(element)); + } + return values; + } + + internal static Dictionary ToDictionary(this IEnumerable> values) + { + Dictionary dict = new Dictionary(); + foreach (KeyValuePair pair in values) + { + dict[pair.Key] = pair.Value; + } + return dict; + } + } + + internal abstract class AvroType + { + public abstract Task ReadAsync( + Stream stream, + bool async, + CancellationToken cancellationToken); + + public static AvroType FromSchema(JsonElement schema) + { + switch (schema.ValueKind) + { + // Primitives + case JsonValueKind.String: + { + string type = schema.GetString(); + switch (type) + { + case "null": + return new AvroPrimitiveType { Primitive = AvroPrimitive.Null }; + case "boolean": + return new AvroPrimitiveType { Primitive = AvroPrimitive.Boolean }; + case "int": + return new AvroPrimitiveType { Primitive = AvroPrimitive.Int }; + case "long": + return new AvroPrimitiveType { Primitive = AvroPrimitive.Long }; + case "float": + return new AvroPrimitiveType { Primitive = AvroPrimitive.Float }; + case "double": + return new AvroPrimitiveType { Primitive = AvroPrimitive.Double }; + case "bytes": + return new AvroPrimitiveType { Primitive = AvroPrimitive.Bytes }; + case "string": + return new AvroPrimitiveType { Primitive = AvroPrimitive.String }; + default: + throw new InvalidOperationException($"Unexpected Avro type {type} in {schema}"); + } + } + // Union types + case JsonValueKind.Array: + return new AvroUnionType { Types = schema.Map(FromSchema) }; + // Everything else + case JsonValueKind.Object: + { + string type = schema.GetProperty("type").GetString(); + switch (type) + { + // Primitives can be defined as strings or objects + case "null": + return new AvroPrimitiveType { Primitive = AvroPrimitive.Null }; + case "boolean": + return new AvroPrimitiveType { Primitive = AvroPrimitive.Boolean }; + case "int": + return new AvroPrimitiveType { Primitive = AvroPrimitive.Int }; + case "long": + return new AvroPrimitiveType { Primitive = AvroPrimitive.Long }; + case "float": + return new AvroPrimitiveType { Primitive = AvroPrimitive.Float }; + case "double": + return new AvroPrimitiveType { Primitive = AvroPrimitive.Double }; + case "bytes": + return new AvroPrimitiveType { Primitive = AvroPrimitive.Bytes }; + case "string": + return new AvroPrimitiveType { Primitive = AvroPrimitive.String }; + case "record": + if (schema.TryGetProperty("aliases", out var _)) throw new InvalidOperationException($"Unexpected aliases on {schema}"); + string name = schema.GetProperty("name").GetString(); + Dictionary fields = new Dictionary(); + foreach (JsonElement field in schema.GetProperty("fields").EnumerateArray()) + { + fields[field.GetProperty("name").GetString()] = FromSchema(field.GetProperty("type")); + } + return new AvroRecordType { Schema = name, Fields = fields }; + case "enum": + if (schema.TryGetProperty("aliases", out var _)) throw new InvalidOperationException($"Unexpected aliases on {schema}"); + return new AvroEnumType { Symbols = schema.GetProperty("symbols").Map(s => s.GetString()) }; + case "map": + return new AvroMapType { ItemType = FromSchema(schema.GetProperty("values")) }; + case "array": // Unused today + case "union": // Unused today + case "fixed": // Unused today + default: + throw new InvalidOperationException($"Unexpected Avro type {type} in {schema}"); + } + } + default: + throw new InvalidOperationException($"Unexpected JSON Element: {schema}"); + } + } + } + + internal enum AvroPrimitive { Null, Boolean, Int, Long, Float, Double, Bytes, String }; + + internal class AvroPrimitiveType : AvroType + { + public AvroPrimitive Primitive { get; set; } + + public override async Task ReadAsync( + Stream stream, + bool async, + CancellationToken cancellationToken) => + Primitive switch + { + #pragma warning disable AZC0110 // DO NOT use await keyword in possibly synchronous scope. + AvroPrimitive.Null => await AvroParser.ReadNullAsync().ConfigureAwait(false), + #pragma warning restore AZC0110 // DO NOT use await keyword in possibly synchronous scope. + AvroPrimitive.Boolean => await AvroParser.ReadBoolAsync(stream, async, cancellationToken).ConfigureAwait(false), + AvroPrimitive.Int => await AvroParser.ReadIntAsync(stream, async, cancellationToken).ConfigureAwait(false), + AvroPrimitive.Long => await AvroParser.ReadLongAsync(stream, async, cancellationToken).ConfigureAwait(false), + AvroPrimitive.Float => await AvroParser.ReadFloatAsync(stream, async, cancellationToken).ConfigureAwait(false), + AvroPrimitive.Double => await AvroParser.ReadDoubleAsync(stream, async, cancellationToken).ConfigureAwait(false), + AvroPrimitive.Bytes => await AvroParser.ReadBytesAsync(stream, async, cancellationToken).ConfigureAwait(false), + AvroPrimitive.String => await AvroParser.ReadStringAsync(stream, async, cancellationToken).ConfigureAwait(false), + _ => throw new InvalidOperationException("Unknown Avro Primitive!") + }; + } + + internal class AvroEnumType : AvroType + { + public IReadOnlyList Symbols { get; set; } + + public override async Task ReadAsync( + Stream stream, + bool async, + CancellationToken cancellationToken) + { + int value = await AvroParser.ReadIntAsync(stream, async, cancellationToken).ConfigureAwait(false); + return Symbols[value]; + } + } + + internal class AvroUnionType : AvroType + { + public IReadOnlyList Types { get; set; } + + public override async Task ReadAsync( + Stream stream, + bool async, + CancellationToken cancellationToken) + { + int option = await AvroParser.ReadIntAsync(stream, async, cancellationToken).ConfigureAwait(false); + return await Types[option].ReadAsync(stream, async, cancellationToken).ConfigureAwait(false); + } + } + + internal class AvroMapType : AvroType + { + public AvroType ItemType { get; set; } + + public override async Task ReadAsync( + Stream stream, + bool async, + CancellationToken cancellationToken) + { + Func> parseItemAsync = + async (s, a, c) => await ItemType.ReadAsync(s, a, c).ConfigureAwait(false); + return await AvroParser.ReadMapAsync(stream, parseItemAsync, async, cancellationToken).ConfigureAwait(false); + } + } + + internal class AvroRecordType : AvroType + { + public string Schema { get; set; } + public IReadOnlyDictionary Fields { get; set; } + + public override async Task ReadAsync( + Stream stream, + bool async, + CancellationToken cancellationToken) + { + Dictionary record = new Dictionary(); + record["$schema"] = Schema; + foreach (KeyValuePair field in Fields) + { + record[field.Key] = await field.Value.ReadAsync(stream, async, cancellationToken).ConfigureAwait(false); + } + return record; + } + } +} diff --git a/sdk/storage/Azure.Storage.Internal.Avro/src/AvroReader.cs b/sdk/storage/Azure.Storage.Internal.Avro/src/AvroReader.cs new file mode 100644 index 0000000000000..a3f89bbd0c842 --- /dev/null +++ b/sdk/storage/Azure.Storage.Internal.Avro/src/AvroReader.cs @@ -0,0 +1,196 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Threading.Tasks; +using System.Text.Json; +using System.Threading; + +namespace Azure.Storage.Internal.Avro +{ + internal class AvroReader + { + /// + /// Stream containing the body of the Avro file. + /// + private readonly Stream _dataStream; + + /// + /// Stream containing the header of the Avro file. + /// + private readonly Stream _headerStream; + + /// + /// Sync marker. + /// + private byte[] _syncMarker; + + /// + /// Avro metadata. + /// + private Dictionary _metadata; + + /// + /// Avro schema. + /// + private AvroType _itemType; + + /// + /// The number of items remaining in the current block. + /// + private long _itemsRemainingInBlock; + + /// + /// The byte offset within the Avro file (both header and data) + /// of the start of the current block. + /// + public long BlockOffset { get; private set; } + + /// + /// The index of the current object within the current block. + /// + /// + public long ObjectIndex { get; private set; } + + /// + /// If this Avro Reader has been initalized. + /// + private bool _initalized; + + /// + /// Constructor for an AvroReader that will read from the + /// beginning of an Avro file. + /// + public AvroReader(Stream dataStream) + { + _dataStream = dataStream; + _headerStream = dataStream; + _metadata = new Dictionary(); + _initalized = false; + } + + /// + /// Constructor for an Avro Reader that will read beginning + /// in the middle of an Avro file. + /// + public AvroReader( + Stream dataStream, + Stream headerStream, + long currentBlockOffset, + long indexWithinCurrentBlock) + { + _dataStream = dataStream; + _headerStream = headerStream; + _metadata = new Dictionary(); + _initalized = false; + BlockOffset = currentBlockOffset; + ObjectIndex = indexWithinCurrentBlock; + _initalized = false; + } + + private async Task Initalize(bool async, CancellationToken cancellationToken = default) + { + // Four bytes, ASCII 'O', 'b', 'j', followed by 1. + byte[] header = await AvroParser.ReadFixedBytesAsync(_headerStream, AvroConstants.InitBytes.Length, async, cancellationToken).ConfigureAwait(false); + if (!header.SequenceEqual(AvroConstants.InitBytes)) + { + throw new ArgumentException("Stream is not an Avro file."); + } + + // File metadata is written as if defined by the following map schema: + // { "type": "map", "values": "bytes"} + _metadata = await AvroParser.ReadMapAsync(_headerStream, AvroParser.ReadStringAsync, async, cancellationToken).ConfigureAwait(false); + + // Validate codec + _metadata.TryGetValue(AvroConstants.CodecKey, out string codec); + if (codec == AvroConstants.DeflateCodec) + { + throw new ArgumentException("Deflate codec is not supported"); + } + + // The 16-byte, randomly-generated sync marker for this file. + _syncMarker = await AvroParser.ReadFixedBytesAsync(_headerStream, AvroConstants.SyncMarkerSize, async, cancellationToken).ConfigureAwait(false); + + // Parse the schema + using JsonDocument schema = JsonDocument.Parse(_metadata[AvroConstants.SchemaKey]); + _itemType = AvroType.FromSchema(schema.RootElement); + + if (BlockOffset == 0) + { + BlockOffset = _dataStream.Position; + } + + // Populate _itemsRemainingInCurrentBlock + _itemsRemainingInBlock = await AvroParser.ReadLongAsync(_dataStream, async, cancellationToken).ConfigureAwait(false); + + // skip block length + await AvroParser.ReadLongAsync(_dataStream, async, cancellationToken).ConfigureAwait(false); + + _initalized = true; + + if (ObjectIndex > 0) + { + for (int i = 0; i < ObjectIndex; i++) + { + await _itemType.ReadAsync(_dataStream, async, cancellationToken).ConfigureAwait(false); + _itemsRemainingInBlock--; + } + } + } + + public bool HasNext() => !_initalized || _itemsRemainingInBlock > 0; + + public async Task Next(bool async, CancellationToken cancellationToken = default) + { + // Initialize AvroReader, if necessary. + if (!_initalized) + { + await Initalize(async, cancellationToken).ConfigureAwait(false); + } + + if (!HasNext()) + { + throw new ArgumentException("There are no more items in the stream"); + } + + + object result = await _itemType.ReadAsync(_dataStream, async, cancellationToken).ConfigureAwait(false); + + _itemsRemainingInBlock--; + ObjectIndex++; + + if (_itemsRemainingInBlock == 0) + { + byte[] marker = await AvroParser.ReadFixedBytesAsync(_dataStream, 16, async, cancellationToken).ConfigureAwait(false); + + BlockOffset = _dataStream.Position; + ObjectIndex = 0; + + if (!_syncMarker.SequenceEqual(marker)) + { + throw new ArgumentException("Stream is not a valid Avro file."); + } + + try + { + _itemsRemainingInBlock = await AvroParser.ReadLongAsync(_dataStream, async, cancellationToken).ConfigureAwait(false); + } + catch (InvalidOperationException) + { + // We hit the end of the stream. + } + + if (_itemsRemainingInBlock > 0) + { + // Ignore block size + await AvroParser.ReadLongAsync(_dataStream, async, cancellationToken).ConfigureAwait(false); + } + } + + return result; + } + } +} diff --git a/sdk/storage/Azure.Storage.Internal.Avro/src/Azure.Storage.Internal.Avro.csproj b/sdk/storage/Azure.Storage.Internal.Avro/src/Azure.Storage.Internal.Avro.csproj new file mode 100644 index 0000000000000..cb09c70284d34 --- /dev/null +++ b/sdk/storage/Azure.Storage.Internal.Avro/src/Azure.Storage.Internal.Avro.csproj @@ -0,0 +1,21 @@ + + + $(RequiredTargetFrameworks) + + + Microsoft Azure.Storage.Internal.Avro client library + 12.0.0-preview.1 + false + false + + + + + + + + + + + + \ No newline at end of file diff --git a/sdk/storage/Azure.Storage.Internal.Avro/src/Record.cs b/sdk/storage/Azure.Storage.Internal.Avro/src/Record.cs new file mode 100644 index 0000000000000..8da2196679338 --- /dev/null +++ b/sdk/storage/Azure.Storage.Internal.Avro/src/Record.cs @@ -0,0 +1,13 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; +using System.Text; + +namespace Azure.Storage.Internal.Avro +{ + internal class Record + { + } +} diff --git a/sdk/storage/Azure.Storage.Internal.Avro/tests/AvroReaderTests.cs b/sdk/storage/Azure.Storage.Internal.Avro/tests/AvroReaderTests.cs new file mode 100644 index 0000000000000..702830addceae --- /dev/null +++ b/sdk/storage/Azure.Storage.Internal.Avro/tests/AvroReaderTests.cs @@ -0,0 +1,72 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; +using System.IO; +using System.Text; +using System.Threading.Tasks; +using NUnit.Framework; + +namespace Azure.Storage.Internal.Avro.Tests +{ + public class AvroReaderTests + { + [Test] + public async Task Tests() + { + List testCases = new List + { + new TestCase("test_null_0.avro", o => Assert.IsNull(o)), // null + new TestCase("test_null_1.avro", o => Assert.AreEqual(true, (bool)o)), // bool + new TestCase("test_null_2.avro", o => Assert.AreEqual("adsfasdf09809dsf-=adsf", (string)o)), // string + new TestCase("test_null_3.avro", o => Assert.AreEqual(Encoding.UTF8.GetBytes("12345abcd"), (byte[])o)), // byte[] + new TestCase("test_null_4.avro", o => Assert.AreEqual(1234, (int)o)), // int + new TestCase("test_null_5.avro", o => Assert.AreEqual(1234L, (long)o)), // long + new TestCase("test_null_6.avro", o => Assert.AreEqual(1234.0, (float)o)), // float + new TestCase("test_null_7.avro", o => Assert.AreEqual(1234.0, (double)o)), // fouble + // Not supported today. + //new TestCase("test_null_8.avro", o => Assert.AreEqual(Encoding.UTF8.GetBytes("B"), (byte[])o)), // fixed + new TestCase("test_null_9.avro", o => Assert.AreEqual("B", (string)o)), // enum + // Not supported today. + // new TestCase("test_null_10.avro", o => Assert.AreEqual(new List() { 1, 2, 3 }, (List)o)), // array + new TestCase("test_null_11.avro", o => Assert.AreEqual( + new Dictionary() { { "a", 1 }, { "b", 3 }, { "c", 2 } }, (Dictionary)o)), // dictionary + new TestCase("test_null_12.avro", o => Assert.IsNull(o)), // union + new TestCase("test_null_13.avro", o => // record + { + Dictionary expected = new Dictionary() { { "$schema", "Test" }, { "f", 5 } }; + Dictionary actual = (Dictionary)o; + Assert.AreEqual(expected.Count, actual.Count); + foreach (KeyValuePair keyValuePair in actual) + { + Assert.AreEqual(expected[keyValuePair.Key], keyValuePair.Value); + } + }) + }; + + foreach (TestCase testCase in testCases) + { + // Arrange + using FileStream stream = File.OpenRead($"Resources\\{testCase.Path}"); + AvroReader avroReader = new AvroReader(stream); + + // Act + object o = await avroReader.Next(async: true).ConfigureAwait(false); + testCase.Predicate(o); + } + } + + private class TestCase + { + public string Path; + public Action Predicate; + + public TestCase(string path, Action predicate) + { + Path = path; + Predicate = predicate; + } + } + } +} diff --git a/sdk/storage/Azure.Storage.Internal.Avro/tests/Azure.Storage.Internal.Avro.Tests.csproj b/sdk/storage/Azure.Storage.Internal.Avro/tests/Azure.Storage.Internal.Avro.Tests.csproj new file mode 100644 index 0000000000000..a5f0f11e662fb --- /dev/null +++ b/sdk/storage/Azure.Storage.Internal.Avro/tests/Azure.Storage.Internal.Avro.Tests.csproj @@ -0,0 +1,17 @@ + + + $(RequiredTargetFrameworks) + + + Microsoft Azure.Storage.Internal.Avro client library tests + false + + + + + + + PreserveNewest + + + \ No newline at end of file diff --git a/sdk/storage/Azure.Storage.Internal.Avro/tests/Resources/test_null_0.avro b/sdk/storage/Azure.Storage.Internal.Avro/tests/Resources/test_null_0.avro new file mode 100644 index 0000000000000000000000000000000000000000..91c2b2469e5432eb9ec2390151bc9ff3e90ceaa0 GIT binary patch literal 75 zcmeZI%3@>@Nh~YM*GtY%NloU+E6vFf1M`cMGg5OCd6YmRN(`NUQtMNu2+Y1`d^mr~ K=3Z$L3=seW)g9yj literal 0 HcmV?d00001 diff --git a/sdk/storage/Azure.Storage.Internal.Avro/tests/Resources/test_null_1.avro b/sdk/storage/Azure.Storage.Internal.Avro/tests/Resources/test_null_1.avro new file mode 100644 index 0000000000000000000000000000000000000000..01371934eba3764a31d53dd3f9bc1e461702d1ab GIT binary patch literal 88 zcmeZI%3@>@Nh~YM*GtY%NloU+E6vFf1M`cMGg5OCg_M%=^K()Y^OP9s8P`rr>EQjK TbNPM6yO#l9L_`?j09_6MP7xss literal 0 HcmV?d00001 diff --git a/sdk/storage/Azure.Storage.Internal.Avro/tests/Resources/test_null_10.avro b/sdk/storage/Azure.Storage.Internal.Avro/tests/Resources/test_null_10.avro new file mode 100644 index 0000000000000000000000000000000000000000..97aaaa0bb91a78930168294a4d9b4a235d6dd92a GIT binary patch literal 153 zcmeZI%3@>@Nh~YM*GtY%NloU+E6vFf1M`cMGg5OCU8@Nh~YM*GtY%NloU+E6vFf1M`cMGg5OCovM{eDhpDTtQ3@T6AP4d6qL#m zb4pW-K>|7XdFe{E49};o`JAs(f6}V2H`n@m#e0!EjBHGaOiW2^Ovx+^bYP-8005xm BHPQe8 literal 0 HcmV?d00001 diff --git a/sdk/storage/Azure.Storage.Internal.Avro/tests/Resources/test_null_12.avro b/sdk/storage/Azure.Storage.Internal.Avro/tests/Resources/test_null_12.avro new file mode 100644 index 0000000000000000000000000000000000000000..ddf42625f4f320290e6da4136c343b800d139419 GIT binary patch literal 105 zcmeZI%3@>@Nh~YM*GtY%NloU+E6vFf1M`cMGg5OCO`?^GONuh{(v@@+lt7XoIwv2< fk7c+TwR{ENa=xFFChYuieom>MhzJuLpvwUOZ!acI literal 0 HcmV?d00001 diff --git a/sdk/storage/Azure.Storage.Internal.Avro/tests/Resources/test_null_13.avro b/sdk/storage/Azure.Storage.Internal.Avro/tests/Resources/test_null_13.avro new file mode 100644 index 0000000000000000000000000000000000000000..277376ae1aa5801191c8824813be8f020324fbae GIT binary patch literal 157 zcmeZI%3@>@Nh~YM*GtY%NloU+E6vFf1M`cMGg5OCXE9bQl~fj_Dp@Hg6{RNU7o{la zC@AG6=7L2+Qj1GK{Itx}oRngqnrMXTocz3WWVLBZwXwAfaYu7sT72Vj$vnXPB|UzL Mf`|wg9H7eq0E1I9(f|Me literal 0 HcmV?d00001 diff --git a/sdk/storage/Azure.Storage.Internal.Avro/tests/Resources/test_null_14.avro b/sdk/storage/Azure.Storage.Internal.Avro/tests/Resources/test_null_14.avro new file mode 100644 index 0000000000000000000000000000000000000000..3c34ec843837039174125c7b6d7b86554bd35374 GIT binary patch literal 358 zcmeZI%3@>@Nh~YM*GtY%NloU+E6vFf1M`cMGg5OC=P_3+l~fj_Dp@Hg6{RNU7o{la zC@AG6=7L3hGK&j9{Itx}oRngqnrOJ{XeE$QAj#sAqRhN>APX*s#U$taykZ{B$gLMxCiYvk+gsN+Yg$~W$O1+dCXS5M M#1sZ<(dbqH03+gO_W%F@ literal 0 HcmV?d00001 diff --git a/sdk/storage/Azure.Storage.Internal.Avro/tests/Resources/test_null_2.avro b/sdk/storage/Azure.Storage.Internal.Avro/tests/Resources/test_null_2.avro new file mode 100644 index 0000000000000000000000000000000000000000..bf119d9e16f55f99c06d203079cd7b35812f2744 GIT binary patch literal 308 zcmeZI%3@>@Nh~YM*GtY%NloU+E6vFf1M`cMGg5OC1(b?QiZb)kl^8Z(S$pN@?6xH= opWCzEF8(bga)wzaF{L;yu{b5oz|z9N63EuI1&ItZVRVlJ04jEH;{X5v literal 0 HcmV?d00001 diff --git a/sdk/storage/Azure.Storage.Internal.Avro/tests/Resources/test_null_3.avro b/sdk/storage/Azure.Storage.Internal.Avro/tests/Resources/test_null_3.avro new file mode 100644 index 0000000000000000000000000000000000000000..d542117f7f6e950c5858d2f5242d03db1142d117 GIT binary patch literal 177 zcmeZI%3@>@Nh~YM*GtY%NloU+E6vFf1M`cMGg5OC`IM3>OHzxK7|d0}Qa-liyMB4a cZm;{t@4v_iMj=BZV-wTFq~sLZvCypr0FtpdUjP6A literal 0 HcmV?d00001 diff --git a/sdk/storage/Azure.Storage.Internal.Avro/tests/Resources/test_null_4.avro b/sdk/storage/Azure.Storage.Internal.Avro/tests/Resources/test_null_4.avro new file mode 100644 index 0000000000000000000000000000000000000000..b514fd8218419e4dd2b5dfe9bda9f4b678a1581f GIT binary patch literal 94 zcmeZI%3@>@Nh~YM*GtY%NloU+E6vFf1M`cMGg5OCxs)>VN|YGx{5Z8JF?q=kk$qY} QI;waJMKqQOV?uOQ0FbsOYybcN literal 0 HcmV?d00001 diff --git a/sdk/storage/Azure.Storage.Internal.Avro/tests/Resources/test_null_5.avro b/sdk/storage/Azure.Storage.Internal.Avro/tests/Resources/test_null_5.avro new file mode 100644 index 0000000000000000000000000000000000000000..29e8ca4d5f3559887e1628238dff6d8499f315a1 GIT binary patch literal 95 zcmeZI%3@>@Nh~YM*GtY%NloU+E6vFf1M`cMGg5OCd6aVU^U{?VDi@v#HHgi6KXqZr S;X}KQ-W1VTB8&;qRRI97RwwrW literal 0 HcmV?d00001 diff --git a/sdk/storage/Azure.Storage.Internal.Avro/tests/Resources/test_null_6.avro b/sdk/storage/Azure.Storage.Internal.Avro/tests/Resources/test_null_6.avro new file mode 100644 index 0000000000000000000000000000000000000000..df22b0f901a3004e75a9a922eeaf6c61b942fa5f GIT binary patch literal 116 zcmeZI%3@>@Nh~YM*GtY%NloU+E6vFf1M`cMGg5OC`IORf@)Jvx81%y!RhK2p%#Hf7 U#^f4vcdkePgTpKrVlcW+0GJvkApigX literal 0 HcmV?d00001 diff --git a/sdk/storage/Azure.Storage.Internal.Avro/tests/Resources/test_null_7.avro b/sdk/storage/Azure.Storage.Internal.Avro/tests/Resources/test_null_7.avro new file mode 100644 index 0000000000000000000000000000000000000000..1168f99d0d1977ba6a446b5b4599a96efd6f072f GIT binary patch literal 158 zcmeZI%3@>@Nh~YM*GtY%NloU+E6vFf1M`cMGg5OC1(Z_qOOtX^l^6>5J}b~GkAB(O YA*We={AY~F0!9W9@R;mCEgIbl0PIvMivR!s literal 0 HcmV?d00001 diff --git a/sdk/storage/Azure.Storage.Internal.Avro/tests/Resources/test_null_8.avro b/sdk/storage/Azure.Storage.Internal.Avro/tests/Resources/test_null_8.avro new file mode 100644 index 0000000000000000000000000000000000000000..b4136af69b603bc2695bcdffdcd69e9abb650888 GIT binary patch literal 123 zcmeZI%3@>@Nh~YM*GtY%NloU+E6vFf1M`cMGg5OCBdV23DhpDTtQ3^eGAmM3lynr7 u@)C2w0wJlzB_MurW)+BUSj#Y1s# literal 0 HcmV?d00001 diff --git a/sdk/storage/Azure.Storage.Internal.Avro/tests/Resources/test_null_9.avro b/sdk/storage/Azure.Storage.Internal.Avro/tests/Resources/test_null_9.avro new file mode 100644 index 0000000000000000000000000000000000000000..90abc062240449b5df26a9aeeebf602cb9f38d73 GIT binary patch literal 134 zcmeZI%3@>@Nh~YM*GtY%NloU+E6vFf1M`cMGg5OC^Qx6fDhpDTtQ3?|^Gb7-bQF~G z5_7@)kksN55Wl!GHz_}-7^oy#$q^*rq!e4rpyG4uSOkOUm+zatwOxp?(H9Y6f&+9p E09Ilw2LJ#7 literal 0 HcmV?d00001 diff --git a/sdk/storage/Azure.Storage.sln b/sdk/storage/Azure.Storage.sln index 79928d62bf579..bd5413ab22aab 100644 --- a/sdk/storage/Azure.Storage.sln +++ b/sdk/storage/Azure.Storage.sln @@ -115,6 +115,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ApiCompat", "..\..\eng\ApiC EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Azure.Storage.Blobs.Batch.Samples.Tests", "Azure.Storage.Blobs.Batch\samples\Azure.Storage.Blobs.Batch.Samples.Tests.csproj", "{73F575E6-FA87-40B0-9D36-6D9FAAC1E1C1}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Azure.Storage.Internal.Avro", "Azure.Storage.Internal.Avro\src\Azure.Storage.Internal.Avro.csproj", "{83C6A14C-98D1-46EA-BBB6-BE1D783BAEC4}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Azure.Storage.Internal.Avro.Tests", "Azure.Storage.Internal.Avro\tests\Azure.Storage.Internal.Avro.Tests.csproj", "{A7FEC0AC-9A90-4F12-A260-B0B63E57D9DA}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -209,6 +213,14 @@ Global {73F575E6-FA87-40B0-9D36-6D9FAAC1E1C1}.Debug|Any CPU.Build.0 = Debug|Any CPU {73F575E6-FA87-40B0-9D36-6D9FAAC1E1C1}.Release|Any CPU.ActiveCfg = Release|Any CPU {73F575E6-FA87-40B0-9D36-6D9FAAC1E1C1}.Release|Any CPU.Build.0 = Release|Any CPU + {83C6A14C-98D1-46EA-BBB6-BE1D783BAEC4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {83C6A14C-98D1-46EA-BBB6-BE1D783BAEC4}.Debug|Any CPU.Build.0 = Debug|Any CPU + {83C6A14C-98D1-46EA-BBB6-BE1D783BAEC4}.Release|Any CPU.ActiveCfg = Release|Any CPU + {83C6A14C-98D1-46EA-BBB6-BE1D783BAEC4}.Release|Any CPU.Build.0 = Release|Any CPU + {A7FEC0AC-9A90-4F12-A260-B0B63E57D9DA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {A7FEC0AC-9A90-4F12-A260-B0B63E57D9DA}.Debug|Any CPU.Build.0 = Debug|Any CPU + {A7FEC0AC-9A90-4F12-A260-B0B63E57D9DA}.Release|Any CPU.ActiveCfg = Release|Any CPU + {A7FEC0AC-9A90-4F12-A260-B0B63E57D9DA}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE From 33f40d876c54865b884562e52a69e250ff9243eb Mon Sep 17 00:00:00 2001 From: Sean McCullough Date: Tue, 21 Apr 2020 14:58:35 -0700 Subject: [PATCH 2/4] removed some unnecessary stuff --- .../src/AvroParser.cs | 46 ------------------- 1 file changed, 46 deletions(-) diff --git a/sdk/storage/Azure.Storage.Internal.Avro/src/AvroParser.cs b/sdk/storage/Azure.Storage.Internal.Avro/src/AvroParser.cs index c8d28213eef66..607dabfba15c5 100644 --- a/sdk/storage/Azure.Storage.Internal.Avro/src/AvroParser.cs +++ b/sdk/storage/Azure.Storage.Internal.Avro/src/AvroParser.cs @@ -17,52 +17,6 @@ namespace Azure.Storage.Internal.Avro { internal static class AvroParser { - public static List Parse(Stream stream, CancellationToken cancellationToken = default) => - ReadObjectContainerFileAsync(stream, async: false, cancellationToken).EnsureCompleted(); - - public static async Task> ParseAsync(Stream stream, CancellationToken cancellationToken = default) => - await ReadObjectContainerFileAsync(stream, async: true, cancellationToken).ConfigureAwait(false); - - private static async Task> ReadObjectContainerFileAsync( - Stream stream, - bool async, - CancellationToken cancellationToken = default) - { - // Four bytes, ASCII 'O', 'b', 'j', followed by 1. - byte[] header = await ReadFixedBytesAsync(stream, AvroConstants.InitBytes.Length, async, cancellationToken).ConfigureAwait(false); - Debug.Assert(header[0] == AvroConstants.InitBytes[0]); - Debug.Assert(header[1] == AvroConstants.InitBytes[1]); - Debug.Assert(header[2] == AvroConstants.InitBytes[2]); - Debug.Assert(header[3] == AvroConstants.InitBytes[3]); - - // File metadata is written as if defined by the following map schema: - // { "type": "map", "values": "bytes"} - Dictionary metadata = await ReadMapAsync(stream, ReadStringAsync, async, cancellationToken).ConfigureAwait(false); - Debug.Assert(metadata[AvroConstants.CodecKey] == "null"); - - // The 16-byte, randomly-generated sync marker for this file. - byte[] syncMarker = await ReadFixedBytesAsync(stream, AvroConstants.SyncMarkerSize, async, cancellationToken).ConfigureAwait(false); - - // Parse the schema - using JsonDocument schema = JsonDocument.Parse(metadata[AvroConstants.SchemaKey]); - AvroType itemType = AvroType.FromSchema(schema.RootElement); - - // File data blocks - var data = new List(); - while (stream.Position < stream.Length) - { - long length = await ReadLongAsync(stream, async, cancellationToken).ConfigureAwait(false); - await ReadLongAsync(stream, async, cancellationToken).ConfigureAwait(false); // Ignore the block size - while (length-- > 0) - { - object value = await itemType.ReadAsync(stream, async, cancellationToken).ConfigureAwait(false); - data.Add(value); - } - await ReadFixedBytesAsync(stream, AvroConstants.SyncMarkerSize, async, cancellationToken).ConfigureAwait(false); // Ignore the sync check - } - return data; - } - public static async Task ReadFixedBytesAsync( Stream stream, int length, From a3ad5a0c6b7482b814f45a8de5e42a7c05ce7300 Mon Sep 17 00:00:00 2001 From: Sean McCullough Date: Tue, 21 Apr 2020 15:17:22 -0700 Subject: [PATCH 3/4] Fixed cross-platform tests issue --- .../Azure.Storage.Internal.Avro/tests/AvroReaderTests.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/storage/Azure.Storage.Internal.Avro/tests/AvroReaderTests.cs b/sdk/storage/Azure.Storage.Internal.Avro/tests/AvroReaderTests.cs index 702830addceae..944fee588e246 100644 --- a/sdk/storage/Azure.Storage.Internal.Avro/tests/AvroReaderTests.cs +++ b/sdk/storage/Azure.Storage.Internal.Avro/tests/AvroReaderTests.cs @@ -48,7 +48,7 @@ public async Task Tests() foreach (TestCase testCase in testCases) { // Arrange - using FileStream stream = File.OpenRead($"Resources\\{testCase.Path}"); + using FileStream stream = File.OpenRead($"Resources{Path.DirectorySeparatorChar}{testCase.Path}"); AvroReader avroReader = new AvroReader(stream); // Act From 3ff0a68bfa965e2b30a5e9af82028d9e0cdceb9d Mon Sep 17 00:00:00 2001 From: Sean McCullough Date: Thu, 23 Apr 2020 13:39:28 -0700 Subject: [PATCH 4/4] PR comments --- .../src/AvroConstants.cs | 23 +- .../src/AvroParser.cs | 232 ++++++++++++------ .../src/AvroReader.cs | 4 +- 3 files changed, 182 insertions(+), 77 deletions(-) diff --git a/sdk/storage/Azure.Storage.Internal.Avro/src/AvroConstants.cs b/sdk/storage/Azure.Storage.Internal.Avro/src/AvroConstants.cs index ca3d23a201ca3..551b39f9bbcd8 100644 --- a/sdk/storage/Azure.Storage.Internal.Avro/src/AvroConstants.cs +++ b/sdk/storage/Azure.Storage.Internal.Avro/src/AvroConstants.cs @@ -19,6 +19,27 @@ internal class AvroConstants }; public const string CodecKey = "avro.codec"; public const string SchemaKey = "avro.schema"; - public const string DeflateCodec = "deflate"; + + public const string Null = "null"; + public const string Boolean = "boolean"; + public const string Int = "int"; + public const string Long = "long"; + public const string Float = "float"; + public const string Double = "double"; + public const string Bytes = "bytes"; + public const string String = "string"; + public const string Record = "record"; + public const string Enum = "enum"; + public const string Map = "map"; + public const string Array = "array"; + public const string Union = "union"; + public const string Fixed = "fixed"; + + public const string Aliases = "aliases"; + public const string Name = "name"; + public const string Fields = "fields"; + public const string Type = "type"; + public const string Symbols = "symbols"; + public const string Values = "values"; } } diff --git a/sdk/storage/Azure.Storage.Internal.Avro/src/AvroParser.cs b/sdk/storage/Azure.Storage.Internal.Avro/src/AvroParser.cs index 607dabfba15c5..c6b1212de1bb1 100644 --- a/sdk/storage/Azure.Storage.Internal.Avro/src/AvroParser.cs +++ b/sdk/storage/Azure.Storage.Internal.Avro/src/AvroParser.cs @@ -3,13 +3,11 @@ using System; using System.Collections.Generic; -using System.Diagnostics; using System.IO; using System.Text; using System.Text.Json; using System.Threading; using System.Threading.Tasks; -using Azure.Core.Pipeline; #pragma warning disable SA1402 // File may only contain a single type @@ -17,6 +15,15 @@ namespace Azure.Storage.Internal.Avro { internal static class AvroParser { + /// + /// Reads a fixed number of bytes from the stream. + /// The number of bytes to return is the first int read from the stream. + /// + /// + /// Note that in the Avro spec, byte array length is specified as a long. + /// This is fine for Quick Query and Change Feed, but could become a problem + /// in the future. + /// public static async Task ReadFixedBytesAsync( Stream stream, int length, @@ -40,6 +47,9 @@ await stream.ReadAsync(data, start, length, cancellationToken).ConfigureAwait(fa return data; } + /// + /// Reads a single byte from the stream. + /// private static async Task ReadByteAsync( Stream stream, bool async, @@ -49,7 +59,9 @@ private static async Task ReadByteAsync( return bytes[0]; } - // Stolen because the linked references in the Avro spec were subpar... + /// + /// Internal implementation of ReadLongAsync(). + /// private static async Task ReadZigZagLongAsync( Stream stream, bool async, @@ -68,29 +80,50 @@ private static async Task ReadZigZagLongAsync( return (-(value & 0x01L)) ^ ((value >> 1) & 0x7fffffffffffffffL); } + /// + /// Returns null. + /// public static Task ReadNullAsync() => Task.FromResult(null); + /// + /// Reads a bool from the stream. + /// public static async Task ReadBoolAsync( Stream stream, bool async, CancellationToken cancellationToken) { byte b = await ReadByteAsync(stream, async, cancellationToken).ConfigureAwait(false); - return b != 0; + + if (b == 1) + return true; + else if (b == 0) + return false; + else + throw new InvalidOperationException("Byte was not a bool."); } + /// + /// Reads a long from the stream. + /// public static async Task ReadLongAsync( Stream stream, bool async, CancellationToken cancellationToken) => await ReadZigZagLongAsync(stream, async, cancellationToken).ConfigureAwait(false); + /// + /// Reads an int from the stream. + /// public static async Task ReadIntAsync( Stream stream, bool async, CancellationToken cancellationToken) => (int)(await ReadLongAsync(stream, async, cancellationToken).ConfigureAwait(false)); + /// + /// Reads a float from the stream. + /// public static async Task ReadFloatAsync( Stream stream, bool async, @@ -100,6 +133,9 @@ public static async Task ReadFloatAsync( return BitConverter.ToSingle(bytes, 0); } + /// + /// Reads a double from the stream. + /// public static async Task ReadDoubleAsync( Stream stream, bool async, @@ -109,15 +145,23 @@ public static async Task ReadDoubleAsync( return BitConverter.ToDouble(bytes, 0); } + /// + /// Reads a fixed number of bytes from the stream. + /// public static async Task ReadBytesAsync( Stream stream, bool async, CancellationToken cancellationToken) { + // Note that byte array length is actually defined as a long in the Avro spec. + // This is fine for now, but may need to be changed in the future. int size = await ReadIntAsync(stream, async, cancellationToken).ConfigureAwait(false); return await ReadFixedBytesAsync(stream, size, async, cancellationToken).ConfigureAwait(false); } + /// + /// Reads a string from the stream. + /// public static async Task ReadStringAsync( Stream stream, bool async, @@ -127,6 +171,10 @@ public static async Task ReadStringAsync( return Encoding.UTF8.GetString(bytes); } + /// + /// Reads a KeyValuePair from the stream. + /// Used in ReadMapAsync(). + /// private static async Task> ReadMapPairAsync( Stream stream, Func> parseItemAsync, @@ -140,6 +188,9 @@ private static async Task> ReadMapPairAsync( return new KeyValuePair(key, value); } + /// + /// Reads a map from the stream. + /// public static async Task> ReadMapAsync( Stream stream, Func> parseItemAsync, @@ -157,6 +208,9 @@ public static async Task> ReadMapAsync( return entries.ToDictionary(); } + /// + /// Reads an array of objects from the stream. + /// private static async Task> ReadArrayAsync( Stream stream, Func> parseItemAsync, @@ -186,6 +240,9 @@ private static async Task> ReadArrayAsync( return items; } + /// + /// Adds the select to each element in the array. + /// internal static List Map(this JsonElement array, Func selector) { var values = new List(); @@ -196,6 +253,9 @@ internal static List Map(this JsonElement array, Func sele return values; } + /// + /// Converts an IEnumerable of KeyValuePair into a Dictionary. + /// internal static Dictionary ToDictionary(this IEnumerable> values) { Dictionary dict = new Dictionary(); @@ -207,98 +267,110 @@ internal static Dictionary ToDictionary(this IEnumerable + /// Parent class of AvroTypes. + /// internal abstract class AvroType { + /// + /// Reads an object from the stream. + /// public abstract Task ReadAsync( Stream stream, bool async, CancellationToken cancellationToken); + /// + /// Determinds the AvroType from the Avro Schema. + /// public static AvroType FromSchema(JsonElement schema) { - switch (schema.ValueKind) + return schema.ValueKind switch { // Primitives - case JsonValueKind.String: - { - string type = schema.GetString(); - switch (type) - { - case "null": - return new AvroPrimitiveType { Primitive = AvroPrimitive.Null }; - case "boolean": - return new AvroPrimitiveType { Primitive = AvroPrimitive.Boolean }; - case "int": - return new AvroPrimitiveType { Primitive = AvroPrimitive.Int }; - case "long": - return new AvroPrimitiveType { Primitive = AvroPrimitive.Long }; - case "float": - return new AvroPrimitiveType { Primitive = AvroPrimitive.Float }; - case "double": - return new AvroPrimitiveType { Primitive = AvroPrimitive.Double }; - case "bytes": - return new AvroPrimitiveType { Primitive = AvroPrimitive.Bytes }; - case "string": - return new AvroPrimitiveType { Primitive = AvroPrimitive.String }; - default: - throw new InvalidOperationException($"Unexpected Avro type {type} in {schema}"); - } - } + JsonValueKind.String => FromStringSchema(schema), // Union types - case JsonValueKind.Array: - return new AvroUnionType { Types = schema.Map(FromSchema) }; + JsonValueKind.Array => FromArraySchema(schema), // Everything else - case JsonValueKind.Object: + JsonValueKind.Object => FromObjectSchema(schema), + _ => throw new InvalidOperationException($"Unexpected JSON Element: {schema}"), + }; + } + + private static AvroType FromStringSchema(JsonElement schema) + { + string type = schema.GetString(); + return type switch + { + AvroConstants.Null => new AvroPrimitiveType { Primitive = AvroPrimitive.Null }, + AvroConstants.Boolean => new AvroPrimitiveType { Primitive = AvroPrimitive.Boolean }, + AvroConstants.Int => new AvroPrimitiveType { Primitive = AvroPrimitive.Int }, + AvroConstants.Long => new AvroPrimitiveType { Primitive = AvroPrimitive.Long }, + AvroConstants.Float => new AvroPrimitiveType { Primitive = AvroPrimitive.Float }, + AvroConstants.Double => new AvroPrimitiveType { Primitive = AvroPrimitive.Double }, + AvroConstants.Bytes => new AvroPrimitiveType { Primitive = AvroPrimitive.Bytes }, + AvroConstants.String => new AvroPrimitiveType { Primitive = AvroPrimitive.String }, + _ => throw new InvalidOperationException($"Unexpected Avro type {type} in {schema}"), + }; + } + + private static AvroType FromArraySchema(JsonElement schema) + { + return new AvroUnionType { Types = schema.Map(FromSchema) }; + } + + private static AvroType FromObjectSchema(JsonElement schema) + { + string type = schema.GetProperty("type").GetString(); + switch (type) + { + // Primitives can be defined as strings or objects + case AvroConstants.Null: + return new AvroPrimitiveType { Primitive = AvroPrimitive.Null }; + case AvroConstants.Boolean: + return new AvroPrimitiveType { Primitive = AvroPrimitive.Boolean }; + case AvroConstants.Int: + return new AvroPrimitiveType { Primitive = AvroPrimitive.Int }; + case AvroConstants.Long: + return new AvroPrimitiveType { Primitive = AvroPrimitive.Long }; + case AvroConstants.Float: + return new AvroPrimitiveType { Primitive = AvroPrimitive.Float }; + case AvroConstants.Double: + return new AvroPrimitiveType { Primitive = AvroPrimitive.Double }; + case AvroConstants.Bytes: + return new AvroPrimitiveType { Primitive = AvroPrimitive.Bytes }; + case AvroConstants.String: + return new AvroPrimitiveType { Primitive = AvroPrimitive.String }; + case AvroConstants.Record: + if (schema.TryGetProperty(AvroConstants.Aliases, out var _)) + throw new InvalidOperationException($"Unexpected aliases on {schema}"); + string name = schema.GetProperty(AvroConstants.Name).GetString(); + Dictionary fields = new Dictionary(); + foreach (JsonElement field in schema.GetProperty(AvroConstants.Fields).EnumerateArray()) { - string type = schema.GetProperty("type").GetString(); - switch (type) - { - // Primitives can be defined as strings or objects - case "null": - return new AvroPrimitiveType { Primitive = AvroPrimitive.Null }; - case "boolean": - return new AvroPrimitiveType { Primitive = AvroPrimitive.Boolean }; - case "int": - return new AvroPrimitiveType { Primitive = AvroPrimitive.Int }; - case "long": - return new AvroPrimitiveType { Primitive = AvroPrimitive.Long }; - case "float": - return new AvroPrimitiveType { Primitive = AvroPrimitive.Float }; - case "double": - return new AvroPrimitiveType { Primitive = AvroPrimitive.Double }; - case "bytes": - return new AvroPrimitiveType { Primitive = AvroPrimitive.Bytes }; - case "string": - return new AvroPrimitiveType { Primitive = AvroPrimitive.String }; - case "record": - if (schema.TryGetProperty("aliases", out var _)) throw new InvalidOperationException($"Unexpected aliases on {schema}"); - string name = schema.GetProperty("name").GetString(); - Dictionary fields = new Dictionary(); - foreach (JsonElement field in schema.GetProperty("fields").EnumerateArray()) - { - fields[field.GetProperty("name").GetString()] = FromSchema(field.GetProperty("type")); - } - return new AvroRecordType { Schema = name, Fields = fields }; - case "enum": - if (schema.TryGetProperty("aliases", out var _)) throw new InvalidOperationException($"Unexpected aliases on {schema}"); - return new AvroEnumType { Symbols = schema.GetProperty("symbols").Map(s => s.GetString()) }; - case "map": - return new AvroMapType { ItemType = FromSchema(schema.GetProperty("values")) }; - case "array": // Unused today - case "union": // Unused today - case "fixed": // Unused today - default: - throw new InvalidOperationException($"Unexpected Avro type {type} in {schema}"); - } + fields[field.GetProperty(AvroConstants.Name).GetString()] = FromSchema(field.GetProperty(AvroConstants.Type)); } + return new AvroRecordType { Schema = name, Fields = fields }; + case AvroConstants.Enum: + if (schema.TryGetProperty(AvroConstants.Aliases, out var _)) + throw new InvalidOperationException($"Unexpected aliases on {schema}"); + return new AvroEnumType { Symbols = schema.GetProperty(AvroConstants.Symbols).Map(s => s.GetString()) }; + case AvroConstants.Map: + return new AvroMapType { ItemType = FromSchema(schema.GetProperty(AvroConstants.Values)) }; + case AvroConstants.Array: // Unused today + case AvroConstants.Union: // Unused today + case AvroConstants.Fixed: // Unused today default: - throw new InvalidOperationException($"Unexpected JSON Element: {schema}"); + throw new InvalidOperationException($"Unexpected Avro type {type} in {schema}"); } } } internal enum AvroPrimitive { Null, Boolean, Int, Long, Float, Double, Bytes, String }; + /// + /// AvroPrimativeType. + /// internal class AvroPrimitiveType : AvroType { public AvroPrimitive Primitive { get; set; } @@ -323,6 +395,9 @@ public override async Task ReadAsync( }; } + /// + /// AvroEnumType. + /// internal class AvroEnumType : AvroType { public IReadOnlyList Symbols { get; set; } @@ -337,6 +412,9 @@ public override async Task ReadAsync( } } + /// + /// AvroUnionType. + /// internal class AvroUnionType : AvroType { public IReadOnlyList Types { get; set; } @@ -351,6 +429,9 @@ public override async Task ReadAsync( } } + /// + /// AvroMapType. + /// internal class AvroMapType : AvroType { public AvroType ItemType { get; set; } @@ -366,6 +447,9 @@ public override async Task ReadAsync( } } + /// + /// AvroRecordType. + /// internal class AvroRecordType : AvroType { public string Schema { get; set; } diff --git a/sdk/storage/Azure.Storage.Internal.Avro/src/AvroReader.cs b/sdk/storage/Azure.Storage.Internal.Avro/src/AvroReader.cs index a3f89bbd0c842..089cb0c264276 100644 --- a/sdk/storage/Azure.Storage.Internal.Avro/src/AvroReader.cs +++ b/sdk/storage/Azure.Storage.Internal.Avro/src/AvroReader.cs @@ -106,9 +106,9 @@ private async Task Initalize(bool async, CancellationToken cancellationToken = d // Validate codec _metadata.TryGetValue(AvroConstants.CodecKey, out string codec); - if (codec == AvroConstants.DeflateCodec) + if (!(codec == null || codec == "null")) { - throw new ArgumentException("Deflate codec is not supported"); + throw new ArgumentException("Codecs are not supported"); } // The 16-byte, randomly-generated sync marker for this file.