From ecbeabc6e6292ee47b5cabf0ab9c82a99d771cf3 Mon Sep 17 00:00:00 2001 From: Jesse Squire Date: Mon, 10 Jun 2024 17:10:07 -0400 Subject: [PATCH 1/3] [Event Hubs] Add checkpoint diagnostic scope The focus of these changes is to add an diagnostic scope for the update checkpoint activity to the `PluggableCheckpointStoreEventProcessor` base class. --- .../src/Primitives/EventProcessor.cs | 14 +++ .../PluggableCheckpointStoreEventProcessor.cs | 43 +++++++- .../DiagnosticsActivitySourceTests.cs | 103 ++++++++++++++++++ 3 files changed, 156 insertions(+), 4 deletions(-) diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/EventProcessor.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/EventProcessor.cs index 2a5e5edd17e64..7e5f5228e4b69 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/EventProcessor.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/EventProcessor.cs @@ -588,6 +588,20 @@ public virtual void StopProcessing(CancellationToken cancellationToken = default [EditorBrowsable(EditorBrowsableState.Never)] public override string ToString() => $"Event Processor<{ typeof(TPartition).Name }>: { Identifier }"; + /// + /// Creates a diagnostic scope associated with a checkpointing activity. + /// + /// + /// The diagnostic scope. The caller is assumed to own the scope once returned and is responsible for disposing it. + /// + internal virtual DiagnosticScope StartUpdateCheckpointDiagnosticScope() + { + var scope = ClientDiagnostics.CreateScope(DiagnosticProperty.EventProcessorCheckpointActivityName, ActivityKind.Internal); + scope.Start(); + + return scope; + } + /// /// Creates an to use for processing. /// diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/PluggableCheckpointStoreEventProcessor.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/PluggableCheckpointStoreEventProcessor.cs index b3110143dcdc2..a99f56b4d82cb 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/PluggableCheckpointStoreEventProcessor.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/PluggableCheckpointStoreEventProcessor.cs @@ -4,10 +4,15 @@ using System; using System.Collections.Generic; using System.ComponentModel; +using System.Diagnostics; using System.Threading; using System.Threading.Tasks; using Azure.Core; +using Azure.Core.Shared; +using Azure.Messaging.EventHubs.Core; +using Azure.Messaging.EventHubs.Diagnostics; using Azure.Messaging.EventHubs.Processor; +using Microsoft.Azure.Amqp; namespace Azure.Messaging.EventHubs.Primitives { @@ -227,8 +232,23 @@ protected override Task GetCheckpointAsync(string part protected override Task UpdateCheckpointAsync(string partitionId, long offset, long? sequenceNumber, - CancellationToken cancellationToken) => - _checkpointStore.UpdateCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, partitionId, offset, sequenceNumber, cancellationToken); + CancellationToken cancellationToken) + { + using var scope = StartUpdateCheckpointDiagnosticScope(); + + try + { + return _checkpointStore.UpdateCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, partitionId, offset, sequenceNumber, cancellationToken); + } + catch (Exception ex) + { + // In case of failure, there is no need to call the error handler because the exception can + // be thrown directly to the caller here. + + scope.Failed(ex); + throw; + } + } /// /// Creates or updates a checkpoint for a specific partition, identifying a position in the partition's event stream @@ -240,8 +260,23 @@ protected override Task UpdateCheckpointAsync(string partitionId, /// protected override Task UpdateCheckpointAsync(string partitionId, CheckpointPosition startingPosition, - CancellationToken cancellationToken) => - _checkpointStore.UpdateCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, partitionId, Identifier, startingPosition, cancellationToken); + CancellationToken cancellationToken) + { + using var scope = StartUpdateCheckpointDiagnosticScope(); + + try + { + return _checkpointStore.UpdateCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, partitionId, Identifier, startingPosition, cancellationToken); + } + catch (Exception ex) + { + // In case of failure, there is no need to call the error handler because the exception can + // be thrown directly to the caller here. + + scope.Failed(ex); + throw; + } + } /// /// Requests a list of the ownership assignments for partitions between each of the cooperating event processor diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Diagnostics/DiagnosticsActivitySourceTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Diagnostics/DiagnosticsActivitySourceTests.cs index db7f0397753a4..498f25d195869 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Diagnostics/DiagnosticsActivitySourceTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Diagnostics/DiagnosticsActivitySourceTests.cs @@ -13,9 +13,11 @@ using Azure.Core.TestFramework; using Azure.Core.Tests; using Azure.Messaging.EventHubs.Authorization; +using Azure.Messaging.EventHubs.Consumer; using Azure.Messaging.EventHubs.Core; using Azure.Messaging.EventHubs.Diagnostics; using Azure.Messaging.EventHubs.Primitives; +using Azure.Messaging.EventHubs.Processor; using Azure.Messaging.EventHubs.Producer; using Moq; using Moq.Protected; @@ -595,6 +597,43 @@ public async Task EventProcessorDisabledBatchTracing() Assert.IsEmpty(listener.Activities); } + /// + /// Verifies diagnostics functionality of the + /// method. + /// + /// + [Test] + [TestCase(false)] + [TestCase(true)] + public async Task UpdateCheckpointAsyncCreatesScope(bool useOldOverload) + { + using var cancellationSource = new CancellationTokenSource(); + cancellationSource.CancelAfter(TimeSpan.FromSeconds(30)); + + var mockCheckpointStore = new Mock(); + var mockProcessor = new MockCheckpointStoreProcessor(mockCheckpointStore.Object, 100, "fakeConsumer", "fakeNamespace", "fakeHub", Mock.Of()); + + using var _ = SetAppConfigSwitch(); + using var listener = new TestActivitySourceListener(source => source.Name.StartsWith(DiagnosticProperty.DiagnosticNamespace)); + + if (useOldOverload) + { + await mockProcessor.InvokeOldUpdateCheckpointAsync("65", 12345, 67890, cancellationSource.Token); + } + else + { + await mockProcessor.InvokeUpdateCheckpointAsync("65", new CheckpointPosition(12345), cancellationSource.Token); + } + + Assert.That(cancellationSource.IsCancellationRequested, Is.False, "The cancellation token should not have been signaled."); + + var checkpointActivity = listener.AssertAndRemoveActivity(DiagnosticProperty.EventProcessorCheckpointActivityName); + CollectionAssert.Contains(checkpointActivity.Tags, new KeyValuePair(MessagingClientDiagnostics.ServerAddress, "host")); + CollectionAssert.Contains(checkpointActivity.Tags, new KeyValuePair(MessagingClientDiagnostics.DestinationName, "hub")); + CollectionAssert.Contains(checkpointActivity.Tags, new KeyValuePair(MessagingClientDiagnostics.MessagingSystem, DiagnosticProperty.EventHubsServiceContext)); + cancellationSource.Cancel(); + } + /// /// Asserts that the common tags are present in the activity. /// @@ -683,6 +722,70 @@ public MockEventProcessor(int identifier, protected override Task OnProcessingErrorAsync(Exception exception, EventProcessorPartition partition, string operationDescription, CancellationToken cancellationToken) => Task.CompletedTask; } + + /// + /// A minimal processor implementation for testing functionality + /// related to the checkpoint store integration. + /// + /// + private class MockCheckpointStoreProcessor : PluggableCheckpointStoreEventProcessor + { + public MockCheckpointStoreProcessor(CheckpointStore checkpointStore, + int eventBatchMaximumCount, + string consumerGroup, + string connectionString, + EventProcessorOptions options = default) : base(checkpointStore, eventBatchMaximumCount, consumerGroup, connectionString, options) + { + } + + public MockCheckpointStoreProcessor(CheckpointStore checkpointStore, + int eventBatchMaximumCount, + string consumerGroup, + string connectionString, + string eventHubName, + EventProcessorOptions options = default) : base(checkpointStore, eventBatchMaximumCount, consumerGroup, connectionString, eventHubName, options) + { + } + + public MockCheckpointStoreProcessor(CheckpointStore checkpointStore, + int eventBatchMaximumCount, + string consumerGroup, + string fullyQualifiedNamespace, + string eventHubName, + AzureNamedKeyCredential credential, + EventProcessorOptions options = default) : base(checkpointStore, eventBatchMaximumCount, consumerGroup, fullyQualifiedNamespace, eventHubName, credential, options) + { + } + + public MockCheckpointStoreProcessor(CheckpointStore checkpointStore, + int eventBatchMaximumCount, + string consumerGroup, + string fullyQualifiedNamespace, + string eventHubName, + AzureSasCredential credential, + EventProcessorOptions options = default) : base(checkpointStore, eventBatchMaximumCount, consumerGroup, fullyQualifiedNamespace, eventHubName, credential, options) + { + } + + public MockCheckpointStoreProcessor(CheckpointStore checkpointStore, + int eventBatchMaximumCount, + string consumerGroup, + string fullyQualifiedNamespace, + string eventHubName, + TokenCredential credential, + EventProcessorOptions options = default) : base(checkpointStore, eventBatchMaximumCount, consumerGroup, fullyQualifiedNamespace, eventHubName, credential, options) + { + } + + protected override Task OnProcessingEventBatchAsync(IEnumerable events, EventProcessorPartition partition, CancellationToken cancellationToken) => throw new NotImplementedException(); + protected override Task OnProcessingErrorAsync(Exception exception, EventProcessorPartition partition, string operationDescription, CancellationToken cancellationToken) => throw new NotImplementedException(); + + public Task InvokeGetCheckpointAsync(string partitionId, CancellationToken cancellationToken) => GetCheckpointAsync(partitionId, cancellationToken); + public Task InvokeOldUpdateCheckpointAsync(string partitionId, long offset, long? sequenceNumber, CancellationToken cancellationToken) => UpdateCheckpointAsync(partitionId, offset, sequenceNumber, cancellationToken); + public Task InvokeUpdateCheckpointAsync(string partitionId, CheckpointPosition checkpointPosition, CancellationToken cancellationToken) => UpdateCheckpointAsync(partitionId, checkpointPosition, cancellationToken); + public Task> InvokeListOwnershipAsync(CancellationToken cancellationToken) => ListOwnershipAsync(cancellationToken); + public Task> InvokeClaimOwnershipAsync(IEnumerable desiredOwnership, CancellationToken cancellationToken) => ClaimOwnershipAsync(desiredOwnership, cancellationToken); + } } #endif } From e8f84e256ca90e03ab7f9402c40036c2ef13c110 Mon Sep 17 00:00:00 2001 From: Jesse Squire Date: Tue, 11 Jun 2024 13:11:54 -0400 Subject: [PATCH 2/3] Fixing config values --- .../tests/Diagnostics/DiagnosticsActivitySourceTests.cs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Diagnostics/DiagnosticsActivitySourceTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Diagnostics/DiagnosticsActivitySourceTests.cs index 498f25d195869..1b45b52614a2a 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Diagnostics/DiagnosticsActivitySourceTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Diagnostics/DiagnosticsActivitySourceTests.cs @@ -610,8 +610,10 @@ public async Task UpdateCheckpointAsyncCreatesScope(bool useOldOverload) using var cancellationSource = new CancellationTokenSource(); cancellationSource.CancelAfter(TimeSpan.FromSeconds(30)); + var fullyQualifiedNamespace = "namespace"; + var eventHubName = "eventHub"; var mockCheckpointStore = new Mock(); - var mockProcessor = new MockCheckpointStoreProcessor(mockCheckpointStore.Object, 100, "fakeConsumer", "fakeNamespace", "fakeHub", Mock.Of()); + var mockProcessor = new MockCheckpointStoreProcessor(mockCheckpointStore.Object, 100, "fakeConsumer", fullyQualifiedNamespace, eventHubName, Mock.Of()); using var _ = SetAppConfigSwitch(); using var listener = new TestActivitySourceListener(source => source.Name.StartsWith(DiagnosticProperty.DiagnosticNamespace)); @@ -628,8 +630,8 @@ public async Task UpdateCheckpointAsyncCreatesScope(bool useOldOverload) Assert.That(cancellationSource.IsCancellationRequested, Is.False, "The cancellation token should not have been signaled."); var checkpointActivity = listener.AssertAndRemoveActivity(DiagnosticProperty.EventProcessorCheckpointActivityName); - CollectionAssert.Contains(checkpointActivity.Tags, new KeyValuePair(MessagingClientDiagnostics.ServerAddress, "host")); - CollectionAssert.Contains(checkpointActivity.Tags, new KeyValuePair(MessagingClientDiagnostics.DestinationName, "hub")); + CollectionAssert.Contains(checkpointActivity.Tags, new KeyValuePair(MessagingClientDiagnostics.ServerAddress, fullyQualifiedNamespace)); + CollectionAssert.Contains(checkpointActivity.Tags, new KeyValuePair(MessagingClientDiagnostics.DestinationName, eventHubName)); CollectionAssert.Contains(checkpointActivity.Tags, new KeyValuePair(MessagingClientDiagnostics.MessagingSystem, DiagnosticProperty.EventHubsServiceContext)); cancellationSource.Cancel(); } From 11a0537b3fc0f9063f1ad9bfefa86341ead5d622 Mon Sep 17 00:00:00 2001 From: Jesse Squire Date: Tue, 11 Jun 2024 15:27:37 -0400 Subject: [PATCH 3/3] Adding changelog --- sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md b/sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md index 0ff956e403bd1..7bbf7830a6103 100755 --- a/sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md +++ b/sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md @@ -12,6 +12,8 @@ - The client will now refresh the maximum message size each time a new AMQP link is opened; this is necessary for large message support, where the maximum message size for entities can be reconfigureed adjusted on the fly. Because the client had cached the value, it would not be aware of the change and would enforce the wrong size for batch creation. +- The `PluggableCheckpointStoreEventProcessor` will now emit a diagnostic span when a checkpoint is created/updated. While this span is not defined by the Open Telemetry specification, this change aligns diagnostic spans with those emitted by `EventProcessorClient`. + ## 5.12.0-beta.1 (2024-05-17) ### Features Added