Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Increase unit test coverage for checkpoint store and event processor #7743

Merged
merged 3 commits into from
Jan 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ private Mono<PartitionOwnership> updateOwnershipETag(Response<?> response, Parti
*/
@Override
public Mono<Void> updateCheckpoint(Checkpoint checkpoint) {
if (checkpoint.getSequenceNumber() == null && checkpoint.getOffset() == null) {
if (checkpoint == null || (checkpoint.getSequenceNumber() == null && checkpoint.getOffset() == null)) {
throw logger.logExceptionAsWarning(Exceptions
.propagate(new IllegalStateException(
"Both sequence number and offset cannot be null when updating a checkpoint")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,50 +3,29 @@

package com.azure.messaging.eventhubs.checkpointstore.blob;

import com.azure.core.util.logging.ClientLogger;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import com.azure.core.util.CoreUtils;
import java.util.Map;

/**
* I18n messages loaded from the messages.properties file located within the same package.
*/
public enum Messages {
;
private static final ClientLogger LOGGER = new ClientLogger(Messages.class);
private static Properties properties;
private static final String PATH = "com/azure/messaging/eventhubs/checkpointstore/blob/messages.properties";
public static final String NO_METADATA_AVAILABLE_FOR_BLOB = "No metadata available for blob {}";
public static final String CLAIM_ERROR = "Couldn't claim ownership of partition {}";
public static final String FOUND_BLOB_FOR_PARTITION = "Found blob for partition {}";
public static final String BLOB_OWNER_INFO = "Blob {} is owned by {}";
public static final String CHECKPOINT_INFO = "Blob {} has checkpoint with sequence number {} and offset {}";
private static final String PATH = "messages.properties";
private static final Map<String, String> PROPERTIES = CoreUtils.getProperties(PATH);

private static synchronized Properties getProperties() {
if (properties != null) {
return properties;
}
properties = new Properties();
try (InputStream inputStream =
Thread.currentThread().getContextClassLoader().getResourceAsStream(PATH)) {
if (inputStream != null) {
properties.load(inputStream);
} else {
LOGGER.error("Message properties [{}] not found", PATH); //NON-NLS
}
} catch (IOException exception) {
LOGGER.error("Error loading message properties [{}]", PATH, exception); //NON-NLS
}
return properties;
}
public static final String NO_METADATA_AVAILABLE_FOR_BLOB = getMessage("NO_METADATA_AVAILABLE_FOR_BLOB");
public static final String CLAIM_ERROR = getMessage("CLAIM_ERROR");
public static final String FOUND_BLOB_FOR_PARTITION = getMessage("FOUND_BLOB_FOR_PARTITION");
public static final String BLOB_OWNER_INFO = getMessage("BLOB_OWNER_INFO");
public static final String CHECKPOINT_INFO = getMessage("CHECKPOINT_INFO");

/**
* @param key the key of the message to retrieve
* @return the message matching the given key
*/
public static String getMessage(String key) {
return String.valueOf(getProperties().getOrDefault(key, key));
return PROPERTIES.getOrDefault(key, key);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.azure.storage.blob.models.BlobItem;
import com.azure.storage.blob.models.BlobItemProperties;
import com.azure.storage.blob.models.ListBlobsOptions;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
Expand All @@ -35,6 +36,7 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.when;
Expand All @@ -61,11 +63,13 @@ public void setup() {
@Test
public void testListOwnerShip() {
BlobCheckpointStore blobCheckpointStore = new BlobCheckpointStore(blobContainerAsyncClient);
BlobItem blobItem = getBlobItem("owner1", "1", "230", "etag", "ns/eh/cg/ownership/0");
BlobItem blobItem2 = getBlobItem("owner1", "1", "230", "etag", "ns/eh/cg/0");
PagedFlux<BlobItem> response = new PagedFlux<BlobItem>(() -> Mono.just(new PagedResponseBase<HttpHeaders,
BlobItem blobItem = getOwnershipBlobItem("owner1", "etag", "ns/eh/cg/ownership/0"); // valid blob
BlobItem blobItem2 = getOwnershipBlobItem("owner1", "etag", "ns/eh/cg/0"); // invalid name
BlobItem blobItem3 = new BlobItem().setName("ns/eh/cg/ownership/5"); // no metadata

PagedFlux<BlobItem> response = new PagedFlux<>(() -> Mono.just(new PagedResponseBase<HttpHeaders,
BlobItem>(null, 200, null,
Arrays.asList(blobItem, blobItem2), null,
Arrays.asList(blobItem, blobItem2, blobItem3), null,
null)));
when(blobContainerAsyncClient.listBlobs(any(ListBlobsOptions.class))).thenReturn(response);

Expand All @@ -79,6 +83,27 @@ public void testListOwnerShip() {
}).verifyComplete();
}

@Test
public void testListCheckpoint() {
BlobCheckpointStore blobCheckpointStore = new BlobCheckpointStore(blobContainerAsyncClient);
BlobItem blobItem = getCheckpointBlobItem("230", "1", "ns/eh/cg/checkpoint/0");
BlobItem blobItem2 = new BlobItem().setName("ns/eh/cg/checkpoint/1");
PagedFlux<BlobItem> response = new PagedFlux<>(() -> Mono.just(new PagedResponseBase<HttpHeaders,
BlobItem>(null, 200, null,
Arrays.asList(blobItem, blobItem2), null,
null)));
when(blobContainerAsyncClient.listBlobs(any(ListBlobsOptions.class))).thenReturn(response);

StepVerifier.create(blobCheckpointStore.listCheckpoints("ns", "eh", "cg"))
.assertNext(checkpoint -> {
assertEquals("0", checkpoint.getPartitionId());
assertEquals("eh", checkpoint.getEventHubName());
assertEquals("cg", checkpoint.getConsumerGroup());
assertEquals(1L, checkpoint.getSequenceNumber());
assertEquals(230L, checkpoint.getOffset());
}).verifyComplete();
}

@Test
public void testUpdateCheckpoint() {
Checkpoint checkpoint = new Checkpoint()
Expand All @@ -89,9 +114,7 @@ public void testUpdateCheckpoint() {
.setSequenceNumber(2L)
.setOffset(100L);

Map<String, String> headers = new HashMap<>();
headers.put("eTag", "etag2");
BlobItem blobItem = getBlobItem("owner1", "1", "230", "etag", "ns/eh/cg/checkpoint/0");
BlobItem blobItem = getCheckpointBlobItem("230", "1", "ns/eh/cg/checkpoint/0");
PagedFlux<BlobItem> response = new PagedFlux<BlobItem>(() -> Mono.just(new PagedResponseBase<HttpHeaders,
BlobItem>(null, 200, null,
Arrays.asList(blobItem), null,
Expand All @@ -105,7 +128,44 @@ public void testUpdateCheckpoint() {
.thenReturn(Mono.empty());

BlobCheckpointStore blobCheckpointStore = new BlobCheckpointStore(blobContainerAsyncClient);
StepVerifier.create(blobCheckpointStore.updateCheckpoint(checkpoint)).verifyComplete();
}

@Test
public void testInvalidCheckpoint() {
BlobCheckpointStore blobCheckpointStore = new BlobCheckpointStore(blobContainerAsyncClient);
Assertions.assertThrows(IllegalStateException.class, () -> blobCheckpointStore.updateCheckpoint(null));
Assertions
.assertThrows(IllegalStateException.class, () -> blobCheckpointStore.updateCheckpoint(new Checkpoint()));
}

@Test
public void testUpdateCheckpointForNewPartition() {
Checkpoint checkpoint = new Checkpoint()
.setFullyQualifiedNamespace("ns")
.setEventHubName("eh")
.setConsumerGroup("cg")
.setPartitionId("0")
.setSequenceNumber(2L)
.setOffset(100L);

HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.put("eTag", "etag2");
BlobItem blobItem = getCheckpointBlobItem("230", "1", "ns/eh/cg/checkpoint/0");
PagedFlux<BlobItem> response = new PagedFlux<BlobItem>(() -> Mono.just(new PagedResponseBase<HttpHeaders,
BlobItem>(null, 200, null,
Arrays.asList(blobItem), null,
null)));

when(blobContainerAsyncClient.getBlobAsyncClient("ns/eh/cg/checkpoint/0")).thenReturn(blobAsyncClient);
when(blobContainerAsyncClient.listBlobs(any(ListBlobsOptions.class))).thenReturn(response);
when(blobAsyncClient.getBlockBlobAsyncClient()).thenReturn(blockBlobAsyncClient);
when(blobAsyncClient.exists()).thenReturn(Mono.just(false));
when(blobAsyncClient.getBlockBlobAsyncClient()).thenReturn(blockBlobAsyncClient);
when(blockBlobAsyncClient.uploadWithResponse(ArgumentMatchers.<Flux<ByteBuffer>>any(), eq(0L),
isNull(), anyMap(), isNull(), isNull(), isNull()))
.thenReturn(Mono.just(new ResponseBase<>(null, 200, httpHeaders, null, null)));
BlobCheckpointStore blobCheckpointStore = new BlobCheckpointStore(blobContainerAsyncClient);
StepVerifier.create(blobCheckpointStore.updateCheckpoint(checkpoint)).verifyComplete();
}

Expand Down Expand Up @@ -195,9 +255,9 @@ private PartitionOwnership createPartitionOwnership(String fullyQualifiedNamespa
.setOwnerId(ownerId);
}

private BlobItem getBlobItem(String owner, String sequenceNumber, String offset, String etag, String blobName) {
Map<String, String> metadata = getMetadata(owner, sequenceNumber, offset);

private BlobItem getOwnershipBlobItem(String owner, String etag, String blobName) {
Map<String, String> metadata = new HashMap<>();
metadata.put("ownerid", owner);
BlobItemProperties properties = new BlobItemProperties()
.setLastModified(OffsetDateTime.now())
.setETag(etag);
Expand All @@ -208,11 +268,12 @@ private BlobItem getBlobItem(String owner, String sequenceNumber, String offset,
.setProperties(properties);
}

private Map<String, String> getMetadata(String owner, String sequenceNumber, String offset) {
private BlobItem getCheckpointBlobItem(String offset, String sequenceNumber, String blobName) {
Map<String, String> metadata = new HashMap<>();
metadata.put("ownerid", owner);
metadata.put("sequencenumber", sequenceNumber);
metadata.put("offset", offset);
return metadata;
return new BlobItem()
.setName(blobName)
.setMetadata(metadata);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

package com.azure.messaging.eventhubs.checkpointstore.blob;

import com.azure.messaging.eventhubs.Messages;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.azure.messaging.eventhubs.implementation.PartitionProcessor;
import com.azure.messaging.eventhubs.models.ErrorContext;
import com.azure.messaging.eventhubs.models.EventPosition;
import java.time.Duration;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
Expand All @@ -19,6 +20,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import reactor.core.Disposable;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
Expand Down Expand Up @@ -49,6 +51,11 @@ public class EventProcessorClient {

private final AtomicReference<Disposable> runner = new AtomicReference<>();
private final AtomicReference<Scheduler> scheduler = new AtomicReference<>();
private final String fullyQualifiedNamespace;
private final String eventHubName;
private final String consumerGroup;



/**
* Package-private constructor. Use {@link EventHubClientBuilder} to create an instance.
Expand All @@ -72,18 +79,22 @@ public class EventProcessorClient {
Objects.requireNonNull(consumerGroup, "consumerGroup cannot be null.");
Objects.requireNonNull(partitionProcessorFactory, "partitionProcessorFactory cannot be null.");

EventHubAsyncClient eventHubAsyncClient = eventHubClientBuilder.buildAsyncClient();

this.checkpointStore = Objects.requireNonNull(checkpointStore, "checkpointStore cannot be null");
this.identifier = UUID.randomUUID().toString();
this.fullyQualifiedNamespace = eventHubAsyncClient.getFullyQualifiedNamespace().toLowerCase(Locale.ROOT);
this.eventHubName = eventHubAsyncClient.getEventHubName().toLowerCase(Locale.ROOT);
this.consumerGroup = consumerGroup.toLowerCase(Locale.ROOT);

logger.info("The instance ID for this event processors is {}", this.identifier);
this.partitionPumpManager = new PartitionPumpManager(checkpointStore, partitionProcessorFactory,
eventHubClientBuilder, trackLastEnqueuedEventProperties, tracerProvider, initialPartitionEventPosition);
EventHubAsyncClient eventHubAsyncClient = eventHubClientBuilder.buildAsyncClient();
this.partitionBasedLoadBalancer =
new PartitionBasedLoadBalancer(this.checkpointStore, eventHubAsyncClient,
eventHubAsyncClient.getFullyQualifiedNamespace().toLowerCase(Locale.ROOT),
eventHubAsyncClient.getEventHubName().toLowerCase(Locale.ROOT),
consumerGroup.toLowerCase(Locale.ROOT), identifier, TimeUnit.MINUTES.toSeconds(1),
partitionPumpManager, processError);
this.fullyQualifiedNamespace, this.eventHubName, this.consumerGroup, this.identifier,
TimeUnit.MINUTES.toSeconds(1), this.partitionPumpManager, processError);

}

/**
Expand Down Expand Up @@ -138,7 +149,7 @@ public synchronized void stop() {
}
runner.get().dispose();
scheduler.get().dispose();
this.partitionPumpManager.stopAllPartitionPumps();
stopProcessing();
}

/**
Expand All @@ -150,4 +161,16 @@ public synchronized void stop() {
public synchronized boolean isRunning() {
return isRunning.get();
}

private void stopProcessing() {
partitionPumpManager.stopAllPartitionPumps();

// finally, remove ownerid from checkpointstore as the processor is shutting down
checkpointStore.listOwnership(fullyQualifiedNamespace, eventHubName, consumerGroup)
.filter(ownership -> identifier.equals(ownership.getOwnerId()))
.map(ownership -> ownership.setOwnerId(""))
.collect(Collectors.toList())
.map(checkpointStore::claimOwnership)
.block(Duration.ofSeconds(10)); // block until the checkpoint store is updated
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,8 @@ private void claimOwnership(final Map<String, PartitionOwnership> partitionOwner
.warning(Messages.FAILED_TO_CLAIM_OWNERSHIP, ownershipRequest.getPartitionId(),
ex.getMessage(), ex))
.collectList()
.zipWith(checkpointStore.listCheckpoints(fullyQualifiedNamespace, eventHubName, consumerGroupName)
.zipWhen(ownershipList -> checkpointStore.listCheckpoints(fullyQualifiedNamespace, eventHubName,
consumerGroupName)
.collectMap(checkpoint -> checkpoint.getPartitionId(), Function.identity()))
.subscribe(ownedPartitionCheckpointsTuple -> {
ownedPartitionCheckpointsTuple.getT1()
Expand All @@ -381,7 +382,10 @@ private void claimOwnership(final Map<String, PartitionOwnership> partitionOwner
ownedPartitionCheckpointsTuple.getT2().get(po.getPartitionId())));
},
ex -> {
throw logger.logExceptionAsError(new RuntimeException("Error while listing checkpoints", ex));
logger.warning("Error while listing checkpoints", ex);
ErrorContext errorContext = new ErrorContext(partitionAgnosticContext, ex);
processError.accept(errorContext);
throw logger.logExceptionAsError(new IllegalStateException("Error while listing checkpoints", ex));
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@
*/
public class EventProcessorClientErrorHandlingTest {

private static final String NAMESPACE_NAME = "dummyNamespaceName";
private static final String DEFAULT_DOMAIN_NAME = "servicebus.windows.net/";

@Mock
private EventHubClientBuilder eventHubClientBuilder;

Expand Down Expand Up @@ -80,7 +77,11 @@ public void testCheckpointStoreErrors(CheckpointStore checkpointStore) throws In
}, new HashMap<>());
client.start();
boolean completed = countDownLatch.await(3, TimeUnit.SECONDS);
client.stop();
try {
client.stop();
} catch (IllegalStateException ex) {
// do nothing, expected as the checkpointstores are expected to throw errors
}
Assertions.assertTrue(completed);
}

Expand Down Expand Up @@ -185,7 +186,7 @@ public Flux<PartitionOwnership> claimOwnership(
@Override
public Flux<Checkpoint> listCheckpoints(String fullyQualifiedNamespace,
String eventHubName, String consumerGroup) {
return null;
return Flux.empty();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ public void testWithSimplePartitionProcessor() throws Exception {
() -> testPartitionProcessor, checkpointStore, false, tracerProvider, ec -> { }, new HashMap<>());
eventProcessorClient.start();
TimeUnit.SECONDS.sleep(10);
eventProcessorClient.stop();

// Assert
assertNotNull(eventProcessorClient.getIdentifier());
Expand All @@ -168,6 +167,18 @@ public void testWithSimplePartitionProcessor() throws Exception {
verify(consumer1, atLeastOnce()).receiveFromPartition(anyString(), any(EventPosition.class),
any(ReceiveOptions.class));
verify(consumer1, atLeastOnce()).close();
eventProcessorClient.stop();
StepVerifier.create(checkpointStore.listOwnership("test-ns", "test-eh", "test-consumer"))
.assertNext(partitionOwnership -> {
assertEquals("1", partitionOwnership.getPartitionId(), "Partition");
assertEquals("test-consumer", partitionOwnership.getConsumerGroup(), "Consumer");
assertEquals("test-eh", partitionOwnership.getEventHubName(), "EventHub name");
assertEquals("", partitionOwnership.getOwnerId(), "Owner Id");
assertTrue(partitionOwnership.getLastModifiedTime() >= beforeTest, "LastModifiedTime");
assertTrue(partitionOwnership.getLastModifiedTime() <= System.currentTimeMillis(), "LastModifiedTime");
assertNotNull(partitionOwnership.getETag());
}).verifyComplete();

}

/**
Expand Down