From 9c6f77308367449cb058e18dc826659d315d03f1 Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Tue, 1 Mar 2022 17:44:01 -0800
Subject: [PATCH 1/4] Add logging of cache size
---
.../src/LruCache.cs | 43 ++++--
...zure.Data.SchemaRegistry.ApacheAvro.csproj | 1 +
.../src/SchemaRegistryAvroEventSource.cs | 64 +++++++++
.../src/SchemaRegistryAvroSerializer.cs | 127 +++++++++---------
.../tests/EventSourceLiveTests.cs | 103 ++++++++++++++
...egistryAvroObjectSerializerLiveTestBase.cs | 23 ++++
...maRegistryAvroObjectSerializerLiveTests.cs | 12 +-
.../UpdatingCacheLogsEvents.json | 105 +++++++++++++++
.../UpdatingCacheLogsEventsAsync.json | 105 +++++++++++++++
9 files changed, 499 insertions(+), 84 deletions(-)
create mode 100644 sdk/schemaregistry/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro/src/SchemaRegistryAvroEventSource.cs
create mode 100644 sdk/schemaregistry/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro/tests/EventSourceLiveTests.cs
create mode 100644 sdk/schemaregistry/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro/tests/SchemaRegistryAvroObjectSerializerLiveTestBase.cs
create mode 100644 sdk/schemaregistry/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro/tests/SessionRecords/EventSourceLiveTests/UpdatingCacheLogsEvents.json
create mode 100644 sdk/schemaregistry/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro/tests/SessionRecords/EventSourceLiveTests/UpdatingCacheLogsEventsAsync.json
diff --git a/sdk/schemaregistry/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro/src/LruCache.cs b/sdk/schemaregistry/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro/src/LruCache.cs
index 60699fc8989d8..29c6fdd161821 100644
--- a/sdk/schemaregistry/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro/src/LruCache.cs
+++ b/sdk/schemaregistry/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro/src/LruCache.cs
@@ -1,6 +1,7 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
+using System.Collections;
using System.Collections.Generic;
namespace Microsoft.Azure.Data.SchemaRegistry.ApacheAvro
@@ -8,24 +9,26 @@ namespace Microsoft.Azure.Data.SchemaRegistry.ApacheAvro
///
/// A simple LRU cache implementation using a doubly linked list and dictionary.
///
- /// The type of key
- /// The type of value
- internal class LruCache
+ /// The type of key
+ /// The type of value
+ internal class LruCache : IEnumerable>
{
private readonly int _capacity;
- private readonly LinkedList> _linkedList;
- private readonly Dictionary>> _map;
+ internal readonly LinkedList> _linkedList;
+ private readonly Dictionary>> _map;
private readonly object _syncLock;
+ internal int Count => _linkedList.Count;
+
public LruCache(int capacity)
{
_capacity = capacity;
- _linkedList = new LinkedList>();
- _map = new Dictionary>>();
+ _linkedList = new LinkedList>();
+ _map = new Dictionary>>();
_syncLock = new object();
}
- public bool TryGet(K key, out V value)
+ public bool TryGet(TKey key, out TValue value)
{
lock (_syncLock)
{
@@ -37,12 +40,12 @@ public bool TryGet(K key, out V value)
return true;
}
- value = default(V);
+ value = default(TValue);
return false;
}
}
- public void AddOrUpdate(K key, V val)
+ public void AddOrUpdate(TKey key, TValue val)
{
lock (_syncLock)
{
@@ -53,18 +56,34 @@ public void AddOrUpdate(K key, V val)
}
// add new node
- var node = new LinkedListNode>(new KeyValuePair(key, val));
+ var node = new LinkedListNode>(new KeyValuePair(key, val));
_linkedList.AddFirst(node);
_map[key] = node;
if (_map.Count > _capacity)
{
// remove least recently used node
- LinkedListNode> last = _linkedList.Last;
+ LinkedListNode> last = _linkedList.Last;
_linkedList.RemoveLast();
_map.Remove(last.Value.Key);
}
}
}
+
+ /// Estimates the size, in bytes, of the cached entries by summing the UTF-16
+ /// length of each key's and each value's ToString() representation.
+ internal int ComputeSize()
+ {
+     // Take the same lock as TryGet/AddOrUpdate: AddOrUpdate adds and removes
+     // nodes, and enumerating a LinkedList while it is being modified throws
+     // InvalidOperationException.
+     lock (_syncLock)
+     {
+         int size = 0;
+         foreach (KeyValuePair<TKey, TValue> kvp in _linkedList)
+         {
+             size += kvp.Key.ToString().Length * sizeof(char);
+             size += kvp.Value.ToString().Length * sizeof(char);
+         }
+
+         return size;
+     }
+ }
+
+ /// Returns an enumerator over a snapshot of the cache entries, ordered from the
+ /// head of the internal list to its tail.
+ public IEnumerator<KeyValuePair<TKey, TValue>> GetEnumerator()
+ {
+     // Copy under the lock instead of handing out a live LinkedList enumerator:
+     // callers enumerate outside the lock, and a concurrent AddOrUpdate would
+     // otherwise invalidate the enumerator mid-iteration.
+     lock (_syncLock)
+     {
+         return new List<KeyValuePair<TKey, TValue>>(_linkedList).GetEnumerator();
+     }
+ }
+
+ IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}
}
\ No newline at end of file
diff --git a/sdk/schemaregistry/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro/src/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro.csproj b/sdk/schemaregistry/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro/src/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro.csproj
index 7aa39faee6fdc..6d1b41dae5907 100644
--- a/sdk/schemaregistry/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro/src/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro.csproj
+++ b/sdk/schemaregistry/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro/src/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro.csproj
@@ -16,6 +16,7 @@
+
diff --git a/sdk/schemaregistry/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro/src/SchemaRegistryAvroEventSource.cs b/sdk/schemaregistry/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro/src/SchemaRegistryAvroEventSource.cs
new file mode 100644
index 0000000000000..f265af48315c5
--- /dev/null
+++ b/sdk/schemaregistry/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro/src/SchemaRegistryAvroEventSource.cs
@@ -0,0 +1,64 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+using System.Collections.Generic;
+using System.Diagnostics.Tracing;
+using System.Linq;
+using Avro;
+using Azure.Core.Diagnostics;
+
+namespace Microsoft.Azure.Data.SchemaRegistry.ApacheAvro
+{
+ ///
+ /// Serves as an ETW event source for logging of information about
+ /// the Schema Registry Avro serializer's schema caches.
+ ///
+ ///
+ ///
+ /// When defining Start/Complete tasks, it is highly recommended that
+ /// the CompleteEvent.Id be exactly StartEvent.Id + 1.
+ ///
+ [EventSource(Name = EventSourceName)]
+ internal class SchemaRegistryAvroEventSource : AzureEventSource
+ {
+ /// The name to use for the event source.
+ private const string EventSourceName = "Microsoft-Azure-Data-SchemaRegistry-ApacheAvro";
+
+ internal const int CacheUpdatedEvent = 1;
+
+ ///
+ /// Provides a singleton instance of the event source for callers to
+ /// use for logging.
+ ///
+ public static SchemaRegistryAvroEventSource Log { get; } = new SchemaRegistryAvroEventSource();
+
+ ///
+ /// Prevents an instance of the SchemaRegistryAvroEventSource class from being
+ /// created outside the scope of the Log instance, as well as setting up the
+ /// integration with AzureEventSourceListener.
+ ///
+ protected SchemaRegistryAvroEventSource() : base(EventSourceName)
+ {
+ }
+
+ [NonEvent] // not an event itself: aggregates both caches and forwards the totals to CacheUpdatedCore
+ public virtual void CacheUpdated(LruCache idToSchemaCache, LruCache schemaToIdCache)
+ {
+ if (IsEnabled()) // skip enumerating both caches when the event source is disabled
+ {
+ int totalSchemaLength = idToSchemaCache.Sum(kvp => kvp.Value.SchemaLength);
+ totalSchemaLength += schemaToIdCache.Sum(kvp => kvp.Value.SchemaLength);
+ CacheUpdatedCore(idToSchemaCache.Count + schemaToIdCache.Count, totalSchemaLength);
+ }
+ }
+
+ [Event(CacheUpdatedEvent, Level = EventLevel.Verbose, Message = "Cache entry added or updated. Total number of entries: {0}; Total schema length: {1}")]
+ public virtual void CacheUpdatedCore(int entryCount, int totalSchemaLength)
+ {
+ if (IsEnabled())
+ {
+ WriteEvent(CacheUpdatedEvent, entryCount, totalSchemaLength);
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/sdk/schemaregistry/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro/src/SchemaRegistryAvroSerializer.cs b/sdk/schemaregistry/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro/src/SchemaRegistryAvroSerializer.cs
index 82f6ce631ea55..50dd3692e4f40 100644
--- a/sdk/schemaregistry/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro/src/SchemaRegistryAvroSerializer.cs
+++ b/sdk/schemaregistry/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro/src/SchemaRegistryAvroSerializer.cs
@@ -20,7 +20,7 @@ namespace Microsoft.Azure.Data.SchemaRegistry.ApacheAvro
{
///
/// A uses the to
- /// encode and decode Avro payloads.
+ /// serialize and deserialize Avro payloads.
///
public class SchemaRegistryAvroSerializer
{
@@ -45,8 +45,8 @@ public SchemaRegistryAvroSerializer(SchemaRegistryClient client, string groupNam
private const int RecordFormatIndicatorLength = 4;
private const int SchemaIdLength = 32;
private const int PayloadStartPosition = RecordFormatIndicatorLength + SchemaIdLength;
- private readonly LruCache _idToSchemaMap = new(CacheCapacity);
- private readonly LruCache _schemaToIdMap = new(CacheCapacity);
+ private readonly LruCache _idToSchemaMap = new(CacheCapacity);
+ private readonly LruCache _schemaToIdMap = new(CacheCapacity);
private enum SupportedType
{
@@ -57,41 +57,41 @@ private enum SupportedType
#region Serialize
///
- /// Encodes the message data as Avro and stores it in . The
- /// will be set to "avro/binary+schemaId" where schemaId is the ID of the schema used to encode the data.
+ /// Serializes the message data as Avro and stores it in BinaryContent.Data. The BinaryContent.ContentType
+ /// will be set to "avro/binary+schemaId" where schemaId is the ID of the schema used to serialize the data.
///
- /// The data to serialize to Avro and encode into the message.
+ /// The data to serialize to Avro and store in the message.
/// An optional instance to signal the request to cancel the operation.
- /// The type to encode the data into.
- /// The type of the data to encode.
+ /// The type to serialize the data into.
+ /// The type of the data to serialize.
public TEnvelope Serialize(
TData data,
CancellationToken cancellationToken = default) where TEnvelope : BinaryContent, new()
=> (TEnvelope) SerializeInternalAsync(data, typeof(TData), typeof(TEnvelope), false, cancellationToken).EnsureCompleted();
///
- /// Encodes the message data as Avro and stores it in . The
- /// will be set to "avro/binary+schemaId" where schemaId is the ID of the schema used to encode the data.
+ /// Serializes the message data as Avro and stores it in BinaryContent.Data. The BinaryContent.ContentType
+ /// will be set to "avro/binary+schemaId" where schemaId is the ID of the schema used to serialize the data.
///
- /// The data to serialize to Avro and encode into the message.
+ /// The data to serialize to Avro and store in the message.
/// An optional instance to signal the request to cancel the operation.
- /// The type to encode the data into.
- /// The type of the data to encode.
+ /// The type to serialize the data into.
+ /// The type of the data to serialize.
public async ValueTask SerializeAsync(
TData data,
CancellationToken cancellationToken = default) where TEnvelope : BinaryContent, new()
=> (TEnvelope) await SerializeInternalAsync(data, typeof(TData), typeof(TEnvelope), true, cancellationToken).ConfigureAwait(false);
///
- /// Encodes the message data as Avro and stores it in . The
- /// will be set to "avro/binary+schemaId" where schemaId is the ID of the schema used to encode the data.
+ /// Serializes the message data as Avro and stores it in BinaryContent.Data. The BinaryContent.ContentType
+ /// will be set to "avro/binary+schemaId" where schemaId is the ID of the schema used to serialize the data.
///
- /// The data to serialize to Avro and encode into the message.
- /// The type of the data to encode. If left blank, the type will be determined at runtime by
+ /// The data to serialize to Avro and store in the message.
+ /// The type of the data to serialize. If left blank, the type will be determined at runtime by
/// calling .
- /// The type of message to encode the data into. Must extend from , and
+ /// The type of message to serialize the data into. Must extend from BinaryContent, and
/// have a parameterless constructor.
- /// If left blank, the data will be encoded into a instance.
+ /// If left blank, the data will be serialized into a BinaryContent instance.
/// An optional instance to signal the request to cancel the operation.
public BinaryContent Serialize(
object data,
@@ -101,15 +101,15 @@ public BinaryContent Serialize(
=> SerializeInternalAsync(data, dataType, messageType, false, cancellationToken).EnsureCompleted();
///
- /// Encodes the message data as Avro and stores it in . The
- /// will be set to "avro/binary+schemaId" where schemaId is the ID of the schema used to encode the data.
+ /// Serializes the message data as Avro and stores it in BinaryContent.Data. The BinaryContent.ContentType
+ /// will be set to "avro/binary+schemaId" where schemaId is the ID of the schema used to serialize the data.
///
- /// The data to serialize to Avro and encode into the message.
- /// The type of the data to encode. If left blank, the type will be determined at runtime by
+ /// The data to serialize to Avro and store in the message.
+ /// The type of the data to serialize. If left blank, the type will be determined at runtime by
/// calling .
- /// The type of message to encode the data into. Must extend from , and
+ /// The type of message to serialize the data into. Must extend from BinaryContent, and
/// have a parameterless constructor.
- /// If left blank, the data will be encoded into a instance.
+ /// If left blank, the data will be serialized into a BinaryContent instance.
/// An optional instance to signal the request to cancel the operation.
public async ValueTask SerializeAsync(
object data,
@@ -168,33 +168,35 @@ internal async ValueTask SerializeInternalAsync(
private async Task GetSchemaIdAsync(Schema schema, bool async, CancellationToken cancellationToken)
{
- if (_schemaToIdMap.TryGet(schema, out string schemaId))
+ if (_schemaToIdMap.TryGet(schema, out var value))
{
- return schemaId;
+ return value.SchemaId;
}
SchemaProperties schemaProperties;
+ string schemaString = schema.ToString();
if (async)
{
schemaProperties = _options.AutoRegisterSchemas
? (await _client
- .RegisterSchemaAsync(_groupName, schema.Fullname, schema.ToString(), SchemaFormat.Avro, cancellationToken)
+ .RegisterSchemaAsync(_groupName, schema.Fullname, schemaString, SchemaFormat.Avro, cancellationToken)
.ConfigureAwait(false)).Value
: await _client
- .GetSchemaPropertiesAsync(_groupName, schema.Fullname, schema.ToString(), SchemaFormat.Avro, cancellationToken)
+ .GetSchemaPropertiesAsync(_groupName, schema.Fullname, schemaString, SchemaFormat.Avro, cancellationToken)
.ConfigureAwait(false);
}
else
{
schemaProperties = _options.AutoRegisterSchemas
- ? _client.RegisterSchema(_groupName, schema.Fullname, schema.ToString(), SchemaFormat.Avro, cancellationToken)
- : _client.GetSchemaProperties(_groupName, schema.Fullname, schema.ToString(), SchemaFormat.Avro, cancellationToken);
+ ? _client.RegisterSchema(_groupName, schema.Fullname, schemaString, SchemaFormat.Avro, cancellationToken)
+ : _client.GetSchemaProperties(_groupName, schema.Fullname, schemaString, SchemaFormat.Avro, cancellationToken);
}
string id = schemaProperties.Id;
- _schemaToIdMap.AddOrUpdate(schema, id);
- _idToSchemaMap.AddOrUpdate(id, schema);
+ _schemaToIdMap.AddOrUpdate(schema, (id, schemaString.Length));
+ _idToSchemaMap.AddOrUpdate(id, (schema, schemaString.Length));
+ SchemaRegistryAvroEventSource.Log.CacheUpdated(_idToSchemaMap, _schemaToIdMap);
return id;
}
@@ -229,66 +231,66 @@ private static SupportedType GetSupportedTypeOrThrow(Type type)
}
#endregion
- #region Decode
+ #region Deserialize
///
- /// Decodes the message data into the specified type using the schema information populated in .
+ /// Deserializes the message data into the specified type using the schema information populated in BinaryContent.ContentType.
///
- /// The message containing the data to decode.
+ /// The message containing the data to deserialize.
/// An optional instance to signal the request to cancel the operation.
- /// The type to decode the message data into.
+ /// The type to deserialize the message data into.
/// The deserialized data.
/// Thrown if the content type is not in the expected format.
- /// Thrown if an attempt is made to decode non-Avro data.
+ /// Thrown if an attempt is made to deserialize non-Avro data.
public TData Deserialize(
BinaryContent content,
CancellationToken cancellationToken = default)
- => (TData) DecodeMessageDataInternalAsync(content.Data, typeof(TData), content.ContentType, false, cancellationToken).EnsureCompleted();
+ => (TData) DeserializeMessageDataInternalAsync(content.Data, typeof(TData), content.ContentType, false, cancellationToken).EnsureCompleted();
///
- /// Decodes the message data into the specified type using the schema information populated in .
+ /// Deserializes the message data into the specified type using the schema information populated in BinaryContent.ContentType.
///
/// The content to deserialize.
/// An optional instance to signal the request to cancel the operation.
- /// The type to decode the message data into.
+ /// The type to deserialize the message data into.
/// The deserialized data.
/// Thrown if the content type is not in the expected format.
- /// Thrown if an attempt is made to decode non-Avro data.
+ /// Thrown if an attempt is made to deserialize non-Avro data.
public async ValueTask DeserializeAsync(
BinaryContent content,
CancellationToken cancellationToken = default)
- => (TData) await DecodeMessageDataInternalAsync(content.Data, typeof(TData), content.ContentType, true, cancellationToken).ConfigureAwait(false);
+ => (TData) await DeserializeMessageDataInternalAsync(content.Data, typeof(TData), content.ContentType, true, cancellationToken).ConfigureAwait(false);
///
- /// Decodes the message data into the specified type using the schema information populated in .
+ /// Deserializes the message data into the specified type using the schema information populated in BinaryContent.ContentType.
///
- /// The message containing the data to decode.
- /// The type to decode the message data into.
+ /// The message containing the data to deserialize.
+ /// The type to deserialize the message data into.
/// An optional instance to signal the request to cancel the operation.
/// The deserialized data.
/// Thrown if the content type is not in the expected format.
- /// Thrown if an attempt is made to decode non-Avro data.
+ /// Thrown if an attempt is made to deserialize non-Avro data.
public object Deserialize(
BinaryContent content,
Type dataType,
CancellationToken cancellationToken = default)
- => DecodeMessageDataInternalAsync(content.Data, dataType, content.ContentType, false, cancellationToken).EnsureCompleted();
+ => DeserializeMessageDataInternalAsync(content.Data, dataType, content.ContentType, false, cancellationToken).EnsureCompleted();
///
- /// Decodes the message data into the specified type using the schema information populated in .
+ /// Deserializes the message data into the specified type using the schema information populated in BinaryContent.ContentType.
///
- /// The message containing the data to decode.
- /// The type to decode the message data into.
+ /// The message containing the data to deserialize.
+ /// The type to deserialize the message data into.
/// An optional instance to signal the request to cancel the operation.
/// The deserialized data.
/// Thrown if the content type is not in the expected format.
- /// Thrown if an attempt is made to decode non-Avro data.
+ /// Thrown if an attempt is made to deserialize non-Avro data.
public async ValueTask