Skip to content

Commit

Permalink
[Event Hubs] Update value of missing offset (#43659)
Browse files Browse the repository at this point in the history
* [Event Hubs] Update value of missing offset

The focus of these changes is to update the value
written for a checkpoint when the offset is intentionally
not populated.  Previously, the offset was written as
empty, which was read by consumers as a null value.
With this change, the offset value is now written as
a string indicating that the offset is not present.

Because the checkpoint format changed and the offset is
no longer populated by the EventProcessor, we need
to ensure that a value is present for the Functions
scale controller, which uses a null check on the
offset to determine whether a checkpoint is in the legacy
or the current format.  Because GetCheckpointAsync will
only populate the offset if long.TryParse succeeds,
writing a placeholder string that is not a valid number satisfies
the null check without impacting the EventProcessor behavior.
  • Loading branch information
jsquire authored Apr 26, 2024
1 parent 90faebd commit 81e2a79
Show file tree
Hide file tree
Showing 3 changed files with 173 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ public override async Task<EventProcessorCheckpoint> GetCheckpointAsync(string f

try
{
var blobName = string.Format(CultureInfo.InvariantCulture, CheckpointPrefix, fullyQualifiedNamespace.ToLowerInvariant(), eventHubName.ToLowerInvariant(), consumerGroup.ToLowerInvariant()) + partitionId;
var blobName = GetCheckpointBlobName(fullyQualifiedNamespace, eventHubName, consumerGroup, partitionId);
var blob = await ContainerClient
.GetBlobClient(blobName)
.GetPropertiesAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
Expand Down Expand Up @@ -397,17 +397,53 @@ public override async Task UpdateCheckpointAsync(string fullyQualifiedNamespace,
CancellationToken cancellationToken)
=> await UpdateCheckpointInternalAsync(fullyQualifiedNamespace, eventHubName, consumerGroup, partitionId, clientIdentifier, null, startingPosition.SequenceNumber, cancellationToken).ConfigureAwait(false);

/// <summary>
/// Gets the name of the Storage Blob representing the checkpoint for a given partition.
/// </summary>
///
/// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace the checkpoint is associated with. This is likely to be similar to <c>{yournamespace}.servicebus.windows.net</c>.</param>
/// <param name="eventHubName">The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it.</param>
/// <param name="consumerGroup">The name of the consumer group the checkpoint is associated with.</param>
/// <param name="partitionId">The identifier of the partition the checkpoint is for.</param>
///
/// <returns>The name of the blob.</returns>
///
/// <remarks>
/// The namespace, Event Hub name, and consumer group are lower-cased using the invariant culture
/// before being substituted into the checkpoint prefix; the partition identifier is appended unchanged.
/// </remarks>
///
internal string GetCheckpointBlobName(string fullyQualifiedNamespace,
                                      string eventHubName,
                                      string consumerGroup,
                                      string partitionId)
{
    // Only the constant prefix is used as the composite format string.  The partition identifier is
    // caller-supplied, so it is appended after formatting; this ensures that any brace characters it
    // may contain are never interpreted as format items.

    var checkpointPrefix = string.Format(
        CultureInfo.InvariantCulture,
        CheckpointPrefix,
        fullyQualifiedNamespace.ToLowerInvariant(),
        eventHubName.ToLowerInvariant(),
        consumerGroup.ToLowerInvariant());

    return checkpointPrefix + partitionId;
}

/// <summary>
/// Creates or updates a checkpoint for a specific partition, identifying a position in the partition's event stream
/// that an event processor should begin reading from.
/// </summary>
///
/// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace the checkpoint is associated with. This is likely to be similar to <c>{yournamespace}.servicebus.windows.net</c>.</param>
/// <param name="eventHubName">The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it.</param>
/// <param name="consumerGroup">The name of the consumer group the checkpoint is associated with.</param>
/// <param name="partitionId">The identifier of the partition the checkpoint is for.</param>
/// <param name="clientIdentifier">The unique identifier of the client that authored this checkpoint.</param>
/// <param name="offset">The offset to associate with the checkpoint. Informational, unless the <paramref name="sequenceNumber"/> has no value.</param>
/// <param name="sequenceNumber">The sequence number to associate with the checkpoint, indicating that a processor should begin reading from the next event in the stream.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken" /> instance to signal a request to cancel the operation.</param>
///
private async Task UpdateCheckpointInternalAsync(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId, string clientIdentifier, long? offset, long? sequenceNumber, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
UpdateCheckpointStart(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, clientIdentifier, sequenceNumber.ToString(), "-1", offset.ToString());

var blobName = string.Format(CultureInfo.InvariantCulture, CheckpointPrefix + partitionId, fullyQualifiedNamespace.ToLowerInvariant(), eventHubName.ToLowerInvariant(), consumerGroup.ToLowerInvariant());
var blobName = GetCheckpointBlobName(fullyQualifiedNamespace, eventHubName, consumerGroup, partitionId);
var blobClient = ContainerClient.GetBlobClient(blobName);

// Because the checkpoint format changed and offset is no longer populated by the EventProcessor, we need to ensure that a value is present for
// the Functions scale controller which uses a null check on the offset to determine if a checkpoint is in the legacy format or current. Because
// GetCheckpointAsync will only populate the offset if a long.TryParse is successful, adding a nonsense string value to satisfy the null check
// will not impact the EventProcessor behavior.

var metadata = new Dictionary<string, string>()
{
{ BlobMetadataKey.Offset, offset.HasValue ? offset.Value.ToString(CultureInfo.InvariantCulture) : "" },
{ BlobMetadataKey.Offset, offset.HasValue ? offset.Value.ToString(CultureInfo.InvariantCulture) : "no offset" },
{ BlobMetadataKey.SequenceNumber, sequenceNumber.HasValue ? sequenceNumber.Value.ToString(CultureInfo.InvariantCulture) : "" },
{ BlobMetadataKey.ClientIdentifier, clientIdentifier }
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using Azure.Messaging.EventHubs.Primitives;
using Azure.Messaging.EventHubs.Processor;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using NUnit.Framework;

namespace Azure.Messaging.EventHubs.Tests
Expand Down Expand Up @@ -779,6 +780,63 @@ public async Task CheckpointUpdateCreatesTheBlobOnFirstCall()
}
}

/// <summary>
/// Verifies functionality of the <see cref="BlobCheckpointStore.UpdateCheckpointAsync" />
/// method, ensuring that a checkpoint created from a sequence number alone is written
/// with an offset metadata value that is present but cannot be parsed as a number.
/// </summary>
///
[Test]
public async Task CheckpointUpdateWritesAnInvalidOffsetString()
{
    await using (StorageScope storageScope = await StorageScope.CreateAsync())
    {
        var storageConnectionString = StorageTestEnvironment.Instance.StorageConnectionString;
        var containerClient = new BlobContainerClient(storageConnectionString, storageScope.ContainerName);
        var checkpointStore = new BlobCheckpointStoreInternal(containerClient);

        var checkpoint = new EventProcessorCheckpoint
        {
            FullyQualifiedNamespace = "namespace",
            EventHubName = "eventHubName",
            ConsumerGroup = "consumerGroup",
            PartitionId = "partitionId",
            ClientIdentifier = "Id"
        };

        var mockEvent = new MockEventData(
            eventBody: Array.Empty<byte>(),
            offset: 10,
            sequenceNumber: 20);

        // Calling update should create the checkpoint.  Only the sequence number is provided,
        // so no numeric offset is available to be written.

        await checkpointStore.UpdateCheckpointAsync(checkpoint.FullyQualifiedNamespace, checkpoint.EventHubName, checkpoint.ConsumerGroup, checkpoint.PartitionId, checkpoint.ClientIdentifier, new CheckpointPosition(mockEvent.SequenceNumber), default);

        // Reading the checkpoint back confirms that the value written for the offset does not
        // prevent the checkpoint from being retrieved.

        var storedCheckpoint = await checkpointStore.GetCheckpointAsync(checkpoint.FullyQualifiedNamespace, checkpoint.EventHubName, checkpoint.ConsumerGroup, checkpoint.PartitionId, default);

        // Inspect the container directly to capture the raw metadata that was written.  Stop
        // enumerating as soon as a second blob is seen; the count assertion will fail in that case.

        var blobCount = 0;
        var checkpointBlobName = default(string);
        var checkpointBlob = default(BlobProperties);

        await foreach (var blob in containerClient.GetBlobsAsync())
        {
            ++blobCount;
            checkpointBlobName = blob.Name;
            checkpointBlob = await containerClient.GetBlobClient(blob.Name).GetPropertiesAsync();

            if (blobCount > 1)
            {
                break;
            }
        }

        Assert.That(storedCheckpoint, Is.Not.Null, "The checkpoint should have been readable after it was written.");
        Assert.That(blobCount, Is.EqualTo(1), "Exactly one blob should have been created for the checkpoint.");
        Assert.That(checkpointBlobName, Is.EqualTo(checkpointStore.GetCheckpointBlobName(checkpoint.FullyQualifiedNamespace, checkpoint.EventHubName, checkpoint.ConsumerGroup, checkpoint.PartitionId)), "The checkpoint blob did not have the expected name.");
        Assert.That(checkpointBlob.Metadata.TryGetValue("offset", out var offset), Is.True, "The offset should be present in the blob metadata.");
        Assert.That(offset, Is.Not.Null, "The offset metadata value should not be null.");
        Assert.That(long.TryParse(offset, out _), Is.False, "The offset metadata value should not be a valid number.");
    }
}

/// <summary>
/// Verifies functionality of the <see cref="BlobCheckpointStore.UpdateCheckpointAsync" />
/// method.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,43 @@ public async Task GetCheckpointUsesSequenceNumberAsTheStartingPositionWhenNoOffs
var expectedStartingPosition = EventPosition.FromSequenceNumber(expectedSequence, false);
var partition = Guid.NewGuid().ToString();

var blobList = new List<BlobItem>
{
BlobsModelFactory.BlobItem($"{FullyQualifiedNamespaceLowercase}/{EventHubNameLowercase}/{ConsumerGroupLowercase}/checkpoint/{partition}",
false,
BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)),
"snapshot",
new Dictionary<string, string>
{
{BlobMetadataKey.OwnerIdentifier, Guid.NewGuid().ToString()},
{BlobMetadataKey.Offset, ""},
{BlobMetadataKey.SequenceNumber, expectedSequence.ToString()}
})
};

var target = new BlobCheckpointStoreInternal(new MockBlobContainerClient() { Blobs = blobList });
var checkpoint = await target.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, partition, CancellationToken.None);

Assert.That(checkpoint, Is.Not.Null, "A checkpoint should have been returned.");
Assert.That(checkpoint.StartingPosition, Is.EqualTo(expectedStartingPosition));

Assert.That(checkpoint, Is.InstanceOf<BlobCheckpointStoreInternal.BlobStorageCheckpoint>(), "Checkpoint instance was not the expected type.");
var blobCheckpoint = (BlobCheckpointStoreInternal.BlobStorageCheckpoint)checkpoint;
Assert.That(blobCheckpoint.Offset, Is.Null, $"The offset should not have been populated, as it was not set.");
Assert.That(expectedSequence, Is.EqualTo(blobCheckpoint.SequenceNumber), "Checkpoint sequence number did not have the correct value.");
}

/// <summary>
/// Verifies basic functionality of GetCheckpointAsync and ensures the starting position is set correctly.
/// </summary>
///
[Test]
public async Task GetCheckpointUsesSequenceNumberAsTheStartingPositionWhenOffsetIsMinValue()
{
var expectedSequence = 133;
var expectedStartingPosition = EventPosition.FromSequenceNumber(expectedSequence, false);
var partition = Guid.NewGuid().ToString();

var blobList = new List<BlobItem>
{
BlobsModelFactory.BlobItem($"{FullyQualifiedNamespaceLowercase}/{EventHubNameLowercase}/{ConsumerGroupLowercase}/checkpoint/{partition}",
Expand All @@ -423,6 +460,44 @@ public async Task GetCheckpointUsesSequenceNumberAsTheStartingPositionWhenNoOffs
Assert.That(expectedSequence, Is.EqualTo(blobCheckpoint.SequenceNumber), "Checkpoint sequence number did not have the correct value.");
}

/// <summary>
/// Verifies basic functionality of GetCheckpointAsync and ensures the starting position is
/// based on the sequence number when the offset metadata holds a value that is not a number.
/// </summary>
///
[Test]
public async Task GetCheckpointUsesSequenceNumberAsTheStartingPositionWhenInvalidOffsetIsPresent()
{
    var expectedSequence = 133;
    var invalidOffset = "invalid offset";
    var expectedStartingPosition = EventPosition.FromSequenceNumber(expectedSequence, false);
    var partition = Guid.NewGuid().ToString();

    // The blob metadata carries a non-numeric offset alongside a valid sequence number.

    var blobList = new List<BlobItem>
    {
        BlobsModelFactory.BlobItem($"{FullyQualifiedNamespaceLowercase}/{EventHubNameLowercase}/{ConsumerGroupLowercase}/checkpoint/{partition}",
                                   false,
                                   BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)),
                                   "snapshot",
                                   new Dictionary<string, string>
                                   {
                                       {BlobMetadataKey.OwnerIdentifier, Guid.NewGuid().ToString()},
                                       {BlobMetadataKey.Offset, invalidOffset},
                                       {BlobMetadataKey.SequenceNumber, expectedSequence.ToString()}
                                   })
    };

    var target = new BlobCheckpointStoreInternal(new MockBlobContainerClient() { Blobs = blobList });
    var checkpoint = await target.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, partition, CancellationToken.None);

    Assert.That(checkpoint, Is.Not.Null, "A checkpoint should have been returned.");
    Assert.That(checkpoint.StartingPosition, Is.EqualTo(expectedStartingPosition));

    Assert.That(checkpoint, Is.InstanceOf<BlobCheckpointStoreInternal.BlobStorageCheckpoint>(), "Checkpoint instance was not the expected type.");
    var blobCheckpoint = (BlobCheckpointStoreInternal.BlobStorageCheckpoint)checkpoint;
    Assert.That(blobCheckpoint.Offset, Is.Null, "The offset should not have been populated, as it was an invalid number.");

    // The value under test is passed as the actual argument so that a failure reports
    // the expected and actual values the right way around.

    Assert.That(blobCheckpoint.SequenceNumber, Is.EqualTo(expectedSequence), "Checkpoint sequence number did not have the correct value.");
}

/// <summary>
/// Verifies basic functionality of GetCheckpointAsync and ensures the starting position is set correctly.
/// </summary>
Expand Down Expand Up @@ -473,7 +548,7 @@ public async Task GetCheckpointConsidersDataInvalidWithNoOffsetOrSequenceNumber(
new Dictionary<string, string>
{
{BlobMetadataKey.OwnerIdentifier, Guid.NewGuid().ToString()},
{BlobMetadataKey.Offset, ""},
{BlobMetadataKey.Offset, "invalid"},
{BlobMetadataKey.SequenceNumber, long.MinValue.ToString()}
})
};
Expand Down

0 comments on commit 81e2a79

Please sign in to comment.