Skip to content

Commit

Permalink
Merge pull request #3 from moarychan/moary/fix-jacoco-converage-not-met
Browse files Browse the repository at this point in the history
Add UTs for default Jacoco coverage threshold
  • Loading branch information
rujche authored Jan 6, 2022
2 parents 305d48c + 75b7812 commit 7f71fb2
Show file tree
Hide file tree
Showing 20 changed files with 599 additions and 298 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.spring.integration.instrumentation;

import java.time.Duration;

/**
* Abstract instrumentation class.
*/
public abstract class AbstractProcessorInstrumentation<T> implements Instrumentation {

private final String name;

private final Type type;

private long lastErrorTimestamp = Long.MIN_VALUE;

private final Duration noneErrorWindow;

private T errorContext;

/**
* Construct a {@link AbstractProcessorInstrumentation} with the specified name, {@link Type} and the period of a none error window.
*
* @param name the name
* @param type the type
* @param noneErrorWindow the period of a none error window
*/
public AbstractProcessorInstrumentation(String name, Type type, Duration noneErrorWindow) {
this.name = name;
this.type = type;
this.noneErrorWindow = noneErrorWindow;
}

/**
* Get type.
*
* @return type the type
* @see Type
*/
public Type getType() {
return type;
}

/**
* Check whether is down.
*
* @return true if the status is down,false otherwise
*/
public boolean isDown() {
if (System.currentTimeMillis() > lastErrorTimestamp + noneErrorWindow.toMillis()) {
this.errorContext = null;
return false;
} else {
return true;
}
}

/**
* Check whether is up.
*
* @return false if the status is up,true otherwise
*/
public boolean isUp() {
return !isDown();
}

/**
* Mark error.
*
* @param errorContext the error context
*/
public void markError(T errorContext) {
this.errorContext = errorContext;
this.lastErrorTimestamp = System.currentTimeMillis();
}

/**
* Get error context.
*
* @return errorContext the error context
*/
public T getErrorContext() {
return errorContext;
}

/**
* Get the name of destination entity.
*
* @return name the name of destination entity
*/
public String getName() {
return name;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.spring.integration.instrumentation;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.TimeUnit;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

public abstract class AbstractProcessorInstrumentationTests<T> {

private IllegalArgumentException exception;
private T errorContext;
private Duration window = Duration.ofSeconds(2);

public abstract T getErrorContext(RuntimeException exception);
public abstract AbstractProcessorInstrumentation<T> getProcessorInstrumentation(Instrumentation.Type type,
Duration window);

@BeforeEach
void setUp() {
exception = new IllegalArgumentException();
errorContext = getErrorContext(exception);
}

@ParameterizedTest
@EnumSource(value = Instrumentation.Type.class)
void instrumentationId(Instrumentation.Type type) {
AbstractProcessorInstrumentation<T> instrumentation = getProcessorInstrumentation(type, window);
assertEquals(type.name() + ":test", instrumentation.getId());
}

@ParameterizedTest
@EnumSource(value = Instrumentation.Type.class)
void isUp(Instrumentation.Type type) {
AbstractProcessorInstrumentation<T> instrumentation = getProcessorInstrumentation(type, window);
assertTrue(instrumentation.isUp());
}

@ParameterizedTest
@EnumSource(value = Instrumentation.Type.class)
void isDown(Instrumentation.Type type) {
AbstractProcessorInstrumentation<T> instrumentation = getProcessorInstrumentation(type, window);
instrumentation.markError(errorContext);
assertTrue(instrumentation.isDown());
sleep(1);
assertTrue(instrumentation.isDown());
sleep(1);
assertFalse(instrumentation.isDown());
}

@ParameterizedTest
@EnumSource(value = Instrumentation.Type.class)
void makeError(Instrumentation.Type type) {
AbstractProcessorInstrumentation<T> instrumentation = getProcessorInstrumentation(type, window);
instrumentation.markError(errorContext);
assertEquals(exception, instrumentation.getException());
instrumentation.markError(null);
assertNull(instrumentation.getException());
}

private void sleep(long sleep) {
try {
TimeUnit.of(ChronoUnit.SECONDS).sleep(sleep);
} catch (InterruptedException e) {

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.azure.spring.messaging.checkpoint.CheckpointConfig;
import com.azure.spring.messaging.checkpoint.CheckpointMode;
import com.azure.spring.messaging.checkpoint.Checkpointer;
import com.azure.spring.messaging.converter.AzureMessageConverter;
import com.azure.spring.service.eventhubs.processor.BatchEventProcessingListener;
import com.azure.spring.service.eventhubs.processor.EventProcessingListener;
import com.azure.spring.service.eventhubs.processor.RecordEventProcessingListener;
Expand Down Expand Up @@ -56,6 +57,7 @@ public class EventHubsInboundChannelAdapter extends MessageProducerSupport {
private final CheckpointConfig checkpointConfig;
private InstrumentationEventProcessingListener listener;
private EventCheckpointManager checkpointManager;
private Class<?> payloadType;

/**
* Construct a {@link EventHubsInboundChannelAdapter} with the specified {@link EventHubsProcessorContainer}, event Hub Name
Expand Down Expand Up @@ -104,9 +106,11 @@ protected void onInit() {
this.listener = recordEventProcessor;
}

if (this.payloadType != null) {
this.listener.setPayloadType(payloadType);
}
this.checkpointManager = CheckpointManagers.of(checkpointConfig, this.listenerMode);
this.processorContainer.subscribe(this.eventHubName, this.consumerGroup, this.listener);

}

@Override
Expand All @@ -124,17 +128,26 @@ protected void doStop() {
*
* @param messageConverter the message converter
*/
public void setMessageConverter(EventHubsMessageConverter messageConverter) {
public void setMessageConverter(AzureMessageConverter<EventData, EventData> messageConverter) {
this.recordEventProcessor.setMessageConverter(messageConverter);
}

/**
* Set message converter.
*
* @param messageConverter the message converter
*/
public void setBatchMessageConverter(AzureMessageConverter<EventBatchContext, EventData> messageConverter) {
this.batchEventProcessor.setMessageConverter(messageConverter);
}

/**
* Set payload Type.
*
* @param payloadType the payload Type
*/
public void setPayloadType(Class<?> payloadType) {
this.recordEventProcessor.setPayloadType(payloadType);
this.payloadType = payloadType;
}

/**
Expand Down Expand Up @@ -169,9 +182,14 @@ public void setInstrumentationId(String instrumentationId) {
private interface InstrumentationEventProcessingListener extends EventProcessingListener {
void setInstrumentationManager(InstrumentationManager instrumentationManager);
void setInstrumentationId(String instrumentationId);
void setPayloadType(Class<?> payloadType);
default void updateInstrumentation(ErrorContext errorContext,
InstrumentationManager instrumentationManager,
String instrumentationId) {
if (instrumentationManager == null) {
return;
}

Instrumentation instrumentation = instrumentationManager.getHealthInstrumentation(instrumentationId);
if (instrumentation != null) {
if (instrumentation instanceof EventHusProcessorInstrumentation) {
Expand All @@ -185,7 +203,7 @@ default void updateInstrumentation(ErrorContext errorContext,

private class IntegrationRecordEventProcessingListener implements InstrumentationEventProcessingListener, RecordEventProcessingListener {

private EventHubsMessageConverter messageConverter = new EventHubsMessageConverter();
private AzureMessageConverter<EventData, EventData> messageConverter = new EventHubsMessageConverter();
private Class<?> payloadType = byte[].class;
private InstrumentationManager instrumentationManager;
private String instrumentationId;
Expand Down Expand Up @@ -241,7 +259,7 @@ public EventHubsInitializationContextConsumer getInitializationContextConsumer()
*
* @param converter the converter
*/
public void setMessageConverter(EventHubsMessageConverter converter) {
public void setMessageConverter(AzureMessageConverter<EventData, EventData> converter) {
this.messageConverter = converter;
}

Expand All @@ -250,6 +268,7 @@ public void setMessageConverter(EventHubsMessageConverter converter) {
*
* @param payloadType the payload type
*/
@Override
public void setPayloadType(Class<?> payloadType) {
this.payloadType = payloadType;
}
Expand All @@ -267,7 +286,7 @@ public void setInstrumentationId(String instrumentationId) {

private class IntegrationBatchEventProcessingListener implements InstrumentationEventProcessingListener, BatchEventProcessingListener {

private EventHubBatchMessageConverter messageConverter = new EventHubBatchMessageConverter();
private AzureMessageConverter<EventBatchContext, EventData> messageConverter = new EventHubBatchMessageConverter();
private Class<?> payloadType = byte[].class;
private InstrumentationManager instrumentationManager;
private String instrumentationId;
Expand Down Expand Up @@ -300,7 +319,7 @@ public EventHubsInitializationContextConsumer getInitializationContextConsumer()
*
* @param converter the converter
*/
public void setMessageConverter(EventHubBatchMessageConverter converter) {
public void setMessageConverter(AzureMessageConverter<EventBatchContext, EventData> converter) {
this.messageConverter = converter;
}

Expand All @@ -309,6 +328,7 @@ public void setMessageConverter(EventHubBatchMessageConverter converter) {
*
* @param payloadType the payload type
*/
@Override
public void setPayloadType(Class<?> payloadType) {
this.payloadType = payloadType;
}
Expand Down
Loading

0 comments on commit 7f71fb2

Please sign in to comment.