diff --git a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobCheckpointStore.java b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobCheckpointStore.java index 97c94adff20e2..6c60cd41bf12f 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobCheckpointStore.java +++ b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobCheckpointStore.java @@ -49,7 +49,8 @@ public class BlobCheckpointStore implements CheckpointStore { private static final String BLOB_PATH_SEPARATOR = "/"; private static final String CHECKPOINT_PATH = "/checkpoint/"; private static final String OWNERSHIP_PATH = "/ownership/"; - private static final ByteBuffer UPLOAD_DATA = ByteBuffer.wrap("".getBytes(UTF_8)); + public static final String EMPTY_STRING = ""; + private static final ByteBuffer UPLOAD_DATA = ByteBuffer.wrap(EMPTY_STRING.getBytes(UTF_8)); private final BlobContainerAsyncClient blobContainerAsyncClient; private final ClientLogger logger = new ClientLogger(BlobCheckpointStore.class); @@ -257,13 +258,14 @@ private Mono convertToPartitionOwnership(BlobItem blobItem) return Mono.empty(); } logger - .info(Messages.BLOB_OWNER_INFO, blobItem.getName(), blobItem.getMetadata().getOrDefault(OWNER_ID, "")); + .info(Messages.BLOB_OWNER_INFO, blobItem.getName(), + blobItem.getMetadata().getOrDefault(OWNER_ID, EMPTY_STRING)); BlobItemProperties blobProperties = blobItem.getProperties(); - String ownerId = blobItem.getMetadata().getOrDefault(OWNER_ID, ""); + String ownerId = blobItem.getMetadata().getOrDefault(OWNER_ID, EMPTY_STRING); if (ownerId == null) { - return Mono.empty(); + ownerId = EMPTY_STRING; } PartitionOwnership partitionOwnership = new PartitionOwnership() diff --git a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/test/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobEventProcessorClientStoreTest.java b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/test/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobEventProcessorClientStoreTest.java index 51f560dc65add..0465b82a5f96d 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/test/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobEventProcessorClientStoreTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/test/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobEventProcessorClientStoreTest.java @@ -37,6 +37,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.Mockito.when; @@ -66,10 +67,11 @@ public void testListOwnerShip() { BlobItem blobItem = getOwnershipBlobItem("owner1", "etag", "ns/eh/cg/ownership/0"); // valid blob BlobItem blobItem2 = getOwnershipBlobItem("owner1", "etag", "ns/eh/cg/0"); // invalid name BlobItem blobItem3 = new BlobItem().setName("ns/eh/cg/ownership/5"); // no metadata + BlobItem blobItem4 = getOwnershipBlobItem(null, "2", "ns/eh/cg/ownership/2"); // valid blob with null ownerid PagedFlux response = new PagedFlux<>(() -> Mono.just(new PagedResponseBase(null, 200, null, - Arrays.asList(blobItem, blobItem2, blobItem3), null, + Arrays.asList(blobItem, blobItem2, blobItem3, blobItem4), null, null))); when(blobContainerAsyncClient.listBlobs(any(ListBlobsOptions.class))).thenReturn(response); @@ -80,17 +82,25 @@ public void testListOwnerShip() { assertEquals("eh", partitionOwnership.getEventHubName()); assertEquals("cg", partitionOwnership.getConsumerGroup()); assertEquals("etag", partitionOwnership.getETag()); + }) + .assertNext(partitionOwnership -> { + assertEquals("", partitionOwnership.getOwnerId()); + assertEquals("2", partitionOwnership.getPartitionId()); + assertEquals("eh", partitionOwnership.getEventHubName()); + assertEquals("cg", partitionOwnership.getConsumerGroup()); + assertEquals("2", partitionOwnership.getETag()); }).verifyComplete(); } @Test public void testListCheckpoint() { BlobCheckpointStore blobCheckpointStore = new BlobCheckpointStore(blobContainerAsyncClient); - BlobItem blobItem = getCheckpointBlobItem("230", "1", "ns/eh/cg/checkpoint/0"); - BlobItem blobItem2 = new BlobItem().setName("ns/eh/cg/checkpoint/1"); + BlobItem blobItem = getCheckpointBlobItem("230", "1", "ns/eh/cg/checkpoint/0"); // valid blob + BlobItem blobItem2 = new BlobItem().setName("ns/eh/cg/checkpoint/1"); // valid blob with no metadata + BlobItem blobItem3 = getCheckpointBlobItem("230", "1", "ns/eh/cg/1"); // invalid name PagedFlux response = new PagedFlux<>(() -> Mono.just(new PagedResponseBase(null, 200, null, - Arrays.asList(blobItem, blobItem2), null, + Arrays.asList(blobItem, blobItem2, blobItem3), null, null))); when(blobContainerAsyncClient.listBlobs(any(ListBlobsOptions.class))).thenReturn(response); @@ -170,7 +180,6 @@ public void testUpdateCheckpointForNewPartition() { } - @SuppressWarnings("unchecked") @Test public void testClaimOwnership() { PartitionOwnership po = createPartitionOwnership("ns", "eh", "cg", "0", "owner1"); @@ -181,7 +190,8 @@ public void testClaimOwnership() { when(blobContainerAsyncClient.getBlobAsyncClient("ns/eh/cg/ownership/0")).thenReturn(blobAsyncClient); when(blobAsyncClient.getBlockBlobAsyncClient()).thenReturn(blockBlobAsyncClient); when(blockBlobAsyncClient.uploadWithResponse(ArgumentMatchers.>any(), eq(0L), - isNull(), any(Map.class), isNull(), isNull(), any(BlobRequestConditions.class))) + isNull(), ArgumentMatchers.>any(), isNull(), isNull(), + any(BlobRequestConditions.class))) .thenReturn(Mono.just(new ResponseBase<>(null, 200, httpHeaders, null, null))); BlobCheckpointStore blobCheckpointStore = new BlobCheckpointStore(blobContainerAsyncClient); @@ -195,6 +205,30 @@ public void testClaimOwnership() { }).verifyComplete(); } + @Test + public void testClaimOwnershipExistingBlob() { + PartitionOwnership po = createPartitionOwnership("ns", "eh", "cg", "0", "owner1"); + po.setETag("1"); + + HttpHeaders httpHeaders = new HttpHeaders(); + httpHeaders.put("eTag", "2"); + + when(blobContainerAsyncClient.getBlobAsyncClient("ns/eh/cg/ownership/0")).thenReturn(blobAsyncClient); + when(blobAsyncClient.getBlockBlobAsyncClient()).thenReturn(blockBlobAsyncClient); + when(blobAsyncClient + .setMetadataWithResponse(ArgumentMatchers.>any(), any(BlobRequestConditions.class))) + .thenReturn(Mono.just(new ResponseBase<>(null, 200, httpHeaders, null, null))); + + BlobCheckpointStore blobCheckpointStore = new BlobCheckpointStore(blobContainerAsyncClient); + StepVerifier.create(blobCheckpointStore.claimOwnership(Arrays.asList(po))) + .assertNext(partitionOwnership -> { + assertEquals("owner1", partitionOwnership.getOwnerId()); + assertEquals("0", partitionOwnership.getPartitionId()); + assertEquals("eh", partitionOwnership.getEventHubName()); + assertEquals("cg", partitionOwnership.getConsumerGroup()); + assertEquals("2", partitionOwnership.getETag()); + }).verifyComplete(); + } @Test public void testListOwnershipError() { @@ -242,6 +276,19 @@ public void testFailedOwnershipClaim() { .thenReturn(Mono.error(new ResourceModifiedException("Etag did not match", null))); BlobCheckpointStore blobCheckpointStore = new BlobCheckpointStore(blobContainerAsyncClient); StepVerifier.create(blobCheckpointStore.claimOwnership(Arrays.asList(po))).verifyComplete(); + + PartitionOwnership po2 = createPartitionOwnership("ns", "eh", "cg", "0", "owner1"); + po2.setETag("1"); + when(blobContainerAsyncClient.getBlobAsyncClient("ns/eh/cg/ownership/0")).thenReturn(blobAsyncClient); + when(blobAsyncClient.getBlockBlobAsyncClient()).thenReturn(blockBlobAsyncClient); + when(blobAsyncClient + .setMetadataWithResponse(ArgumentMatchers.>any(), any(BlobRequestConditions.class))) + .thenReturn(Mono.error(new ResourceModifiedException("Etag did not match", null))); + StepVerifier.create(blobCheckpointStore.claimOwnership(Arrays.asList(po2))).verifyComplete(); + + blobCheckpointStore = new BlobCheckpointStore(blobContainerAsyncClient); + when(blobContainerAsyncClient.getBlobAsyncClient(anyString())).thenReturn(null); + StepVerifier.create(blobCheckpointStore.claimOwnership(Arrays.asList(po))).verifyComplete(); } private PartitionOwnership createPartitionOwnership(String fullyQualifiedNamespace, String eventHubName,