Skip to content

Commit

Permalink
Increase unit test coverage for checkpoint store and event processor (#…
Browse files Browse the repository at this point in the history
…7743)

* Checkpoint store unit tests

* Update unit tests

* Fix path for resource file
  • Loading branch information
srnagar authored Jan 27, 2020
1 parent 72ef77a commit 20e3c52
Show file tree
Hide file tree
Showing 9 changed files with 138 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ private Mono<PartitionOwnership> updateOwnershipETag(Response<?> response, Parti
*/
@Override
public Mono<Void> updateCheckpoint(Checkpoint checkpoint) {
if (checkpoint.getSequenceNumber() == null && checkpoint.getOffset() == null) {
if (checkpoint == null || (checkpoint.getSequenceNumber() == null && checkpoint.getOffset() == null)) {
throw logger.logExceptionAsWarning(Exceptions
.propagate(new IllegalStateException(
"Both sequence number and offset cannot be null when updating a checkpoint")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,50 +3,29 @@

package com.azure.messaging.eventhubs.checkpointstore.blob;

import com.azure.core.util.logging.ClientLogger;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import com.azure.core.util.CoreUtils;
import java.util.Map;

/**
* I18n messages loaded from the messages.properties file located within the same package.
*/
public enum Messages {
;
private static final ClientLogger LOGGER = new ClientLogger(Messages.class);
private static Properties properties;
private static final String PATH = "com/azure/messaging/eventhubs/checkpointstore/blob/messages.properties";
public static final String NO_METADATA_AVAILABLE_FOR_BLOB = "No metadata available for blob {}";
public static final String CLAIM_ERROR = "Couldn't claim ownership of partition {}";
public static final String FOUND_BLOB_FOR_PARTITION = "Found blob for partition {}";
public static final String BLOB_OWNER_INFO = "Blob {} is owned by {}";
public static final String CHECKPOINT_INFO = "Blob {} has checkpoint with sequence number {} and offset {}";
private static final String PATH = "messages.properties";
private static final Map<String, String> PROPERTIES = CoreUtils.getProperties(PATH);

private static synchronized Properties getProperties() {
if (properties != null) {
return properties;
}
properties = new Properties();
try (InputStream inputStream =
Thread.currentThread().getContextClassLoader().getResourceAsStream(PATH)) {
if (inputStream != null) {
properties.load(inputStream);
} else {
LOGGER.error("Message properties [{}] not found", PATH); //NON-NLS
}
} catch (IOException exception) {
LOGGER.error("Error loading message properties [{}]", PATH, exception); //NON-NLS
}
return properties;
}
public static final String NO_METADATA_AVAILABLE_FOR_BLOB = getMessage("NO_METADATA_AVAILABLE_FOR_BLOB");
public static final String CLAIM_ERROR = getMessage("CLAIM_ERROR");
public static final String FOUND_BLOB_FOR_PARTITION = getMessage("FOUND_BLOB_FOR_PARTITION");
public static final String BLOB_OWNER_INFO = getMessage("BLOB_OWNER_INFO");
public static final String CHECKPOINT_INFO = getMessage("CHECKPOINT_INFO");

/**
* @param key the key of the message to retrieve
* @return the message matching the given key
*/
public static String getMessage(String key) {
return String.valueOf(getProperties().getOrDefault(key, key));
return PROPERTIES.getOrDefault(key, key);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.azure.storage.blob.models.BlobItem;
import com.azure.storage.blob.models.BlobItemProperties;
import com.azure.storage.blob.models.ListBlobsOptions;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
Expand All @@ -35,6 +36,7 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.when;
Expand All @@ -61,11 +63,13 @@ public void setup() {
@Test
public void testListOwnerShip() {
BlobCheckpointStore blobCheckpointStore = new BlobCheckpointStore(blobContainerAsyncClient);
BlobItem blobItem = getBlobItem("owner1", "1", "230", "etag", "ns/eh/cg/ownership/0");
BlobItem blobItem2 = getBlobItem("owner1", "1", "230", "etag", "ns/eh/cg/0");
PagedFlux<BlobItem> response = new PagedFlux<BlobItem>(() -> Mono.just(new PagedResponseBase<HttpHeaders,
BlobItem blobItem = getOwnershipBlobItem("owner1", "etag", "ns/eh/cg/ownership/0"); // valid blob
BlobItem blobItem2 = getOwnershipBlobItem("owner1", "etag", "ns/eh/cg/0"); // invalid name
BlobItem blobItem3 = new BlobItem().setName("ns/eh/cg/ownership/5"); // no metadata

PagedFlux<BlobItem> response = new PagedFlux<>(() -> Mono.just(new PagedResponseBase<HttpHeaders,
BlobItem>(null, 200, null,
Arrays.asList(blobItem, blobItem2), null,
Arrays.asList(blobItem, blobItem2, blobItem3), null,
null)));
when(blobContainerAsyncClient.listBlobs(any(ListBlobsOptions.class))).thenReturn(response);

Expand All @@ -79,6 +83,27 @@ public void testListOwnerShip() {
}).verifyComplete();
}

@Test
public void testListCheckpoint() {
BlobCheckpointStore blobCheckpointStore = new BlobCheckpointStore(blobContainerAsyncClient);
BlobItem blobItem = getCheckpointBlobItem("230", "1", "ns/eh/cg/checkpoint/0");
BlobItem blobItem2 = new BlobItem().setName("ns/eh/cg/checkpoint/1");
PagedFlux<BlobItem> response = new PagedFlux<>(() -> Mono.just(new PagedResponseBase<HttpHeaders,
BlobItem>(null, 200, null,
Arrays.asList(blobItem, blobItem2), null,
null)));
when(blobContainerAsyncClient.listBlobs(any(ListBlobsOptions.class))).thenReturn(response);

StepVerifier.create(blobCheckpointStore.listCheckpoints("ns", "eh", "cg"))
.assertNext(checkpoint -> {
assertEquals("0", checkpoint.getPartitionId());
assertEquals("eh", checkpoint.getEventHubName());
assertEquals("cg", checkpoint.getConsumerGroup());
assertEquals(1L, checkpoint.getSequenceNumber());
assertEquals(230L, checkpoint.getOffset());
}).verifyComplete();
}

@Test
public void testUpdateCheckpoint() {
Checkpoint checkpoint = new Checkpoint()
Expand All @@ -89,9 +114,7 @@ public void testUpdateCheckpoint() {
.setSequenceNumber(2L)
.setOffset(100L);

Map<String, String> headers = new HashMap<>();
headers.put("eTag", "etag2");
BlobItem blobItem = getBlobItem("owner1", "1", "230", "etag", "ns/eh/cg/checkpoint/0");
BlobItem blobItem = getCheckpointBlobItem("230", "1", "ns/eh/cg/checkpoint/0");
PagedFlux<BlobItem> response = new PagedFlux<BlobItem>(() -> Mono.just(new PagedResponseBase<HttpHeaders,
BlobItem>(null, 200, null,
Arrays.asList(blobItem), null,
Expand All @@ -105,7 +128,44 @@ public void testUpdateCheckpoint() {
.thenReturn(Mono.empty());

BlobCheckpointStore blobCheckpointStore = new BlobCheckpointStore(blobContainerAsyncClient);
StepVerifier.create(blobCheckpointStore.updateCheckpoint(checkpoint)).verifyComplete();
}

@Test
public void testInvalidCheckpoint() {
BlobCheckpointStore blobCheckpointStore = new BlobCheckpointStore(blobContainerAsyncClient);
Assertions.assertThrows(IllegalStateException.class, () -> blobCheckpointStore.updateCheckpoint(null));
Assertions
.assertThrows(IllegalStateException.class, () -> blobCheckpointStore.updateCheckpoint(new Checkpoint()));
}

@Test
public void testUpdateCheckpointForNewPartition() {
Checkpoint checkpoint = new Checkpoint()
.setFullyQualifiedNamespace("ns")
.setEventHubName("eh")
.setConsumerGroup("cg")
.setPartitionId("0")
.setSequenceNumber(2L)
.setOffset(100L);

HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.put("eTag", "etag2");
BlobItem blobItem = getCheckpointBlobItem("230", "1", "ns/eh/cg/checkpoint/0");
PagedFlux<BlobItem> response = new PagedFlux<BlobItem>(() -> Mono.just(new PagedResponseBase<HttpHeaders,
BlobItem>(null, 200, null,
Arrays.asList(blobItem), null,
null)));

when(blobContainerAsyncClient.getBlobAsyncClient("ns/eh/cg/checkpoint/0")).thenReturn(blobAsyncClient);
when(blobContainerAsyncClient.listBlobs(any(ListBlobsOptions.class))).thenReturn(response);
when(blobAsyncClient.getBlockBlobAsyncClient()).thenReturn(blockBlobAsyncClient);
when(blobAsyncClient.exists()).thenReturn(Mono.just(false));
when(blobAsyncClient.getBlockBlobAsyncClient()).thenReturn(blockBlobAsyncClient);
when(blockBlobAsyncClient.uploadWithResponse(ArgumentMatchers.<Flux<ByteBuffer>>any(), eq(0L),
isNull(), anyMap(), isNull(), isNull(), isNull()))
.thenReturn(Mono.just(new ResponseBase<>(null, 200, httpHeaders, null, null)));
BlobCheckpointStore blobCheckpointStore = new BlobCheckpointStore(blobContainerAsyncClient);
StepVerifier.create(blobCheckpointStore.updateCheckpoint(checkpoint)).verifyComplete();
}

Expand Down Expand Up @@ -195,9 +255,9 @@ private PartitionOwnership createPartitionOwnership(String fullyQualifiedNamespa
.setOwnerId(ownerId);
}

private BlobItem getBlobItem(String owner, String sequenceNumber, String offset, String etag, String blobName) {
Map<String, String> metadata = getMetadata(owner, sequenceNumber, offset);

private BlobItem getOwnershipBlobItem(String owner, String etag, String blobName) {
Map<String, String> metadata = new HashMap<>();
metadata.put("ownerid", owner);
BlobItemProperties properties = new BlobItemProperties()
.setLastModified(OffsetDateTime.now())
.setETag(etag);
Expand All @@ -208,11 +268,12 @@ private BlobItem getBlobItem(String owner, String sequenceNumber, String offset,
.setProperties(properties);
}

private Map<String, String> getMetadata(String owner, String sequenceNumber, String offset) {
private BlobItem getCheckpointBlobItem(String offset, String sequenceNumber, String blobName) {
Map<String, String> metadata = new HashMap<>();
metadata.put("ownerid", owner);
metadata.put("sequencenumber", sequenceNumber);
metadata.put("offset", offset);
return metadata;
return new BlobItem()
.setName(blobName)
.setMetadata(metadata);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

package com.azure.messaging.eventhubs.checkpointstore.blob;

import com.azure.messaging.eventhubs.Messages;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.azure.messaging.eventhubs.implementation.PartitionProcessor;
import com.azure.messaging.eventhubs.models.ErrorContext;
import com.azure.messaging.eventhubs.models.EventPosition;
import java.time.Duration;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
Expand All @@ -19,6 +20,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import reactor.core.Disposable;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
Expand Down Expand Up @@ -49,6 +51,11 @@ public class EventProcessorClient {

private final AtomicReference<Disposable> runner = new AtomicReference<>();
private final AtomicReference<Scheduler> scheduler = new AtomicReference<>();
private final String fullyQualifiedNamespace;
private final String eventHubName;
private final String consumerGroup;



/**
* Package-private constructor. Use {@link EventHubClientBuilder} to create an instance.
Expand All @@ -72,18 +79,22 @@ public class EventProcessorClient {
Objects.requireNonNull(consumerGroup, "consumerGroup cannot be null.");
Objects.requireNonNull(partitionProcessorFactory, "partitionProcessorFactory cannot be null.");

EventHubAsyncClient eventHubAsyncClient = eventHubClientBuilder.buildAsyncClient();

this.checkpointStore = Objects.requireNonNull(checkpointStore, "checkpointStore cannot be null");
this.identifier = UUID.randomUUID().toString();
this.fullyQualifiedNamespace = eventHubAsyncClient.getFullyQualifiedNamespace().toLowerCase(Locale.ROOT);
this.eventHubName = eventHubAsyncClient.getEventHubName().toLowerCase(Locale.ROOT);
this.consumerGroup = consumerGroup.toLowerCase(Locale.ROOT);

logger.info("The instance ID for this event processors is {}", this.identifier);
this.partitionPumpManager = new PartitionPumpManager(checkpointStore, partitionProcessorFactory,
eventHubClientBuilder, trackLastEnqueuedEventProperties, tracerProvider, initialPartitionEventPosition);
EventHubAsyncClient eventHubAsyncClient = eventHubClientBuilder.buildAsyncClient();
this.partitionBasedLoadBalancer =
new PartitionBasedLoadBalancer(this.checkpointStore, eventHubAsyncClient,
eventHubAsyncClient.getFullyQualifiedNamespace().toLowerCase(Locale.ROOT),
eventHubAsyncClient.getEventHubName().toLowerCase(Locale.ROOT),
consumerGroup.toLowerCase(Locale.ROOT), identifier, TimeUnit.MINUTES.toSeconds(1),
partitionPumpManager, processError);
this.fullyQualifiedNamespace, this.eventHubName, this.consumerGroup, this.identifier,
TimeUnit.MINUTES.toSeconds(1), this.partitionPumpManager, processError);

}

/**
Expand Down Expand Up @@ -138,7 +149,7 @@ public synchronized void stop() {
}
runner.get().dispose();
scheduler.get().dispose();
this.partitionPumpManager.stopAllPartitionPumps();
stopProcessing();
}

/**
Expand All @@ -150,4 +161,16 @@ public synchronized void stop() {
public synchronized boolean isRunning() {
return isRunning.get();
}

private void stopProcessing() {
partitionPumpManager.stopAllPartitionPumps();

// finally, remove ownerid from checkpointstore as the processor is shutting down
checkpointStore.listOwnership(fullyQualifiedNamespace, eventHubName, consumerGroup)
.filter(ownership -> identifier.equals(ownership.getOwnerId()))
.map(ownership -> ownership.setOwnerId(""))
.collect(Collectors.toList())
.map(checkpointStore::claimOwnership)
.block(Duration.ofSeconds(10)); // block until the checkpoint store is updated
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,8 @@ private void claimOwnership(final Map<String, PartitionOwnership> partitionOwner
.warning(Messages.FAILED_TO_CLAIM_OWNERSHIP, ownershipRequest.getPartitionId(),
ex.getMessage(), ex))
.collectList()
.zipWith(checkpointStore.listCheckpoints(fullyQualifiedNamespace, eventHubName, consumerGroupName)
.zipWhen(ownershipList -> checkpointStore.listCheckpoints(fullyQualifiedNamespace, eventHubName,
consumerGroupName)
.collectMap(checkpoint -> checkpoint.getPartitionId(), Function.identity()))
.subscribe(ownedPartitionCheckpointsTuple -> {
ownedPartitionCheckpointsTuple.getT1()
Expand All @@ -381,7 +382,10 @@ private void claimOwnership(final Map<String, PartitionOwnership> partitionOwner
ownedPartitionCheckpointsTuple.getT2().get(po.getPartitionId())));
},
ex -> {
throw logger.logExceptionAsError(new RuntimeException("Error while listing checkpoints", ex));
logger.warning("Error while listing checkpoints", ex);
ErrorContext errorContext = new ErrorContext(partitionAgnosticContext, ex);
processError.accept(errorContext);
throw logger.logExceptionAsError(new IllegalStateException("Error while listing checkpoints", ex));
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@
*/
public class EventProcessorClientErrorHandlingTest {

private static final String NAMESPACE_NAME = "dummyNamespaceName";
private static final String DEFAULT_DOMAIN_NAME = "servicebus.windows.net/";

@Mock
private EventHubClientBuilder eventHubClientBuilder;

Expand Down Expand Up @@ -80,7 +77,11 @@ public void testCheckpointStoreErrors(CheckpointStore checkpointStore) throws In
}, new HashMap<>());
client.start();
boolean completed = countDownLatch.await(3, TimeUnit.SECONDS);
client.stop();
try {
client.stop();
} catch (IllegalStateException ex) {
// do nothing, expected as the checkpointstores are expected to throw errors
}
Assertions.assertTrue(completed);
}

Expand Down Expand Up @@ -185,7 +186,7 @@ public Flux<PartitionOwnership> claimOwnership(
@Override
public Flux<Checkpoint> listCheckpoints(String fullyQualifiedNamespace,
String eventHubName, String consumerGroup) {
return null;
return Flux.empty();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ public void testWithSimplePartitionProcessor() throws Exception {
() -> testPartitionProcessor, checkpointStore, false, tracerProvider, ec -> { }, new HashMap<>());
eventProcessorClient.start();
TimeUnit.SECONDS.sleep(10);
eventProcessorClient.stop();

// Assert
assertNotNull(eventProcessorClient.getIdentifier());
Expand All @@ -168,6 +167,18 @@ public void testWithSimplePartitionProcessor() throws Exception {
verify(consumer1, atLeastOnce()).receiveFromPartition(anyString(), any(EventPosition.class),
any(ReceiveOptions.class));
verify(consumer1, atLeastOnce()).close();
eventProcessorClient.stop();
StepVerifier.create(checkpointStore.listOwnership("test-ns", "test-eh", "test-consumer"))
.assertNext(partitionOwnership -> {
assertEquals("1", partitionOwnership.getPartitionId(), "Partition");
assertEquals("test-consumer", partitionOwnership.getConsumerGroup(), "Consumer");
assertEquals("test-eh", partitionOwnership.getEventHubName(), "EventHub name");
assertEquals("", partitionOwnership.getOwnerId(), "Owner Id");
assertTrue(partitionOwnership.getLastModifiedTime() >= beforeTest, "LastModifiedTime");
assertTrue(partitionOwnership.getLastModifiedTime() <= System.currentTimeMillis(), "LastModifiedTime");
assertNotNull(partitionOwnership.getETag());
}).verifyComplete();

}

/**
Expand Down

0 comments on commit 20e3c52

Please sign in to comment.