diff --git a/sdk/schemaregistry/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro/src/LruCache.cs b/sdk/schemaregistry/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro/src/LruCache.cs
index 60699fc8989d..79bf05172809 100644
--- a/sdk/schemaregistry/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro/src/LruCache.cs
+++ b/sdk/schemaregistry/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro/src/LruCache.cs
@@ -1,6 +1,7 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
+using System.Collections;
using System.Collections.Generic;
namespace Microsoft.Azure.Data.SchemaRegistry.ApacheAvro
@@ -8,63 +9,76 @@ namespace Microsoft.Azure.Data.SchemaRegistry.ApacheAvro
///
/// A simple LRU cache implementation using a doubly linked list and dictionary.
///
- /// The type of key
- /// The type of value
- internal class LruCache
+ /// The type of key
+ /// The type of value
+ internal class LruCache : IEnumerable>
{
private readonly int _capacity;
- private readonly LinkedList> _linkedList;
- private readonly Dictionary>> _map;
+ private readonly LinkedList> _linkedList;
+ private readonly Dictionary> Node, int Length)> _map;
private readonly object _syncLock;
+ internal int Count => _linkedList.Count;
+
+ internal int TotalLength { get; private set; }
+
public LruCache(int capacity)
{
_capacity = capacity;
- _linkedList = new LinkedList>();
- _map = new Dictionary>>();
+ _linkedList = new LinkedList>();
+ _map = new Dictionary>, int)>();
_syncLock = new object();
}
- public bool TryGet(K key, out V value)
+ public bool TryGet(TKey key, out TValue value)
{
lock (_syncLock)
{
- if (_map.TryGetValue(key, out var node))
+ if (_map.TryGetValue(key, out var mapValue))
{
+ var node = mapValue.Node;
value = node.Value.Value;
_linkedList.Remove(node);
_linkedList.AddFirst(node);
return true;
}
- value = default(V);
+ value = default(TValue);
return false;
}
}
- public void AddOrUpdate(K key, V val)
+ public void AddOrUpdate(TKey key, TValue val, int length)
{
lock (_syncLock)
{
- if (_map.TryGetValue(key, out var existingNode))
+ if (_map.TryGetValue(key, out var existingValue))
{
// remove node - we will re-add a new node for this key at the head of the list, as the value may be different
- _linkedList.Remove(existingNode);
+ _linkedList.Remove(existingValue.Node);
+ TotalLength -= _map[key].Length;
}
// add new node
- var node = new LinkedListNode>(new KeyValuePair(key, val));
+ var node = new LinkedListNode>(new KeyValuePair(key, val));
_linkedList.AddFirst(node);
- _map[key] = node;
+ _map[key] = (node, length);
+ TotalLength += length;
if (_map.Count > _capacity)
{
// remove least recently used node
- LinkedListNode> last = _linkedList.Last;
+ LinkedListNode> last = _linkedList.Last;
_linkedList.RemoveLast();
+ var toRemove = _map[last.Value.Key];
_map.Remove(last.Value.Key);
+ TotalLength -= toRemove.Length;
}
}
}
+
+ public IEnumerator> GetEnumerator() => _linkedList.GetEnumerator();
+
+ IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}
}
\ No newline at end of file
diff --git a/sdk/schemaregistry/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro/src/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro.csproj b/sdk/schemaregistry/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro/src/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro.csproj
index 7aa39faee6fd..770eead94912 100644
--- a/sdk/schemaregistry/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro/src/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro.csproj
+++ b/sdk/schemaregistry/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro/src/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro.csproj
@@ -16,6 +16,7 @@
+
diff --git a/sdk/schemaregistry/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro/src/SchemaRegistryAvroEventSource.cs b/sdk/schemaregistry/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro/src/SchemaRegistryAvroEventSource.cs
new file mode 100644
index 000000000000..4138f3254e86
--- /dev/null
+++ b/sdk/schemaregistry/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro/src/SchemaRegistryAvroEventSource.cs
@@ -0,0 +1,60 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+using System.Diagnostics.Tracing;
+using Avro;
+using Azure.Core.Diagnostics;
+
+namespace Microsoft.Azure.Data.SchemaRegistry.ApacheAvro
+{
+ ///
+ /// Serves as an ETW event source for logging of information about
+ /// Entity's client.
+ ///
+ ///
+ ///
+ /// When defining Start/Complete tasks, it is highly recommended that the
+ /// the CompleteEvent.Id must be exactly StartEvent.Id + 1.
+ ///
+ [EventSource(Name = EventSourceName)]
+ internal class SchemaRegistryAvroEventSource : AzureEventSource
+ {
+ /// The name to use for the event source.
+ private const string EventSourceName = "Microsoft-Azure-Data-SchemaRegistry-ApacheAvro";
+
+ internal const int CacheUpdatedEvent = 1;
+
+ ///
+ /// Provides a singleton instance of the event source for callers to
+ /// use for logging.
+ ///
+ public static SchemaRegistryAvroEventSource Log { get; } = new SchemaRegistryAvroEventSource();
+
+ ///
+ /// Prevents an instance of the class from being
+ /// created outside the scope of the instance, as well as setting up the
+ /// integration with AzureEventSourceListener.
+ ///
+ protected SchemaRegistryAvroEventSource() : base(EventSourceName)
+ {
+ }
+
+ [NonEvent]
+ public virtual void CacheUpdated(LruCache idToSchemaCache, LruCache schemaToIdCache)
+ {
+ if (IsEnabled())
+ {
+ CacheUpdatedCore(idToSchemaCache.Count + schemaToIdCache.Count, idToSchemaCache.TotalLength + schemaToIdCache.TotalLength);
+ }
+ }
+
+ [Event(CacheUpdatedEvent, Level = EventLevel.Verbose, Message = "Cache entry added or updated. Total number of entries: {0}; Total schema length: {1}")]
+ public virtual void CacheUpdatedCore(int entryCount, int totalSchemaLength)
+ {
+ if (IsEnabled())
+ {
+ WriteEvent(CacheUpdatedEvent, entryCount, totalSchemaLength);
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/sdk/schemaregistry/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro/src/SchemaRegistryAvroSerializer.cs b/sdk/schemaregistry/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro/src/SchemaRegistryAvroSerializer.cs
index 82f6ce631ea5..5970826dd525 100644
--- a/sdk/schemaregistry/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro/src/SchemaRegistryAvroSerializer.cs
+++ b/sdk/schemaregistry/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro/src/SchemaRegistryAvroSerializer.cs
@@ -20,7 +20,7 @@ namespace Microsoft.Azure.Data.SchemaRegistry.ApacheAvro
{
///
/// A uses the to
- /// encode and decode Avro payloads.
+ /// serialize and deserialize Avro payloads.
///
public class SchemaRegistryAvroSerializer
{
@@ -57,41 +57,41 @@ private enum SupportedType
#region Serialize
///
- /// Encodes the message data as Avro and stores it in . The
- /// will be set to "avro/binary+schemaId" where schemaId is the ID of the schema used to encode the data.
+ /// Serializes the message data as Avro and stores it in . The
+ /// will be set to "avro/binary+schemaId" where schemaId is the ID of the schema used to serialize the data.
///
- /// The data to serialize to Avro and encode into the message.
+ /// The data to serialize to Avro and serialize into the message.
/// An optional instance to signal the request to cancel the operation.
- /// The type to encode the data into.
- /// The type of the data to encode.
+ /// The type to serialize the data into.
+ /// The type of the data to serialize.
public TEnvelope Serialize(
TData data,
CancellationToken cancellationToken = default) where TEnvelope : BinaryContent, new()
=> (TEnvelope) SerializeInternalAsync(data, typeof(TData), typeof(TEnvelope), false, cancellationToken).EnsureCompleted();
///
- /// Encodes the message data as Avro and stores it in . The
- /// will be set to "avro/binary+schemaId" where schemaId is the ID of the schema used to encode the data.
+ /// serializes the message data as Avro and stores it in . The
+ /// will be set to "avro/binary+schemaId" where schemaId is the ID of the schema used to serialize the data.
///
- /// The data to serialize to Avro and encode into the message.
+ /// The data to serialize to Avro and serialize into the message.
/// An optional instance to signal the request to cancel the operation.
- /// The type to encode the data into.
- /// The type of the data to encode.
+ /// The type to serialize the data into.
+ /// The type of the data to serialize.
public async ValueTask SerializeAsync(
TData data,
CancellationToken cancellationToken = default) where TEnvelope : BinaryContent, new()
=> (TEnvelope) await SerializeInternalAsync(data, typeof(TData), typeof(TEnvelope), true, cancellationToken).ConfigureAwait(false);
///
- /// Encodes the message data as Avro and stores it in . The
- /// will be set to "avro/binary+schemaId" where schemaId is the ID of the schema used to encode the data.
+ /// serializes the message data as Avro and stores it in . The
+ /// will be set to "avro/binary+schemaId" where schemaId is the ID of the schema used to serialize the data.
///
- /// The data to serialize to Avro and encode into the message.
- /// The type of the data to encode. If left blank, the type will be determined at runtime by
+ /// The data to serialize to Avro and serialize into the message.
+ /// The type of the data to serialize. If left blank, the type will be determined at runtime by
/// calling .
- /// The type of message to encode the data into. Must extend from , and
+ /// The type of message to serialize the data into. Must extend from , and
/// have a parameterless constructor.
- /// If left blank, the data will be encoded into a instance.
+ /// If left blank, the data will be serialized into a instance.
/// An optional instance to signal the request to cancel the operation.
public BinaryContent Serialize(
object data,
@@ -101,15 +101,15 @@ public BinaryContent Serialize(
=> SerializeInternalAsync(data, dataType, messageType, false, cancellationToken).EnsureCompleted();
///
- /// Encodes the message data as Avro and stores it in . The
- /// will be set to "avro/binary+schemaId" where schemaId is the ID of the schema used to encode the data.
+ /// serializes the message data as Avro and stores it in . The
+ /// will be set to "avro/binary+schemaId" where schemaId is the ID of the schema used to serialize the data.
///
- /// The data to serialize to Avro and encode into the message.
- /// The type of the data to encode. If left blank, the type will be determined at runtime by
+ /// The data to serialize to Avro and serialize into the message.
+ /// The type of the data to serialize. If left blank, the type will be determined at runtime by
/// calling .
- /// The type of message to encode the data into. Must extend from , and
+ /// The type of message to serialize the data into. Must extend from , and
/// have a parameterless constructor.
- /// If left blank, the data will be encoded into a instance.
+ /// If left blank, the data will be serialized into a instance.
/// An optional instance to signal the request to cancel the operation.
public async ValueTask SerializeAsync(
object data,
@@ -168,33 +168,35 @@ internal async ValueTask SerializeInternalAsync(
private async Task GetSchemaIdAsync(Schema schema, bool async, CancellationToken cancellationToken)
{
- if (_schemaToIdMap.TryGet(schema, out string schemaId))
+ if (_schemaToIdMap.TryGet(schema, out var value))
{
- return schemaId;
+ return value;
}
SchemaProperties schemaProperties;
+ string schemaString = schema.ToString();
if (async)
{
schemaProperties = _options.AutoRegisterSchemas
? (await _client
- .RegisterSchemaAsync(_groupName, schema.Fullname, schema.ToString(), SchemaFormat.Avro, cancellationToken)
+ .RegisterSchemaAsync(_groupName, schema.Fullname, schemaString, SchemaFormat.Avro, cancellationToken)
.ConfigureAwait(false)).Value
: await _client
- .GetSchemaPropertiesAsync(_groupName, schema.Fullname, schema.ToString(), SchemaFormat.Avro, cancellationToken)
+ .GetSchemaPropertiesAsync(_groupName, schema.Fullname, schemaString, SchemaFormat.Avro, cancellationToken)
.ConfigureAwait(false);
}
else
{
schemaProperties = _options.AutoRegisterSchemas
- ? _client.RegisterSchema(_groupName, schema.Fullname, schema.ToString(), SchemaFormat.Avro, cancellationToken)
- : _client.GetSchemaProperties(_groupName, schema.Fullname, schema.ToString(), SchemaFormat.Avro, cancellationToken);
+ ? _client.RegisterSchema(_groupName, schema.Fullname, schemaString, SchemaFormat.Avro, cancellationToken)
+ : _client.GetSchemaProperties(_groupName, schema.Fullname, schemaString, SchemaFormat.Avro, cancellationToken);
}
string id = schemaProperties.Id;
- _schemaToIdMap.AddOrUpdate(schema, id);
- _idToSchemaMap.AddOrUpdate(id, schema);
+ _schemaToIdMap.AddOrUpdate(schema, id, schemaString.Length);
+ _idToSchemaMap.AddOrUpdate(id, schema, schemaString.Length);
+ SchemaRegistryAvroEventSource.Log.CacheUpdated(_idToSchemaMap, _schemaToIdMap);
return id;
}
@@ -229,66 +231,66 @@ private static SupportedType GetSupportedTypeOrThrow(Type type)
}
#endregion
- #region Decode
+ #region Deserialize
///
- /// Decodes the message data into the specified type using the schema information populated in .
+ /// Deserializes the message data into the specified type using the schema information populated in .
///
- /// The message containing the data to decode.
+ /// The message containing the data to deserialize.
/// An optional instance to signal the request to cancel the operation.
- /// The type to decode the message data into.
+ /// The type to deserialize the message data into.
/// The deserialized data.
/// Thrown if the content type is not in the expected format.
- /// Thrown if an attempt is made to decode non-Avro data.
+ /// Thrown if an attempt is made to deserialize non-Avro data.
public TData Deserialize(
BinaryContent content,
CancellationToken cancellationToken = default)
- => (TData) DecodeMessageDataInternalAsync(content.Data, typeof(TData), content.ContentType, false, cancellationToken).EnsureCompleted();
+ => (TData) DeserializeMessageDataInternalAsync(content.Data, typeof(TData), content.ContentType, false, cancellationToken).EnsureCompleted();
///
- /// Decodes the message data into the specified type using the schema information populated in .
+ /// deserializes the message data into the specified type using the schema information populated in .
///
/// The content to deserialize.
/// An optional instance to signal the request to cancel the operation.
- /// The type to decode the message data into.
+ /// The type to deserialize the message data into.
/// The deserialized data.
/// Thrown if the content type is not in the expected format.
- /// Thrown if an attempt is made to decode non-Avro data.
+ /// Thrown if an attempt is made to deserialize non-Avro data.
public async ValueTask DeserializeAsync(
BinaryContent content,
CancellationToken cancellationToken = default)
- => (TData) await DecodeMessageDataInternalAsync(content.Data, typeof(TData), content.ContentType, true, cancellationToken).ConfigureAwait(false);
+ => (TData) await DeserializeMessageDataInternalAsync(content.Data, typeof(TData), content.ContentType, true, cancellationToken).ConfigureAwait(false);
///
- /// Decodes the message data into the specified type using the schema information populated in .
+ /// Deserializes the message data into the specified type using the schema information populated in .
///
- /// The message containing the data to decode.
- /// The type to decode the message data into.
+ /// The message containing the data to deserialize.
+ /// The type to deserialize the message data into.
/// An optional instance to signal the request to cancel the operation.
/// The deserialized data.
/// Thrown if the content type is not in the expected format.
- /// Thrown if an attempt is made to decode non-Avro data.
+ /// Thrown if an attempt is made to deserialize non-Avro data.
public object Deserialize(
BinaryContent content,
Type dataType,
CancellationToken cancellationToken = default)
- => DecodeMessageDataInternalAsync(content.Data, dataType, content.ContentType, false, cancellationToken).EnsureCompleted();
+ => DeserializeMessageDataInternalAsync(content.Data, dataType, content.ContentType, false, cancellationToken).EnsureCompleted();
///
- /// Decodes the message data into the specified type using the schema information populated in .
+ /// Deserializes the message data into the specified type using the schema information populated in .
///
- /// The message containing the data to decode.
- /// The type to decode the message data into.
+ /// The message containing the data to deserialize.
+ /// The type to deserialize the message data into.
/// An optional instance to signal the request to cancel the operation.
/// The deserialized data.
/// Thrown if the content type is not in the expected format.
- /// Thrown if an attempt is made to decode non-Avro data.
+ /// Thrown if an attempt is made to deserialize non-Avro data.
public async ValueTask