Skip to content

Commit

Permalink
[azeventhubs] Fixing checkpoint store race condition (#20727)
Browse files Browse the repository at this point in the history
The checkpoint store wasn't guarding against multiple owners claiming for the first time - fixing this by using IfNoneMatch

Fixes #20717
  • Loading branch information
richardpark-msft authored May 3, 2023
1 parent 745d967 commit 6dfd0cb
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 26 deletions.
1 change: 1 addition & 0 deletions sdk/messaging/azeventhubs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
- Potential leaks for $cbs and $management when there was a partial failure. (PR#20564)
- Latest go-amqp changes have been merged in with fixes for robustness.
- Sending a message to an entity that is full will no longer retry. (PR#20722)
- Checkpoint store handles multiple initial owners properly, allowing only one through. (PR#20727)

## 0.6.0 (2023-03-07)

Expand Down
80 changes: 54 additions & 26 deletions sdk/messaging/azeventhubs/checkpoints/blob_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ func NewBlobStore(containerClient *container.Client, options *BlobStoreOptions)

// ClaimOwnership attempts to claim ownership of the partitions in partitionOwnership and returns
// the actual partitions that were claimed.
//
// If we fail to claim ownership because of another update then it will be omitted from the
// returned slice of [Ownership]'s. It is not considered an error.
func (b *BlobStore) ClaimOwnership(ctx context.Context, partitionOwnership []azeventhubs.Ownership, options *azeventhubs.ClaimOwnershipOptions) ([]azeventhubs.Ownership, error) {
var ownerships []azeventhubs.Ownership

Expand All @@ -54,13 +57,12 @@ func (b *BlobStore) ClaimOwnership(ctx context.Context, partitionOwnership []aze
if err != nil {
return nil, err
}
lastModified, etag, err := b.setMetadata(ctx, blobName, newOwnershipBlobMetadata(po), po.ETag)
lastModified, etag, err := b.setOwnershipMetadata(ctx, blobName, po)

if err != nil {
if bloberror.HasCode(err, bloberror.ConditionNotMet) {
// we can fail to claim ownership and that's okay - it's expected that clients will
// attempt to claim with whatever state they hold locally. If they fail it just means
// someone else claimed ownership before them.
if bloberror.HasCode(err,
bloberror.ConditionNotMet, // updated before we could update it
bloberror.BlobAlreadyExists) { // created before we could create it
continue
}

Expand Down Expand Up @@ -179,25 +181,28 @@ func (b *BlobStore) ListOwnership(ctx context.Context, fullyQualifiedNamespace s
}

// UpdateCheckpoint updates a specific checkpoint with a sequence and offset.
//
// NOTE: This function doesn't attempt to prevent simultaneous checkpoint updates - ownership is assumed.
func (b *BlobStore) UpdateCheckpoint(ctx context.Context, checkpoint azeventhubs.Checkpoint, options *azeventhubs.UpdateCheckpointOptions) error {
blobName, err := nameForCheckpointBlob(checkpoint)

if err != nil {
return err
}

_, _, err = b.setMetadata(ctx, blobName, newCheckpointBlobMetadata(checkpoint), nil)
_, _, err = b.setCheckpointMetadata(ctx, blobName, checkpoint)
return err
}

func (b *BlobStore) setMetadata(ctx context.Context, blobName string, blobMetadata map[string]*string, etag *azcore.ETag) (*time.Time, azcore.ETag, error) {
func (b *BlobStore) setOwnershipMetadata(ctx context.Context, blobName string, ownership azeventhubs.Ownership) (*time.Time, azcore.ETag, error) {
blobMetadata := newOwnershipBlobMetadata(ownership)
blobClient := b.cc.NewBlockBlobClient(blobName)

if etag != nil {
if ownership.ETag != nil {
setMetadataResp, err := blobClient.SetMetadata(ctx, blobMetadata, &blob.SetMetadataOptions{
AccessConditions: &blob.AccessConditions{
ModifiedAccessConditions: &blob.ModifiedAccessConditions{
IfMatch: etag,
IfMatch: ownership.ETag,
},
},
})
Expand All @@ -207,29 +212,52 @@ func (b *BlobStore) setMetadata(ctx context.Context, blobName string, blobMetada
}

return setMetadataResp.LastModified, *setMetadataResp.ETag, nil
} else {
setMetadataResp, err := blobClient.SetMetadata(ctx, blobMetadata, nil)
}

if err == nil {
return setMetadataResp.LastModified, *setMetadataResp.ETag, nil
}
uploadResp, err := blobClient.Upload(ctx, streaming.NopCloser(bytes.NewReader([]byte{})), &blockblob.UploadOptions{
Metadata: blobMetadata,
AccessConditions: &blob.AccessConditions{
ModifiedAccessConditions: &blob.ModifiedAccessConditions{
IfNoneMatch: to.Ptr(azcore.ETag("*")),
},
},
})

if !bloberror.HasCode(err, bloberror.BlobNotFound) {
return nil, "", err
}
if err != nil {
return nil, "", err
}

// in JS they check to see if the error is BlobNotFound. If it is, then they
// do a full upload of a blob instead.
uploadResp, err := blobClient.Upload(ctx, streaming.NopCloser(bytes.NewReader([]byte{})), &blockblob.UploadOptions{
Metadata: blobMetadata,
})
return uploadResp.LastModified, *uploadResp.ETag, nil
}

if err != nil {
return nil, "", err
}
// setCheckpointMetadata sets the metadata for a checkpoint, falling back to creating
// the blob if it doesn't already exist.
//
// NOTE: unlike [setOwnershipMetadata] this function doesn't attempt to prevent simultaneous
// checkpoint updates - ownership is assumed.
func (b *BlobStore) setCheckpointMetadata(ctx context.Context, blobName string, checkpoint azeventhubs.Checkpoint) (*time.Time, azcore.ETag, error) {
blobMetadata := newCheckpointBlobMetadata(checkpoint)
blobClient := b.cc.NewBlockBlobClient(blobName)

setMetadataResp, err := blobClient.SetMetadata(ctx, blobMetadata, nil)

if err == nil {
return setMetadataResp.LastModified, *setMetadataResp.ETag, nil
}

return uploadResp.LastModified, *uploadResp.ETag, nil
if !bloberror.HasCode(err, bloberror.BlobNotFound) {
return nil, "", err
}

uploadResp, err := blobClient.Upload(ctx, streaming.NopCloser(bytes.NewReader([]byte{})), &blockblob.UploadOptions{
Metadata: blobMetadata,
})

if err != nil {
return nil, "", err
}

return uploadResp.LastModified, *uploadResp.ETag, nil
}

func nameForCheckpointBlob(a azeventhubs.Checkpoint) (string, error) {
Expand Down
121 changes: 121 additions & 0 deletions sdk/messaging/azeventhubs/checkpoints/blob_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package checkpoints_test

import (
"context"
"fmt"
"os"
"strconv"
"testing"
Expand Down Expand Up @@ -216,6 +217,126 @@ func TestBlobStore_ListAndClaim(t *testing.T) {
require.Empty(t, claimedOwnerships)
}

func TestBlobStore_OnlyOneOwnershipClaimSucceeds(t *testing.T) {
testData := getContainerClient(t)
defer testData.Cleanup()

cc, err := container.NewClientFromConnectionString(testData.ConnectionString, testData.ContainerName, nil)
require.NoError(t, err)

store, err := checkpoints.NewBlobStore(cc, nil)
require.NoError(t, err)

// we're going to make multiple calls to the blob store but only _one_ should succeed
// since it's "first one in wins"
claimsCh := make(chan []azeventhubs.Ownership, 20)

t.Logf("Starting %d goroutines to claim ownership without an etag", cap(claimsCh))

// attempt to claim the same partition from multiple goroutines. Only _one_ of the
// goroutines should walk away thinking it claimed the partition.
for i := 0; i < cap(claimsCh); i++ {
go func() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

ownerships, err := store.ClaimOwnership(ctx, []azeventhubs.Ownership{
{ConsumerGroup: azeventhubs.DefaultConsumerGroup, EventHubName: "name", FullyQualifiedNamespace: "ns", PartitionID: "0", OwnerID: "ownerID"},
}, nil)

if err != nil {
claimsCh <- nil
require.NoError(t, err)
} else {
claimsCh <- ownerships
}
}()
}

claimed := map[string]bool{}
numFailedClaims := 0

for i := 0; i < cap(claimsCh); i++ {
claims := <-claimsCh

if claims == nil {
numFailedClaims++
continue
}

for _, claim := range claims {
require.False(t, claimed[claim.PartitionID], fmt.Sprintf("Partition ID %s was claimed more than once", claim.PartitionID))
require.NotNil(t, claim.ETag)
claimed[claim.PartitionID] = true
}
}

require.Equal(t, cap(claimsCh)-1, numFailedClaims, fmt.Sprintf("One of the 1/%d wins and the rest all fail to claim", cap(claimsCh)))
}

func TestBlobStore_OnlyOneOwnershipUpdateSucceeds(t *testing.T) {
testData := getContainerClient(t)
defer testData.Cleanup()

cc, err := container.NewClientFromConnectionString(testData.ConnectionString, testData.ContainerName, nil)
require.NoError(t, err)

store, err := checkpoints.NewBlobStore(cc, nil)
require.NoError(t, err)

// we're going to make multiple calls to the blob store but only _one_ should succeed
// since it's "first one in wins"
claimsCh := make(chan []azeventhubs.Ownership, 20)

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

ownerships, err := store.ClaimOwnership(ctx, []azeventhubs.Ownership{
{ConsumerGroup: azeventhubs.DefaultConsumerGroup, EventHubName: "name", FullyQualifiedNamespace: "ns", PartitionID: "0", OwnerID: "ownerID"},
}, nil)
require.NoError(t, err)
require.Equal(t, "0", ownerships[0].PartitionID)
require.NotNil(t, ownerships[0].ETag)

t.Logf("Starting %d goroutines to claim ownership without an etag", cap(claimsCh))

// attempt to claim the same partition from multiple goroutines. Only _one_ of the
// goroutines should walk away thinking it claimed the partition.
for i := 0; i < cap(claimsCh); i++ {
go func() {

ownerships, err := store.ClaimOwnership(ctx, ownerships, nil)

if err != nil {
claimsCh <- nil
require.NoError(t, err)
} else {
claimsCh <- ownerships
}
}()
}

claimed := map[string]bool{}
numFailedClaims := 0

for i := 0; i < cap(claimsCh); i++ {
claims := <-claimsCh

if claims == nil {
numFailedClaims++
continue
}

for _, claim := range claims {
require.False(t, claimed[claim.PartitionID], fmt.Sprintf("Partition ID %s was claimed more than once", claim.PartitionID))
require.NotNil(t, claim.ETag)
claimed[claim.PartitionID] = true
}
}

require.Equal(t, cap(claimsCh)-1, numFailedClaims, fmt.Sprintf("One of the 1/%d wins and the rest all fail to claim", cap(claimsCh)))
}

func getContainerClient(t *testing.T) struct {
ConnectionString string
ContainerName string
Expand Down
7 changes: 7 additions & 0 deletions sdk/messaging/azeventhubs/internal/test/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ func CaptureLogsForTest() func() []string {

func CaptureLogsForTestWithChannel(messagesCh chan string) func() []string {
setAzLogListener(func(e azlog.Event, s string) {
defer func() {
if err := recover(); err != nil {
fmt.Printf("FAILED SENDING MESSAGE (%s), message was: [%s] %s\n", err, e, s)
panic(err)
}
}()

messagesCh <- fmt.Sprintf("[%s] %s", e, s)
})

Expand Down

0 comments on commit 6dfd0cb

Please sign in to comment.