diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md b/sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md
index 0ff956e403bd1..7bbf7830a6103 100755
--- a/sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md
+++ b/sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md
@@ -12,6 +12,8 @@
- The client will now refresh the maximum message size each time a new AMQP link is opened; this is necessary for large message support, where the maximum message size for entities can be reconfigureed adjusted on the fly. Because the client had cached the value, it would not be aware of the change and would enforce the wrong size for batch creation.
+- The `PluggableCheckpointStoreEventProcessor` will now emit a diagnostic span when a checkpoint is created/updated. While this span is not defined by the Open Telemetry specification, this change aligns diagnostic spans with those emitted by `EventProcessorClient`.
+
## 5.12.0-beta.1 (2024-05-17)
### Features Added
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/EventProcessor.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/EventProcessor.cs
index 2a5e5edd17e64..7e5f5228e4b69 100644
--- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/EventProcessor.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/EventProcessor.cs
@@ -588,6 +588,20 @@ public virtual void StopProcessing(CancellationToken cancellationToken = default
[EditorBrowsable(EditorBrowsableState.Never)]
public override string ToString() => $"Event Processor<{ typeof(TPartition).Name }>: { Identifier }";
+ ///
+ /// Creates a diagnostic scope associated with a checkpointing activity.
+ ///
+ ///
+ /// The diagnostic scope. The caller is assumed to own the scope once returned and is responsible for disposing it.
+ ///
+ internal virtual DiagnosticScope StartUpdateCheckpointDiagnosticScope()
+ {
+ var scope = ClientDiagnostics.CreateScope(DiagnosticProperty.EventProcessorCheckpointActivityName, ActivityKind.Internal);
+ scope.Start();
+
+ return scope;
+ }
+
///
/// Creates an to use for processing.
///
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/PluggableCheckpointStoreEventProcessor.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/PluggableCheckpointStoreEventProcessor.cs
index b3110143dcdc2..a99f56b4d82cb 100644
--- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/PluggableCheckpointStoreEventProcessor.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/PluggableCheckpointStoreEventProcessor.cs
@@ -4,10 +4,15 @@
using System;
using System.Collections.Generic;
using System.ComponentModel;
+using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Azure.Core;
+using Azure.Core.Shared;
+using Azure.Messaging.EventHubs.Core;
+using Azure.Messaging.EventHubs.Diagnostics;
using Azure.Messaging.EventHubs.Processor;
+using Microsoft.Azure.Amqp;
namespace Azure.Messaging.EventHubs.Primitives
{
@@ -227,8 +232,23 @@ protected override Task GetCheckpointAsync(string part
protected override Task UpdateCheckpointAsync(string partitionId,
long offset,
long? sequenceNumber,
- CancellationToken cancellationToken) =>
- _checkpointStore.UpdateCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, partitionId, offset, sequenceNumber, cancellationToken);
+ CancellationToken cancellationToken)
+ {
+ using var scope = StartUpdateCheckpointDiagnosticScope();
+
+ try
+ {
+ return _checkpointStore.UpdateCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, partitionId, offset, sequenceNumber, cancellationToken);
+ }
+ catch (Exception ex)
+ {
+ // In case of failure, there is no need to call the error handler because the exception can
+ // be thrown directly to the caller here.
+
+ scope.Failed(ex);
+ throw;
+ }
+ }
///
/// Creates or updates a checkpoint for a specific partition, identifying a position in the partition's event stream
@@ -240,8 +260,23 @@ protected override Task UpdateCheckpointAsync(string partitionId,
///
protected override Task UpdateCheckpointAsync(string partitionId,
CheckpointPosition startingPosition,
- CancellationToken cancellationToken) =>
- _checkpointStore.UpdateCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, partitionId, Identifier, startingPosition, cancellationToken);
+ CancellationToken cancellationToken)
+ {
+ using var scope = StartUpdateCheckpointDiagnosticScope();
+
+ try
+ {
+ return _checkpointStore.UpdateCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, partitionId, Identifier, startingPosition, cancellationToken);
+ }
+ catch (Exception ex)
+ {
+ // In case of failure, there is no need to call the error handler because the exception can
+ // be thrown directly to the caller here.
+
+ scope.Failed(ex);
+ throw;
+ }
+ }
///
/// Requests a list of the ownership assignments for partitions between each of the cooperating event processor
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Diagnostics/DiagnosticsActivitySourceTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Diagnostics/DiagnosticsActivitySourceTests.cs
index db7f0397753a4..1b45b52614a2a 100644
--- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Diagnostics/DiagnosticsActivitySourceTests.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Diagnostics/DiagnosticsActivitySourceTests.cs
@@ -13,9 +13,11 @@
using Azure.Core.TestFramework;
using Azure.Core.Tests;
using Azure.Messaging.EventHubs.Authorization;
+using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Core;
using Azure.Messaging.EventHubs.Diagnostics;
using Azure.Messaging.EventHubs.Primitives;
+using Azure.Messaging.EventHubs.Processor;
using Azure.Messaging.EventHubs.Producer;
using Moq;
using Moq.Protected;
@@ -595,6 +597,45 @@ public async Task EventProcessorDisabledBatchTracing()
Assert.IsEmpty(listener.Activities);
}
+ ///
+ /// Verifies diagnostics functionality of the
+ /// method.
+ ///
+ ///
+ [Test]
+ [TestCase(false)]
+ [TestCase(true)]
+ public async Task UpdateCheckpointAsyncCreatesScope(bool useOldOverload)
+ {
+ using var cancellationSource = new CancellationTokenSource();
+ cancellationSource.CancelAfter(TimeSpan.FromSeconds(30));
+
+ var fullyQualifiedNamespace = "namespace";
+ var eventHubName = "eventHub";
+ var mockCheckpointStore = new Mock();
+ var mockProcessor = new MockCheckpointStoreProcessor(mockCheckpointStore.Object, 100, "fakeConsumer", fullyQualifiedNamespace, eventHubName, Mock.Of());
+
+ using var _ = SetAppConfigSwitch();
+ using var listener = new TestActivitySourceListener(source => source.Name.StartsWith(DiagnosticProperty.DiagnosticNamespace));
+
+ if (useOldOverload)
+ {
+ await mockProcessor.InvokeOldUpdateCheckpointAsync("65", 12345, 67890, cancellationSource.Token);
+ }
+ else
+ {
+ await mockProcessor.InvokeUpdateCheckpointAsync("65", new CheckpointPosition(12345), cancellationSource.Token);
+ }
+
+ Assert.That(cancellationSource.IsCancellationRequested, Is.False, "The cancellation token should not have been signaled.");
+
+ var checkpointActivity = listener.AssertAndRemoveActivity(DiagnosticProperty.EventProcessorCheckpointActivityName);
+ CollectionAssert.Contains(checkpointActivity.Tags, new KeyValuePair(MessagingClientDiagnostics.ServerAddress, fullyQualifiedNamespace));
+ CollectionAssert.Contains(checkpointActivity.Tags, new KeyValuePair(MessagingClientDiagnostics.DestinationName, eventHubName));
+ CollectionAssert.Contains(checkpointActivity.Tags, new KeyValuePair(MessagingClientDiagnostics.MessagingSystem, DiagnosticProperty.EventHubsServiceContext));
+ cancellationSource.Cancel();
+ }
+
///
/// Asserts that the common tags are present in the activity.
///
@@ -683,6 +724,70 @@ public MockEventProcessor(int identifier,
protected override Task OnProcessingErrorAsync(Exception exception, EventProcessorPartition partition, string operationDescription, CancellationToken cancellationToken) => Task.CompletedTask;
}
+
+ ///
+ /// A minimal processor implementation for testing functionality
+ /// related to the checkpoint store integration.
+ ///
+ ///
+ private class MockCheckpointStoreProcessor : PluggableCheckpointStoreEventProcessor
+ {
+ public MockCheckpointStoreProcessor(CheckpointStore checkpointStore,
+ int eventBatchMaximumCount,
+ string consumerGroup,
+ string connectionString,
+ EventProcessorOptions options = default) : base(checkpointStore, eventBatchMaximumCount, consumerGroup, connectionString, options)
+ {
+ }
+
+ public MockCheckpointStoreProcessor(CheckpointStore checkpointStore,
+ int eventBatchMaximumCount,
+ string consumerGroup,
+ string connectionString,
+ string eventHubName,
+ EventProcessorOptions options = default) : base(checkpointStore, eventBatchMaximumCount, consumerGroup, connectionString, eventHubName, options)
+ {
+ }
+
+ public MockCheckpointStoreProcessor(CheckpointStore checkpointStore,
+ int eventBatchMaximumCount,
+ string consumerGroup,
+ string fullyQualifiedNamespace,
+ string eventHubName,
+ AzureNamedKeyCredential credential,
+ EventProcessorOptions options = default) : base(checkpointStore, eventBatchMaximumCount, consumerGroup, fullyQualifiedNamespace, eventHubName, credential, options)
+ {
+ }
+
+ public MockCheckpointStoreProcessor(CheckpointStore checkpointStore,
+ int eventBatchMaximumCount,
+ string consumerGroup,
+ string fullyQualifiedNamespace,
+ string eventHubName,
+ AzureSasCredential credential,
+ EventProcessorOptions options = default) : base(checkpointStore, eventBatchMaximumCount, consumerGroup, fullyQualifiedNamespace, eventHubName, credential, options)
+ {
+ }
+
+ public MockCheckpointStoreProcessor(CheckpointStore checkpointStore,
+ int eventBatchMaximumCount,
+ string consumerGroup,
+ string fullyQualifiedNamespace,
+ string eventHubName,
+ TokenCredential credential,
+ EventProcessorOptions options = default) : base(checkpointStore, eventBatchMaximumCount, consumerGroup, fullyQualifiedNamespace, eventHubName, credential, options)
+ {
+ }
+
+ protected override Task OnProcessingEventBatchAsync(IEnumerable events, EventProcessorPartition partition, CancellationToken cancellationToken) => throw new NotImplementedException();
+ protected override Task OnProcessingErrorAsync(Exception exception, EventProcessorPartition partition, string operationDescription, CancellationToken cancellationToken) => throw new NotImplementedException();
+
+ public Task InvokeGetCheckpointAsync(string partitionId, CancellationToken cancellationToken) => GetCheckpointAsync(partitionId, cancellationToken);
+ public Task InvokeOldUpdateCheckpointAsync(string partitionId, long offset, long? sequenceNumber, CancellationToken cancellationToken) => UpdateCheckpointAsync(partitionId, offset, sequenceNumber, cancellationToken);
+ public Task InvokeUpdateCheckpointAsync(string partitionId, CheckpointPosition checkpointPosition, CancellationToken cancellationToken) => UpdateCheckpointAsync(partitionId, checkpointPosition, cancellationToken);
+ public Task> InvokeListOwnershipAsync(CancellationToken cancellationToken) => ListOwnershipAsync(cancellationToken);
+ public Task> InvokeClaimOwnershipAsync(IEnumerable desiredOwnership, CancellationToken cancellationToken) => ClaimOwnershipAsync(desiredOwnership, cancellationToken);
+ }
}
#endif
}