Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Event Hubs] Add checkpoint diagnostic scope #44486

Merged
merged 3 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,20 @@ public virtual void StopProcessing(CancellationToken cancellationToken = default
[EditorBrowsable(EditorBrowsableState.Never)]
public override string ToString() => $"Event Processor<{ typeof(TPartition).Name }>: { Identifier }";

/// <summary>
/// Creates a diagnostic scope associated with a checkpointing activity.
/// </summary>
///
/// <returns>The diagnostic scope. The caller is assumed to own the scope once returned and is responsible for disposing it.</returns>
///
internal virtual DiagnosticScope StartUpdateCheckpointDiagnosticScope()
{
var scope = ClientDiagnostics.CreateScope(DiagnosticProperty.EventProcessorCheckpointActivityName, ActivityKind.Internal);
scope.Start();

return scope;
}

/// <summary>
/// Creates an <see cref="TransportConsumer" /> to use for processing.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,15 @@
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Azure.Core;
using Azure.Core.Shared;
using Azure.Messaging.EventHubs.Core;
using Azure.Messaging.EventHubs.Diagnostics;
using Azure.Messaging.EventHubs.Processor;
using Microsoft.Azure.Amqp;

namespace Azure.Messaging.EventHubs.Primitives
{
Expand Down Expand Up @@ -227,8 +232,23 @@ protected override Task<EventProcessorCheckpoint> GetCheckpointAsync(string part
protected override Task UpdateCheckpointAsync(string partitionId,
long offset,
long? sequenceNumber,
CancellationToken cancellationToken) =>
_checkpointStore.UpdateCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, partitionId, offset, sequenceNumber, cancellationToken);
CancellationToken cancellationToken)
{
using var scope = StartUpdateCheckpointDiagnosticScope();

try
{
return _checkpointStore.UpdateCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, partitionId, offset, sequenceNumber, cancellationToken);
}
catch (Exception ex)
{
// In case of failure, there is no need to call the error handler because the exception can
// be thrown directly to the caller here.

scope.Failed(ex);
throw;
}
}

/// <summary>
/// Creates or updates a checkpoint for a specific partition, identifying a position in the partition's event stream
Expand All @@ -240,8 +260,23 @@ protected override Task UpdateCheckpointAsync(string partitionId,
/// <returns></returns>
protected override Task UpdateCheckpointAsync(string partitionId,
CheckpointPosition startingPosition,
CancellationToken cancellationToken) =>
_checkpointStore.UpdateCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, partitionId, Identifier, startingPosition, cancellationToken);
CancellationToken cancellationToken)
{
using var scope = StartUpdateCheckpointDiagnosticScope();

try
{
return _checkpointStore.UpdateCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, partitionId, Identifier, startingPosition, cancellationToken);
}
catch (Exception ex)
{
// In case of failure, there is no need to call the error handler because the exception can
// be thrown directly to the caller here.

scope.Failed(ex);
throw;
}
}

/// <summary>
/// Requests a list of the ownership assignments for partitions between each of the cooperating event processor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
using Azure.Core.TestFramework;
using Azure.Core.Tests;
using Azure.Messaging.EventHubs.Authorization;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Core;
using Azure.Messaging.EventHubs.Diagnostics;
using Azure.Messaging.EventHubs.Primitives;
using Azure.Messaging.EventHubs.Processor;
using Azure.Messaging.EventHubs.Producer;
using Moq;
using Moq.Protected;
Expand Down Expand Up @@ -595,6 +597,45 @@ public async Task EventProcessorDisabledBatchTracing()
Assert.IsEmpty(listener.Activities);
}

/// <summary>
/// Verifies diagnostics functionality of the <see cref="EventProcessorClient.UpdateCheckpointAsync" />
/// method.
/// </summary>
///
[Test]
[TestCase(false)]
[TestCase(true)]
public async Task UpdateCheckpointAsyncCreatesScope(bool useOldOverload)
{
using var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(30));

var fullyQualifiedNamespace = "namespace";
var eventHubName = "eventHub";
var mockCheckpointStore = new Mock<CheckpointStore>();
var mockProcessor = new MockCheckpointStoreProcessor(mockCheckpointStore.Object, 100, "fakeConsumer", fullyQualifiedNamespace, eventHubName, Mock.Of<TokenCredential>());

using var _ = SetAppConfigSwitch();
using var listener = new TestActivitySourceListener(source => source.Name.StartsWith(DiagnosticProperty.DiagnosticNamespace));

if (useOldOverload)
{
await mockProcessor.InvokeOldUpdateCheckpointAsync("65", 12345, 67890, cancellationSource.Token);
}
else
{
await mockProcessor.InvokeUpdateCheckpointAsync("65", new CheckpointPosition(12345), cancellationSource.Token);
}

Assert.That(cancellationSource.IsCancellationRequested, Is.False, "The cancellation token should not have been signaled.");

var checkpointActivity = listener.AssertAndRemoveActivity(DiagnosticProperty.EventProcessorCheckpointActivityName);
CollectionAssert.Contains(checkpointActivity.Tags, new KeyValuePair<string, string>(MessagingClientDiagnostics.ServerAddress, fullyQualifiedNamespace));
CollectionAssert.Contains(checkpointActivity.Tags, new KeyValuePair<string, string>(MessagingClientDiagnostics.DestinationName, eventHubName));
CollectionAssert.Contains(checkpointActivity.Tags, new KeyValuePair<string, string>(MessagingClientDiagnostics.MessagingSystem, DiagnosticProperty.EventHubsServiceContext));
cancellationSource.Cancel();
}

/// <summary>
/// Asserts that the common tags are present in the activity.
/// </summary>
Expand Down Expand Up @@ -683,6 +724,70 @@ public MockEventProcessor(int identifier,

protected override Task OnProcessingErrorAsync(Exception exception, EventProcessorPartition partition, string operationDescription, CancellationToken cancellationToken) => Task.CompletedTask;
}

/// <summary>
/// A minimal processor implementation for testing functionality
/// related to the checkpoint store integration.
/// </summary>
///
private class MockCheckpointStoreProcessor : PluggableCheckpointStoreEventProcessor<EventProcessorPartition>
{
public MockCheckpointStoreProcessor(CheckpointStore checkpointStore,
int eventBatchMaximumCount,
string consumerGroup,
string connectionString,
EventProcessorOptions options = default) : base(checkpointStore, eventBatchMaximumCount, consumerGroup, connectionString, options)
{
}

public MockCheckpointStoreProcessor(CheckpointStore checkpointStore,
int eventBatchMaximumCount,
string consumerGroup,
string connectionString,
string eventHubName,
EventProcessorOptions options = default) : base(checkpointStore, eventBatchMaximumCount, consumerGroup, connectionString, eventHubName, options)
{
}

public MockCheckpointStoreProcessor(CheckpointStore checkpointStore,
int eventBatchMaximumCount,
string consumerGroup,
string fullyQualifiedNamespace,
string eventHubName,
AzureNamedKeyCredential credential,
EventProcessorOptions options = default) : base(checkpointStore, eventBatchMaximumCount, consumerGroup, fullyQualifiedNamespace, eventHubName, credential, options)
{
}

public MockCheckpointStoreProcessor(CheckpointStore checkpointStore,
int eventBatchMaximumCount,
string consumerGroup,
string fullyQualifiedNamespace,
string eventHubName,
AzureSasCredential credential,
EventProcessorOptions options = default) : base(checkpointStore, eventBatchMaximumCount, consumerGroup, fullyQualifiedNamespace, eventHubName, credential, options)
{
}

public MockCheckpointStoreProcessor(CheckpointStore checkpointStore,
int eventBatchMaximumCount,
string consumerGroup,
string fullyQualifiedNamespace,
string eventHubName,
TokenCredential credential,
EventProcessorOptions options = default) : base(checkpointStore, eventBatchMaximumCount, consumerGroup, fullyQualifiedNamespace, eventHubName, credential, options)
{
}

protected override Task OnProcessingEventBatchAsync(IEnumerable<EventData> events, EventProcessorPartition partition, CancellationToken cancellationToken) => throw new NotImplementedException();
protected override Task OnProcessingErrorAsync(Exception exception, EventProcessorPartition partition, string operationDescription, CancellationToken cancellationToken) => throw new NotImplementedException();

public Task<EventProcessorCheckpoint> InvokeGetCheckpointAsync(string partitionId, CancellationToken cancellationToken) => GetCheckpointAsync(partitionId, cancellationToken);
public Task InvokeOldUpdateCheckpointAsync(string partitionId, long offset, long? sequenceNumber, CancellationToken cancellationToken) => UpdateCheckpointAsync(partitionId, offset, sequenceNumber, cancellationToken);
public Task InvokeUpdateCheckpointAsync(string partitionId, CheckpointPosition checkpointPosition, CancellationToken cancellationToken) => UpdateCheckpointAsync(partitionId, checkpointPosition, cancellationToken);
public Task<IEnumerable<EventProcessorPartitionOwnership>> InvokeListOwnershipAsync(CancellationToken cancellationToken) => ListOwnershipAsync(cancellationToken);
public Task<IEnumerable<EventProcessorPartitionOwnership>> InvokeClaimOwnershipAsync(IEnumerable<EventProcessorPartitionOwnership> desiredOwnership, CancellationToken cancellationToken) => ClaimOwnershipAsync(desiredOwnership, cancellationToken);
}
}
#endif
}
Loading