Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Event Hubs] UpdateCheckpointAsync API tweaks #41829

Merged
merged 7 commits into from
Feb 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ public EventProcessorClient(Azure.Storage.Blobs.BlobContainerClient checkpointSt
public override System.Threading.Tasks.Task StopProcessingAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public override string ToString() { throw null; }
protected override System.Threading.Tasks.Task UpdateCheckpointAsync(string partitionId, Azure.Messaging.EventHubs.Processor.CheckpointPosition checkpointStartingPosition, System.Threading.CancellationToken cancellationToken) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
protected override System.Threading.Tasks.Task UpdateCheckpointAsync(string partitionId, Azure.Messaging.EventHubs.Processor.CheckpointPosition startingPosition, System.Threading.CancellationToken cancellationToken) { throw null; }
protected override System.Threading.Tasks.Task UpdateCheckpointAsync(string partitionId, long offset, long? sequenceNumber, System.Threading.CancellationToken cancellationToken) { throw null; }
protected override System.Threading.Tasks.Task ValidateProcessingPreconditions(System.Threading.CancellationToken cancellationToken) { throw null; }
}
Expand Down Expand Up @@ -72,8 +71,7 @@ public BlobCheckpointStore(Azure.Storage.Blobs.BlobContainerClient blobContainer
public override System.Threading.Tasks.Task<System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.Primitives.EventProcessorPartitionOwnership>> ClaimOwnershipAsync(System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.Primitives.EventProcessorPartitionOwnership> desiredOwnership, System.Threading.CancellationToken cancellationToken) { throw null; }
public override System.Threading.Tasks.Task<Azure.Messaging.EventHubs.Primitives.EventProcessorCheckpoint> GetCheckpointAsync(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId, System.Threading.CancellationToken cancellationToken) { throw null; }
public override System.Threading.Tasks.Task<System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.Primitives.EventProcessorPartitionOwnership>> ListOwnershipAsync(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, System.Threading.CancellationToken cancellationToken) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public override System.Threading.Tasks.Task UpdateCheckpointAsync(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId, long offset, long? sequenceNumber, System.Threading.CancellationToken cancellationToken) { throw null; }
public override System.Threading.Tasks.Task UpdateCheckpointAsync(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId, string clientIdentifier, Azure.Messaging.EventHubs.Processor.CheckpointPosition checkpointStartingPosition, System.Threading.CancellationToken cancellationToken) { throw null; }
public override System.Threading.Tasks.Task UpdateCheckpointAsync(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId, string clientIdentifier, Azure.Messaging.EventHubs.Processor.CheckpointPosition startingPosition, System.Threading.CancellationToken cancellationToken) { throw null; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -896,35 +896,56 @@ protected override async Task ValidateProcessingPreconditions(CancellationToken
/// <param name="sequenceNumber">The sequence number to associate with the checkpoint, indicating that a processor should begin reading from the next event in the stream.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken" /> instance to signal a request to cancel the operation.</param>
///
[EditorBrowsable(EditorBrowsableState.Never)]
protected override Task UpdateCheckpointAsync(string partitionId,
long offset,
long? sequenceNumber,
CancellationToken cancellationToken) => UpdateCheckpointAsync(partitionId, new CheckpointPosition(sequenceNumber ?? long.MinValue, offset), cancellationToken);
CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
Argument.AssertNotNull(partitionId, nameof(partitionId));

Logger.UpdateCheckpointStart(partitionId, Identifier, EventHubName, ConsumerGroup);

using var scope = ClientDiagnostics.CreateScope(DiagnosticProperty.EventProcessorCheckpointActivityName, ActivityKind.Internal);
scope.Start();

try
{
return CheckpointStore.UpdateCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, partitionId, offset, sequenceNumber, cancellationToken);
}
catch (Exception ex)
{
// In case of failure, there is no need to call the error handler because the exception can
// be thrown directly to the caller here.

scope.Failed(ex);
Logger.UpdateCheckpointError(partitionId, Identifier, EventHubName, ConsumerGroup, ex.Message);

throw;
}
finally
{
Logger.UpdateCheckpointComplete(partitionId, Identifier, EventHubName, ConsumerGroup);
}
}

/// <summary>
/// Creates or updates a checkpoint for a specific partition, identifying a position in the partition's event stream
/// that an event processor should begin reading from.
/// </summary>
///
/// <param name="partitionId">The identifier of the partition the checkpoint is for.</param>
/// <param name="checkpointStartingPosition">The starting position to associate with the checkpoint, indicating that a processor should begin reading from the next event in the stream.</param>
/// <param name="startingPosition">The starting position to associate with the checkpoint, indicating that a processor should begin reading from the next event in the stream.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken" /> instance to signal a request to cancel the operation.</param>
///
protected override Task UpdateCheckpointAsync(string partitionId,
CheckpointPosition checkpointStartingPosition,
CheckpointPosition startingPosition,
CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

Argument.AssertNotNull(partitionId, nameof(partitionId));

// The default value for sequence number is long.MinValue to provide backwards compatibility. Ensure that if one was not provided, an offset was provided.
if (checkpointStartingPosition.SequenceNumber == long.MinValue)
{
Argument.AssertNotNull(checkpointStartingPosition.Offset, nameof(checkpointStartingPosition.Offset));
Argument.AssertInRange(checkpointStartingPosition.Offset.Value, long.MinValue + 1, long.MaxValue, nameof(checkpointStartingPosition.Offset));
}
Argument.AssertAtLeast(startingPosition.SequenceNumber, 0, nameof(startingPosition.SequenceNumber));

Logger.UpdateCheckpointStart(partitionId, Identifier, EventHubName, ConsumerGroup);

Expand All @@ -933,7 +954,7 @@ protected override Task UpdateCheckpointAsync(string partitionId,

try
{
return CheckpointStore.UpdateCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, partitionId, Identifier, checkpointStartingPosition, cancellationToken);
return CheckpointStore.UpdateCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, partitionId, Identifier, startingPosition, cancellationToken);
}
catch (Exception ex)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ public override Task<EventProcessorCheckpoint> GetCheckpointAsync(string fullyQu
/// <param name="sequenceNumber">The sequence number to associate with the checkpoint, indicating that a processor should begin reading from the next event in the stream.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken" /> instance to signal a request to cancel the operation.</param>
///
[EditorBrowsable(EditorBrowsableState.Never)]
public override Task UpdateCheckpointAsync(string fullyQualifiedNamespace,
string eventHubName,
string consumerGroup,
Expand All @@ -125,15 +124,15 @@ public override Task UpdateCheckpointAsync(string fullyQualifiedNamespace,
/// <param name="consumerGroup">The name of the consumer group the checkpoint is associated with.</param>
/// <param name="partitionId">The identifier of the partition the checkpoint is for.</param>
/// <param name="clientIdentifier">The unique identifier of the client that authored this checkpoint.</param>
/// <param name="checkpointStartingPosition">The starting position to associate with the checkpoint, indicating that a processor should begin reading from the next event in the stream.</param>
/// <param name="startingPosition">The starting position to associate with the checkpoint, indicating that a processor should begin reading from the next event in the stream.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken" /> instance to signal a request to cancel the operation.</param>
///
public override Task UpdateCheckpointAsync(string fullyQualifiedNamespace,
string eventHubName,
string consumerGroup,
string partitionId,
string clientIdentifier,
CheckpointPosition checkpointStartingPosition,
CancellationToken cancellationToken) => _checkpointStoreImplementation.UpdateCheckpointAsync(fullyQualifiedNamespace, eventHubName, consumerGroup, partitionId, clientIdentifier, checkpointStartingPosition, cancellationToken);
CheckpointPosition startingPosition,
CancellationToken cancellationToken) => _checkpointStoreImplementation.UpdateCheckpointAsync(fullyQualifiedNamespace, eventHubName, consumerGroup, partitionId, clientIdentifier, startingPosition, cancellationToken);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -297,11 +297,10 @@ public async Task UpdateCheckpointLogsStartAndCompleteWhenTheBlobDoesNotExist()
target.Logger = mockLog.Object;

var expectedSequenceNumber = 0;
var expectedOffset = 0;

await target.UpdateCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, PartitionId, Identifier, new CheckpointPosition(expectedSequenceNumber, expectedOffset), CancellationToken.None);
mockLog.Verify(log => log.UpdateCheckpointStart(PartitionId, FullyQualifiedNamespace, EventHubName, ConsumerGroup, Identifier, expectedSequenceNumber, -1, expectedOffset));
mockLog.Verify(log => log.UpdateCheckpointComplete(PartitionId, FullyQualifiedNamespace, EventHubName, ConsumerGroup, Identifier, expectedSequenceNumber, -1, expectedOffset));
await target.UpdateCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, PartitionId, Identifier, new CheckpointPosition(expectedSequenceNumber), CancellationToken.None);
mockLog.Verify(log => log.UpdateCheckpointStart(PartitionId, FullyQualifiedNamespace, EventHubName, ConsumerGroup, Identifier, expectedSequenceNumber, -1, long.MinValue));
mockLog.Verify(log => log.UpdateCheckpointComplete(PartitionId, FullyQualifiedNamespace, EventHubName, ConsumerGroup, Identifier, expectedSequenceNumber, -1, long.MinValue));
}

/// <summary>
Expand All @@ -325,9 +324,8 @@ public void UpdateCheckpointLogsErrorsWhenTheBlobExists()
target.Logger = mockLog.Object;

var expectedSequenceNumber = 456;
var expectedOffset = 123;
Assert.That(async () => await target.UpdateCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, PartitionId, Identifier, new CheckpointPosition(expectedSequenceNumber, expectedOffset), CancellationToken.None), Throws.Exception.EqualTo(expectedException));
mockLog.Verify(log => log.UpdateCheckpointError(PartitionId, FullyQualifiedNamespace, EventHubName, ConsumerGroup, Identifier, expectedException.Message, expectedSequenceNumber, -1, expectedOffset));
Assert.That(async () => await target.UpdateCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, PartitionId, Identifier, new CheckpointPosition(expectedSequenceNumber), CancellationToken.None), Throws.Exception.EqualTo(expectedException));
mockLog.Verify(log => log.UpdateCheckpointError(PartitionId, FullyQualifiedNamespace, EventHubName, ConsumerGroup, Identifier, expectedException.Message, expectedSequenceNumber, -1, long.MinValue));
}

/// <summary>
Expand All @@ -350,9 +348,8 @@ public void UpdateCheckpointLogsErrorsWhenTheBlobDoesNotExist()
target.Logger = mockLog.Object;

var expectedSequenceNumber = 6;
var expectedOffset = 4;
Assert.That(async () => await target.UpdateCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, PartitionId, Identifier, new CheckpointPosition(expectedSequenceNumber, expectedOffset), CancellationToken.None), Throws.Exception.EqualTo(expectedException));
mockLog.Verify(log => log.UpdateCheckpointError(PartitionId, FullyQualifiedNamespace, EventHubName, ConsumerGroup, Identifier, expectedException.Message, expectedSequenceNumber, -1, expectedOffset));
Assert.That(async () => await target.UpdateCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, PartitionId, Identifier, new CheckpointPosition(expectedSequenceNumber), CancellationToken.None), Throws.Exception.EqualTo(expectedException));
mockLog.Verify(log => log.UpdateCheckpointError(PartitionId, FullyQualifiedNamespace, EventHubName, ConsumerGroup, Identifier, expectedException.Message, expectedSequenceNumber, -1, long.MinValue));
}

/// <summary>
Expand All @@ -370,9 +367,8 @@ public void UpdateCheckpointForMissingContainerLogsCheckpointUpdateError()
target.Logger = mockLog.Object;

var expectedSequenceNumber = 999;
var expectedOffset = 999;
Assert.That(async () => await target.UpdateCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, PartitionId, Identifier, new CheckpointPosition(expectedSequenceNumber , expectedOffset), CancellationToken.None), Throws.InstanceOf<RequestFailedException>());
mockLog.Verify(m => m.UpdateCheckpointError(PartitionId, FullyQualifiedNamespace, EventHubName, ConsumerGroup, Identifier, ex.Message, expectedSequenceNumber, -1, expectedOffset));
Assert.That(async () => await target.UpdateCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, PartitionId, Identifier, new CheckpointPosition(expectedSequenceNumber), CancellationToken.None), Throws.InstanceOf<RequestFailedException>());
mockLog.Verify(m => m.UpdateCheckpointError(PartitionId, FullyQualifiedNamespace, EventHubName, ConsumerGroup, Identifier, ex.Message, expectedSequenceNumber, -1, long.MinValue));
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public async Task CheckpointStoreActivitySourceDisabled()
mockProcessor.Object.Logger = mockLogger.Object;

using var listener = new TestActivitySourceListener(DiagnosticProperty.DiagnosticNamespace);
await InvokeUpdateCheckpointAsync(mockProcessor.Object, mockContext.Object.PartitionId, 65, 998, default);
await InvokeUpdateCheckpointAsync(mockProcessor.Object, mockContext.Object.PartitionId, 998, default);

Assert.IsEmpty(listener.Activities);
}
Expand Down Expand Up @@ -112,7 +112,7 @@ public async Task UpdateCheckpointAsyncCreatesScope()
using var _ = SetAppConfigSwitch();

using var listener = new TestActivitySourceListener(source => source.Name.StartsWith(DiagnosticProperty.DiagnosticNamespace));
await InvokeUpdateCheckpointAsync(mockProcessor.Object, mockContext.Object.PartitionId, 65, 998, default);
await InvokeUpdateCheckpointAsync(mockProcessor.Object, mockContext.Object.PartitionId, 998, default);

await Task.WhenAny(completionSource.Task, Task.Delay(Timeout.Infinite, cancellationSource.Token));
Assert.That(cancellationSource.IsCancellationRequested, Is.False, "The cancellation token should not have been signaled.");
Expand All @@ -138,13 +138,12 @@ public async Task UpdateCheckpointAsyncCreatesScope()
///
private static Task InvokeUpdateCheckpointAsync(EventProcessorClient target,
string partitionId,
long offset,
long? sequenceNumber,
long sequenceNumber,
CancellationToken cancellationToken) =>
(Task)
typeof(EventProcessorClient)
.GetMethod("UpdateCheckpointAsync", BindingFlags.Instance | BindingFlags.NonPublic, new Type[] { typeof(string), typeof(CheckpointPosition), typeof(CancellationToken) })
.Invoke(target, new object[] { partitionId, new CheckpointPosition(sequenceNumber ?? long.MinValue, offset), cancellationToken });
.Invoke(target, new object[] { partitionId, new CheckpointPosition(sequenceNumber), cancellationToken });

/// <summary>
/// Sets and returns the app config switch to enable Activity Source. The switch must be disposed at the end of the test.
Expand Down
Loading
Loading