diff --git a/service/application/pom.xml b/service/application/pom.xml
index 04d2909a0..874a3f3ec 100644
--- a/service/application/pom.xml
+++ b/service/application/pom.xml
@@ -19,6 +19,7 @@
3.2.4
3.5.0
2.16.1
+ 4.2.0
1.4.13
@@ -290,6 +291,10 @@
org.springframework.boot
spring-boot-starter-actuator
+
+ org.awaitility
+ awaitility
+
@@ -298,12 +303,7 @@
2.2
test
-
- org.awaitility
- awaitility
- 4.2.0
- test
-
+
com.solacesystems
diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/config/eventPortal/EventPortalProperties.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/config/eventPortal/EventPortalProperties.java
index dbf0e4841..ea7eca24e 100644
--- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/config/eventPortal/EventPortalProperties.java
+++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/config/eventPortal/EventPortalProperties.java
@@ -24,6 +24,9 @@ public class EventPortalProperties {
private Boolean managed = false;
private String incomingRequestQueueName;
+ private int waitAckScanCompleteTimeoutSec = 300;
+ private int waitAckScanCompletePollIntervalSec = 10;
+
private GatewayProperties gateway
= new GatewayProperties("standalone", "standalone", new GatewayMessagingProperties(true, false, List.of()));
}
diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/repository/scan/ScanStatusRepository.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/repository/scan/ScanStatusRepository.java
index 7d81a993c..2aa29c357 100644
--- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/repository/scan/ScanStatusRepository.java
+++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/repository/scan/ScanStatusRepository.java
@@ -1,7 +1,10 @@
package com.solace.maas.ep.event.management.agent.repository.scan;
import com.solace.maas.ep.event.management.agent.repository.model.scan.ScanStatusEntity;
+import com.solace.maas.ep.event.management.agent.repository.model.scan.ScanTypeEntity;
import org.springframework.data.repository.CrudRepository;
public interface ScanStatusRepository extends CrudRepository {
+
+ ScanStatusEntity findByScanType(ScanTypeEntity scanType);
}
diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/repository/scan/ScanTypeRepository.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/repository/scan/ScanTypeRepository.java
index b41a16823..3f88f0d5e 100644
--- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/repository/scan/ScanTypeRepository.java
+++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/repository/scan/ScanTypeRepository.java
@@ -4,9 +4,11 @@
import org.springframework.data.repository.CrudRepository;
import org.springframework.stereotype.Repository;
+import java.util.List;
import java.util.Optional;
@Repository
public interface ScanTypeRepository extends CrudRepository {
Optional findByNameAndScanId(String name, String scanId);
+ List findAllByScanId(String scanId);
}
diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/scanManager/ScanManager.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/scanManager/ScanManager.java
index 2f3e2d1d8..92bab79dc 100644
--- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/scanManager/ScanManager.java
+++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/scanManager/ScanManager.java
@@ -16,6 +16,7 @@
import com.solace.maas.ep.event.management.agent.service.MessagingServiceDelegateServiceImpl;
import com.solace.maas.ep.event.management.agent.service.ScanService;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.ObjectUtils;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
@@ -100,7 +101,7 @@ public String scan(ScanRequestBO scanRequestBO) {
).stream()
.findFirst()
.orElseThrow())
- .collect(Collectors.toUnmodifiableList());
+ .toList();
List brokerScanTypes = scanRequestBO.getScanTypes();
List routes = brokerScanTypes.stream()
@@ -111,15 +112,14 @@ public String scan(ScanRequestBO scanRequestBO) {
.filter(Objects::nonNull)
.filter(list -> !list.isEmpty())
.toList().stream()
- )
- .toList().stream().flatMap(List::stream).toList();
+ ).toList().stream().flatMap(List::stream).toList();
return scanService.singleScan(routes, groupId, scanId, traceId, actorId, messagingServiceEntity, runtimeAgentId);
}
- public void handleError(Exception e, ScanCommandMessage message){
+ public void handleError(Exception e, ScanCommandMessage message) {
- if( scanStatusPublisherOpt.isEmpty()){
+ if (scanStatusPublisherOpt.isEmpty()) {
return;
}
ScanStatusPublisher scanStatusPublisher = scanStatusPublisherOpt.get();
@@ -140,7 +140,7 @@ public void handleError(Exception e, ScanCommandMessage message){
"orgId", orgId,
"runtimeAgentId", runtimeAgentId
);
- scanStatusPublisher.sendOverallScanStatus(response,topicVars);
+ scanStatusPublisher.sendOverallScanStatus(response, topicVars);
}
private MessagingServiceEntity retrieveMessagingServiceEntity(String messagingServiceId) {
@@ -158,4 +158,12 @@ public Page findAll(Pageable pageable) {
public Page findByMessagingServiceId(String messagingServiceId, Pageable pageable) {
return scanService.findByMessagingServiceId(messagingServiceId, pageable);
}
+
+
+ public boolean isScanComplete(String scanId) {
+ if (ObjectUtils.isEmpty(scanId)) {
+ throw new IllegalArgumentException("Scan ID cannot be null or empty");
+ }
+ return scanService.isScanComplete(scanId);
+ }
}
diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/service/ScanService.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/service/ScanService.java
index 772dd8943..c6d1e9919 100644
--- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/service/ScanService.java
+++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/service/ScanService.java
@@ -26,16 +26,19 @@
import net.logstash.logback.encoder.org.apache.commons.lang3.StringUtils;
import org.apache.camel.Exchange;
import org.apache.camel.ProducerTemplate;
+import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.MDC;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
+import org.springframework.util.ObjectUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
@@ -449,4 +452,25 @@ protected RouteBundleHierarchyStore registerRouteRecipients(RouteBundle routeBun
}
return pathStore;
}
+
+ public boolean isScanComplete(String scanId) {
+ if (ObjectUtils.isEmpty(scanId)){
+ throw new IllegalArgumentException("Scan ID cannot be null or empty");
+ }
+ Set completeScanStatuses = Set.of(
+ ScanStatus.COMPLETE.name(),
+ ScanStatus.FAILED.name(),
+ ScanStatus.TIMED_OUT.name()
+ );
+
+
+ List allScanTypes = scanTypeRepository.findAllByScanId(scanId);
+ if (CollectionUtils.isEmpty(allScanTypes)) {
+ return false;
+ }
+ return allScanTypes.stream()
+ .map(scanStatusRepository::findByScanType)
+ .allMatch(status -> completeScanStatuses.contains(status.getStatus()));
+
+ }
}
diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/PersistentMessageHandlerObserverPhase.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/PersistentMessageHandlerObserverPhase.java
new file mode 100644
index 000000000..28eb3a9fd
--- /dev/null
+++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/PersistentMessageHandlerObserverPhase.java
@@ -0,0 +1,9 @@
+package com.solace.maas.ep.event.management.agent.subscriber;
+
+public enum PersistentMessageHandlerObserverPhase {
+ RECEIVED,
+ INITIATED,
+ COMPLETED,
+ ACKNOWLEDGED,
+ FAILED
+}
diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/SolacePersistentMessageHandler.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/SolacePersistentMessageHandler.java
index d575f6a47..83b9e9aa4 100644
--- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/SolacePersistentMessageHandler.java
+++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/SolacePersistentMessageHandler.java
@@ -3,17 +3,20 @@
import com.solace.maas.ep.event.management.agent.config.eventPortal.EventPortalProperties;
import com.solace.maas.ep.event.management.agent.plugin.mop.MOPConstants;
import com.solace.maas.ep.event.management.agent.subscriber.messageProcessors.MessageProcessor;
+import com.solace.maas.ep.event.management.agent.util.MdcTaskDecorator;
import com.solace.messaging.MessagingService;
import com.solace.messaging.receiver.InboundMessage;
import com.solace.messaging.receiver.MessageReceiver;
import com.solace.messaging.receiver.PersistentMessageReceiver;
import com.solace.messaging.resources.Queue;
import lombok.Getter;
+import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import java.util.HashMap;
@@ -32,9 +35,15 @@ public class SolacePersistentMessageHandler extends BaseSolaceMessageHandler imp
private final Map messageProcessorsByClassType;
private final MessagingService messagingService;
private final EventPortalProperties eventPortalProperties;
+ private final ThreadPoolTaskExecutor executor;
@Getter
+ @SuppressWarnings("PMD.MutableStaticState")
private PersistentMessageReceiver persistentMessageReceiver;
+ // only used for testing
+ @Setter
+ private SolacePersistentMessageHandlerObserver messageHandlerObserver;
+
protected SolacePersistentMessageHandler(MessagingService messagingService,
EventPortalProperties eventPortalProperties,
List messageProcessorList) {
@@ -44,22 +53,38 @@ protected SolacePersistentMessageHandler(MessagingService messagingService,
this.eventPortalProperties = eventPortalProperties;
messageProcessorsByClassType = messageProcessorList.stream()
.collect(Collectors.toMap(MessageProcessor::supportedClass, Function.identity()));
+ executor = new ThreadPoolTaskExecutor();
+ executor.setCorePoolSize(eventPortalProperties.getCommandThreadPoolMinSize());
+ executor.setMaxPoolSize(eventPortalProperties.getCommandThreadPoolMaxSize());
+ executor.setQueueCapacity(eventPortalProperties.getCommandThreadPoolQueueSize());
+ executor.setThreadNamePrefix("solace-persistent-message-handler-pool-");
+ executor.setTaskDecorator(new MdcTaskDecorator());
+ executor.initialize();
+ }
+
+ private void notifyPersistentMessageHandlerObserver(PersistentMessageHandlerObserverPhase phase, InboundMessage inboundMessage) {
+ if (messageHandlerObserver != null) {
+ messageHandlerObserver.onPhaseChange(inboundMessage, phase);
+ }
}
@Override
public void onMessage(InboundMessage inboundMessage) {
+ notifyPersistentMessageHandlerObserver(PersistentMessageHandlerObserverPhase.RECEIVED,inboundMessage);
+ executor.submit(() -> processMessage(inboundMessage));
+ }
+
+
+ private void processMessage(InboundMessage inboundMessage) {
+ notifyPersistentMessageHandlerObserver(PersistentMessageHandlerObserverPhase.INITIATED,inboundMessage);
String mopMessageSubclass = "";
MessageProcessor processor = null;
Object message = null;
try {
mopMessageSubclass = inboundMessage.getProperty(MOPConstants.MOP_MSG_META_DECODER);
String messageAsString = inboundMessage.getPayloadAsString();
- Class messageClass = cachedJSONDecoders.get(mopMessageSubclass);
- if (messageClass == null) {
- messageClass = Class.forName(mopMessageSubclass);
- cachedJSONDecoders.put(mopMessageSubclass, messageClass);
- }
+ Class messageClass = cachedJSONDecoders.computeIfAbsent(mopMessageSubclass, this::loadClass);
processor = messageProcessorsByClassType.get(messageClass);
if (processor == null) {
throw new UnsupportedOperationException("Could not find message processor for message of class " + messageClass.getCanonicalName());
@@ -68,21 +93,40 @@ public void onMessage(InboundMessage inboundMessage) {
log.trace("onMessage: {}\n{}", messageClass, messageAsString);
message = toMessage(messageAsString, messageClass);
processor.processMessage(processor.castToMessageClass(message));
-
+ notifyPersistentMessageHandlerObserver(PersistentMessageHandlerObserverPhase.COMPLETED,inboundMessage);
} catch (Exception e) {
- if (processor != null && message != null) {
- log.error("Error while processing inbound message from queue for mopMessageSubclass: {}", mopMessageSubclass);
- try {
- processor.onFailure(e, processor.castToMessageClass(message));
- } catch (Exception e1) {
- log.error("error while handling message processing failure for mopMessageSubclass: {}", mopMessageSubclass, e);
- }
-
- } else {
- log.error("Unsupported message and/or processor encountered. Skipping processing", e);
+ handleProcessingError(mopMessageSubclass, processor, message, e);
+ notifyPersistentMessageHandlerObserver(PersistentMessageHandlerObserverPhase.FAILED,inboundMessage);
+ } finally {
+ acknowledgeMessage(inboundMessage);
+ notifyPersistentMessageHandlerObserver(PersistentMessageHandlerObserverPhase.ACKNOWLEDGED,inboundMessage);
+ }
+ }
+
+ private Class loadClass(String className) {
+ try {
+ return Class.forName(className);
+ } catch (ClassNotFoundException e) {
+ log.error("Failed to load class: {}", className, e);
+ throw new RuntimeException("Failed to load class: " + className, e);
+ }
+ }
+
+ private void handleProcessingError(String mopMessageSubclass, MessageProcessor processor, Object message, Exception e) {
+ if (processor != null && message != null) {
+ log.error("Error while processing inbound message from queue for mopMessageSubclass: {}", mopMessageSubclass, e);
+ try {
+ processor.onFailure(e, processor.castToMessageClass(message));
+ } catch (Exception e1) {
+ log.error("Error while handling message processing failure for mopMessageSubclass: {}", mopMessageSubclass, e1);
}
+ } else {
+ log.error("Unsupported message and/or processor encountered. Skipping processing", e);
+ }
+ }
- } finally {
+ private void acknowledgeMessage(InboundMessage inboundMessage) {
+ synchronized (persistentMessageReceiver) {
persistentMessageReceiver.ack(inboundMessage);
}
}
@@ -103,6 +147,7 @@ private Queue determineQueue() {
return Queue.durableNonExclusiveQueue(eventPortalProperties.getIncomingRequestQueueName());
}
+
@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
Queue queue = determineQueue();
diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/SolacePersistentMessageHandlerObserver.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/SolacePersistentMessageHandlerObserver.java
new file mode 100644
index 000000000..80ab07e0b
--- /dev/null
+++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/SolacePersistentMessageHandlerObserver.java
@@ -0,0 +1,15 @@
+package com.solace.maas.ep.event.management.agent.subscriber;
+
+import com.solace.messaging.receiver.InboundMessage;
+
+/**
+ * The SolacePersistentMessageHandlerObserver interface defines methods to observe
+ * the lifecycle of messages handled by a Solace message handler. Implementers can
+ * use this interface to react to various stages of message processing.
+ * Primary use case is for testing purposes.
+ */
+public interface SolacePersistentMessageHandlerObserver {
+
+ void onPhaseChange(InboundMessage message, PersistentMessageHandlerObserverPhase phase);
+
+}
diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/messageProcessors/ScanCommandMessageProcessor.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/messageProcessors/ScanCommandMessageProcessor.java
index 7bcd5995e..6d7f80307 100644
--- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/messageProcessors/ScanCommandMessageProcessor.java
+++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/messageProcessors/ScanCommandMessageProcessor.java
@@ -1,12 +1,14 @@
package com.solace.maas.ep.event.management.agent.subscriber.messageProcessors;
import com.solace.maas.ep.common.messages.ScanCommandMessage;
+import com.solace.maas.ep.event.management.agent.config.eventPortal.EventPortalProperties;
import com.solace.maas.ep.event.management.agent.scanManager.ScanManager;
import com.solace.maas.ep.event.management.agent.scanManager.model.ScanRequestBO;
import io.micrometer.core.instrument.MeterRegistry;
import lombok.extern.slf4j.Slf4j;
import net.logstash.logback.encoder.org.apache.commons.lang3.StringUtils;
import org.apache.commons.collections4.CollectionUtils;
+import org.awaitility.core.ConditionTimeoutException;
import org.slf4j.MDC;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
@@ -14,9 +16,10 @@
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
-
import static com.solace.maas.ep.common.metrics.ObservabilityConstants.MAAS_EMA_SCAN_EVENT_RECEIVED;
import static com.solace.maas.ep.common.metrics.ObservabilityConstants.SCAN_ID_TAG;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.awaitility.Awaitility.await;
@Slf4j
@Component
@@ -27,11 +30,14 @@ public class ScanCommandMessageProcessor implements MessageProcessor {
+ try {
+ log.debug("Checking if scan with id {} is completed", scanId);
+ return waitUntilScanIsCompleted(scanId);
+ } catch (Exception e) {
+ log.error("Error while waiting for scan to complete", e);
+ return false;
+ }
+ });
+ } catch (ConditionTimeoutException e) {
+ // Handle the timeout scenario as needed
+ log.error("Scan with id {} did not complete within the expected time", scanId);
+ }
+ }
+
+ private boolean waitUntilScanIsCompleted(String scanId) {
+ return scanManager.isScanComplete(scanId);
}
@Override
@@ -92,6 +128,6 @@ public ScanCommandMessage castToMessageClass(Object message) {
@Override
public void onFailure(Exception e, ScanCommandMessage message) {
- scanManager.handleError(e,message);
+ scanManager.handleError(e, message);
}
}
diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/scanManager/ScanManagerTest.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/scanManager/ScanManagerTest.java
index ed5e46e66..cb9803608 100644
--- a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/scanManager/ScanManagerTest.java
+++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/scanManager/ScanManagerTest.java
@@ -239,5 +239,11 @@ private List getKafkaRoutes(List destinations, String
);
}
+ @Test
+ void testScanStatusIsScanCompleteInvalidArgument() {
+ Assertions.assertThrows(IllegalArgumentException.class, () -> scanManager.isScanComplete(null));
+ Assertions.assertThrows(IllegalArgumentException.class, () -> scanManager.isScanComplete(""));
+ }
+
}
diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/service/ScanServiceTests.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/service/ScanServiceTests.java
index e7a810b1a..fe84eba03 100644
--- a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/service/ScanServiceTests.java
+++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/service/ScanServiceTests.java
@@ -27,6 +27,8 @@
import org.apache.camel.ProducerTemplate;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
@@ -339,4 +341,79 @@ public void testFindById() {
assertThatNoException();
}
+
+ @ParameterizedTest
+ @ValueSource(strings = {"COMPLETE", "FAILED", "TIMED_OUT"})
+ @SneakyThrows
+ public void testScanStatusCompletionWithSingleScan(String scanStatus) {
+ ScanStatusEntity scanStatusA = scanServiceHelper.buildScanStatusEntity("status1", scanStatus);
+ ScanTypeEntity scanTypeA = scanServiceHelper.buildScanTypeEntity("123", "queueListing", null, scanStatusA);
+
+ when(scanTypeRepository.findAllByScanId(any(String.class)))
+ .thenReturn(List.of(scanTypeA));
+ when(scanStatusRepository.findByScanType(any(ScanTypeEntity.class)))
+ .thenReturn(scanStatusA);
+
+ Assertions.assertTrue(scanService.isScanComplete("scan1"));
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {"IN_PROGRESS", "INITIATED"})
+ @SneakyThrows
+ public void testScanStatusIncompleteWithSingleScan(String scanStatus) {
+ ScanStatusEntity scanStatusA = scanServiceHelper.buildScanStatusEntity("status1", scanStatus);
+ ScanTypeEntity scanTypeA = scanServiceHelper.buildScanTypeEntity("123", "queueListing", null, scanStatusA);
+
+ when(scanTypeRepository.findAllByScanId(any(String.class)))
+ .thenReturn(List.of(scanTypeA));
+ when(scanStatusRepository.findByScanType(any(ScanTypeEntity.class)))
+ .thenReturn(scanStatusA);
+
+ Assertions.assertFalse(scanService.isScanComplete("scan1"));
+ }
+
+
+
+ @Test
+ @SneakyThrows
+ public void testScanStatusCompletedWithMultipleScans(){
+ ScanStatusEntity scanStatusA = scanServiceHelper.buildScanStatusEntity("status1", "COMPLETE");
+ ScanTypeEntity scanTypeA = scanServiceHelper.buildScanTypeEntity("123", "queueListing", null, scanStatusA);
+
+ ScanStatusEntity scanStatusB = scanServiceHelper.buildScanStatusEntity("status2", "COMPLETE");
+ ScanTypeEntity scanTypeB = scanServiceHelper.buildScanTypeEntity("124", "queueListing", null, scanStatusB);
+
+ when(scanTypeRepository.findAllByScanId(any(String.class)))
+ .thenReturn(List.of(scanTypeA,scanTypeB));
+ when(scanStatusRepository.findByScanType(scanTypeA))
+ .thenReturn(scanStatusA);
+ when(scanStatusRepository.findByScanType(scanTypeB))
+ .thenReturn(scanStatusB);
+
+ Assertions.assertTrue(scanService.isScanComplete("scan1"));
+ }
+
+ @Test
+ @SneakyThrows
+ public void testScanStatusNotCompleteWithMultipleScans() {
+ ScanStatusEntity scanStatusA = scanServiceHelper.buildScanStatusEntity("status1", "COMPLETE");
+ ScanTypeEntity scanTypeA = scanServiceHelper.buildScanTypeEntity("123", "queueListing", null, scanStatusA);
+
+ ScanStatusEntity scanStatusB = scanServiceHelper.buildScanStatusEntity("status2", "IN_PROGRESS");
+ ScanTypeEntity scanTypeB = scanServiceHelper.buildScanTypeEntity("124", "queueConfiguration", null, scanStatusB);
+
+ when(scanTypeRepository.findAllByScanId(any(String.class)))
+ .thenReturn(List.of(scanTypeA,scanTypeB));
+ when(scanStatusRepository.findByScanType(scanTypeA))
+ .thenReturn(scanStatusA);
+ when(scanStatusRepository.findByScanType(scanTypeB))
+ .thenReturn(scanStatusB);
+ Assertions.assertFalse(scanService.isScanComplete("scan1"));
+ }
+
+ @Test
+ void testScanServiceIsScanCompleteInvalidArgument() {
+ Assertions.assertThrows(IllegalArgumentException.class, () -> scanService.isScanComplete(null));
+ Assertions.assertThrows(IllegalArgumentException.class, () -> scanService.isScanComplete(""));
+ }
}
diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/PersistentMessageHandlerTests.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/PersistentMessageHandlerTests.java
index a3e3d32a4..0d39e0887 100644
--- a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/PersistentMessageHandlerTests.java
+++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/PersistentMessageHandlerTests.java
@@ -18,6 +18,7 @@
import com.solace.maas.ep.event.management.agent.subscriber.messageProcessors.ScanCommandMessageProcessor;
import com.solace.messaging.MessagingService;
import com.solace.messaging.receiver.InboundMessage;
+import com.solace.messaging.receiver.PersistentMessageReceiver;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -33,19 +34,28 @@
import static com.solace.maas.ep.common.model.ScanDestination.EVENT_PORTAL;
import static com.solace.maas.ep.common.model.ScanType.SOLACE_ALL;
import static com.solace.maas.ep.event.management.agent.plugin.mop.MOPMessageType.generic;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+/**
+ * Tests for {@link SolacePersistentMessageHandler} correct dispatch of messages to appropriate processors
+ */
@ActiveProfiles("TEST")
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, properties = {
"eventPortal.gateway.messaging.standalone=false",
"eventPortal.managed=true",
- "eventPortal.incomingRequestQueueName = ep_core_ema_requests_123456_123123"
+ "eventPortal.incomingRequestQueueName = ep_core_ema_requests_123456_123123",
+ "eventPortal.waitAckScanCompletePollIntervalSec=1",
+ "eventPortal.waitAckScanCompleteTimeoutSec=10",
+ "eventPortal.commandThreadPoolMinSize=5",
+ "eventPortal.commandThreadPoolMaxSize=10",
+ "eventPortal.commandThreadPoolQueueSize=20"
})
@Slf4j
public class PersistentMessageHandlerTests {
@@ -53,6 +63,9 @@ public class PersistentMessageHandlerTests {
@MockBean
private ScanManager scanManager;
+ @MockBean
+ private PersistentMessageReceiver persistentMessageReceiver;
+
@Autowired
private MessagingService messagingService;
@@ -75,6 +88,8 @@ public class PersistentMessageHandlerTests {
private ListAppender listAppender;
+ private TestingSupportSolacePersistentMessageHandlerObserver messageHandlerObserver;
+
@BeforeEach
void setup() {
Logger scanLogger = (Logger) LoggerFactory.getLogger(SolacePersistentMessageHandler.class);
@@ -82,29 +97,8 @@ void setup() {
listAppender.start();
scanLogger.addAppender(listAppender);
inboundMessage = mock(InboundMessage.class);
- }
-
- @Test
- void testScanCommandMessageHandler() {
- ScanCommandMessage scanCommandMessage =
- new ScanCommandMessage("messagingServiceId",
- "scanId", List.of(SOLACE_ALL), List.of(EVENT_PORTAL));
- when(inboundMessage.getPayloadAsString()).thenReturn(jsonString(scanCommandMessage));
- when(inboundMessage.getProperty(MOPConstants.MOP_MSG_META_DECODER)).thenReturn(
- ScanCommandMessage.class.getCanonicalName()
- );
-
- solacePersistentMessageHandler.onMessage(inboundMessage);
-
- verify(scanCommandMessageProcessor, times(1)).castToMessageClass(any());
- verify(scanCommandMessageProcessor, times(1)).processMessage(any());
-
- // There must be no interaction with commandMessageProcessor
- verify(commandMessageProcessor, times(0)).castToMessageClass(any());
- verify(commandMessageProcessor, times(0)).processMessage(any());
-
-
- verify(solacePersistentMessageHandler.getPersistentMessageReceiver(), times(1)).ack(inboundMessage);
+ messageHandlerObserver = new TestingSupportSolacePersistentMessageHandlerObserver();
+ solacePersistentMessageHandler.setMessageHandlerObserver(messageHandlerObserver);
}
@Test
@@ -130,6 +124,8 @@ void testCommandMessageHandler() {
);
solacePersistentMessageHandler.onMessage(inboundMessage);
+ // Wait for the executor to process the message
+ await().atMost(5, SECONDS).until(() -> messageHandlerObserver.hasInitiatedMessageProcessing(inboundMessage));
verify(commandMessageProcessor, times(1)).castToMessageClass(any());
verify(commandMessageProcessor, times(1)).processMessage(any());
@@ -137,9 +133,27 @@ void testCommandMessageHandler() {
// There must be no interaction with scanCommandMessageProcessor
verify(scanCommandMessageProcessor, times(0)).castToMessageClass(any());
verify(scanCommandMessageProcessor, times(0)).processMessage(any());
+ }
+
+ @Test
+ void testScanCommandMessageHandler() {
+ ScanCommandMessage scanCommandMessage =
+ new ScanCommandMessage("messagingServiceId",
+ "scanId", List.of(SOLACE_ALL), List.of(EVENT_PORTAL));
+ when(inboundMessage.getPayloadAsString()).thenReturn(jsonString(scanCommandMessage));
+ when(inboundMessage.getProperty(MOPConstants.MOP_MSG_META_DECODER)).thenReturn(
+ ScanCommandMessage.class.getCanonicalName()
+ );
+ solacePersistentMessageHandler.onMessage(inboundMessage);
+ await().atMost(5, SECONDS).until(() -> messageHandlerObserver.hasInitiatedMessageProcessing(inboundMessage));
- verify(solacePersistentMessageHandler.getPersistentMessageReceiver(), times(1)).ack(inboundMessage);
+ verify(commandMessageProcessor, times(0)).castToMessageClass(any());
+ verify(commandMessageProcessor, times(0)).processMessage(any());
+
+ // There must be an interaction with scanCommandMessageProcessor
+ verify(scanCommandMessageProcessor, times(1)).castToMessageClass(any());
+ verify(scanCommandMessageProcessor, times(1)).processMessage(any());
}
@Test
@@ -151,6 +165,12 @@ void testUnsupportedMessageHandling() {
ScanDataImportMessage.class.getCanonicalName()
);
solacePersistentMessageHandler.onMessage(inboundMessage);
+ await().atMost(5, SECONDS).until(() -> messageHandlerObserver.hasFailedMessage(inboundMessage));
+ assertThat(messageHandlerObserver.hasReceivedMessage(inboundMessage)).isTrue();
+ assertThat(messageHandlerObserver.hasInitiatedMessageProcessing(inboundMessage)).isTrue();
+ assertThat(messageHandlerObserver.hasCompletedMessageProcessing(inboundMessage)).isFalse();
+ assertThat(messageHandlerObserver.hasAcknowledgedMessage(inboundMessage)).isTrue();
+
List logs = listAppender.list;
assertThat(logs.get(logs.size() - 1).getFormattedMessage()).isEqualTo("Unsupported message and/or processor encountered. Skipping processing");
verify(solacePersistentMessageHandler.getPersistentMessageReceiver(), times(1)).ack(inboundMessage);
@@ -159,33 +179,6 @@ void testUnsupportedMessageHandling() {
verify(commandMessageProcessor, times(0)).onFailure(any(), any());
}
-
- @Test
- void testMessageAcknowledgementWhenProcessingError() {
- ScanCommandMessage scanCommandMessage =
- new ScanCommandMessage("messagingServiceId",
- "scanId", List.of(SOLACE_ALL), List.of(EVENT_PORTAL));
- when(inboundMessage.getPayloadAsString()).thenReturn(jsonString(scanCommandMessage));
-
- doThrow(new IllegalArgumentException("Test processing error msg")).when(scanCommandMessageProcessor).processMessage(scanCommandMessage);
- when(inboundMessage.getProperty(MOPConstants.MOP_MSG_META_DECODER)).thenReturn(
- ScanCommandMessage.class.getCanonicalName()
- );
-
- solacePersistentMessageHandler.onMessage(inboundMessage);
- List logs = listAppender.list;
- assertThat(logs.get(logs.size() - 1).getFormattedMessage())
- .isEqualTo("Error while processing inbound message from queue for mopMessageSubclass: " + ScanCommandMessage.class.getCanonicalName());
- verify(solacePersistentMessageHandler.getPersistentMessageReceiver(), times(1)).ack(inboundMessage);
-
- // scan command message processor MUST handle the exception
- verify(scanCommandMessageProcessor, times(1)).onFailure(any(), any());
-
- //commandMessageProcessor MUST do nothing (not a config push command)
- verify(commandMessageProcessor, times(0)).onFailure(any(), any());
- }
-
-
private String jsonString(Object object) {
try {
return objectMapper.writeValueAsString(object);
diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/ScanJobPersistentMessageHandlerTests.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/ScanJobPersistentMessageHandlerTests.java
new file mode 100644
index 000000000..377352f1a
--- /dev/null
+++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/ScanJobPersistentMessageHandlerTests.java
@@ -0,0 +1,233 @@
+package com.solace.maas.ep.event.management.agent.subscriber;
+
+import ch.qos.logback.classic.Logger;
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.read.ListAppender;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.solace.maas.ep.common.messages.ScanCommandMessage;
+import com.solace.maas.ep.event.management.agent.config.eventPortal.EventPortalProperties;
+import com.solace.maas.ep.event.management.agent.plugin.mop.MOPConstants;
+import com.solace.maas.ep.event.management.agent.scanManager.ScanManager;
+import com.solace.maas.ep.event.management.agent.subscriber.messageProcessors.ScanCommandMessageProcessor;
+import com.solace.messaging.MessagingService;
+import com.solace.messaging.receiver.InboundMessage;
+import com.solace.messaging.receiver.PersistentMessageReceiver;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.boot.test.mock.mockito.SpyBean;
+import org.springframework.test.context.ActiveProfiles;
+
+import java.util.List;
+
+import static com.solace.maas.ep.common.model.ScanDestination.EVENT_PORTAL;
+import static com.solace.maas.ep.common.model.ScanType.SOLACE_ALL;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ActiveProfiles("TEST")
+@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, properties = {
+ "eventPortal.gateway.messaging.standalone=false",
+ "eventPortal.managed=true",
+ "eventPortal.incomingRequestQueueName = ep_core_ema_requests_123456_123123",
+ "eventPortal.waitAckScanCompletePollIntervalSec=1",
+ "eventPortal.waitAckScanCompleteTimeoutSec=10",
+
+
+})
+@Slf4j
+class ScanJobPersistentMessageHandlerTests {
+
+ @MockBean
+ private ScanManager scanManager;
+
+ @MockBean
+ private PersistentMessageReceiver persistentMessageReceiver;
+
+ @Autowired
+ private MessagingService messagingService;
+
+ @SpyBean
+ private ScanCommandMessageProcessor scanCommandMessageProcessor;
+
+ @Autowired
+ private EventPortalProperties eventPortalProperties;
+
+ @SpyBean
+ private SolacePersistentMessageHandler solacePersistentMessageHandler;
+
+ @Autowired
+ private ObjectMapper objectMapper;
+
+ private InboundMessage inboundMessage;
+
+ private ListAppender listAppenderPersistentMessageHandler;
+ private ListAppender listAppendersCanCommandMessageProcessor;
+
+ private TestingSupportSolacePersistentMessageHandlerObserver messageHandlerObserver;
+
+ @BeforeEach
+ void setup() {
+ Logger loggerPersistentMessageHandler = (Logger) LoggerFactory.getLogger(SolacePersistentMessageHandler.class);
+ listAppenderPersistentMessageHandler = new ListAppender<>();
+ listAppenderPersistentMessageHandler.start();
+ loggerPersistentMessageHandler.addAppender(listAppenderPersistentMessageHandler);
+
+ Logger loggerScanCommandMessageProcessor = (Logger) LoggerFactory.getLogger(ScanCommandMessageProcessor.class);
+ listAppendersCanCommandMessageProcessor = new ListAppender<>();
+ listAppendersCanCommandMessageProcessor.start();
+ loggerScanCommandMessageProcessor.addAppender(listAppendersCanCommandMessageProcessor);
+
+ inboundMessage = mock(InboundMessage.class);
+ messageHandlerObserver = new TestingSupportSolacePersistentMessageHandlerObserver();
+ solacePersistentMessageHandler.setMessageHandlerObserver(messageHandlerObserver);
+ }
+
+ // Test that the message handler is able to process a scan command message without an observer,
+ // which will be the case when EMA is executed and not as unit / it test
+ @Test
+ void testPersistentMessageHandlerScanCommandMsgAckedWithoutObserver() {
+ solacePersistentMessageHandler.setMessageHandlerObserver(null);
+ ScanCommandMessage scanCommandMessage =
+ new ScanCommandMessage("messagingServiceId",
+ "scanId", List.of(SOLACE_ALL), List.of(EVENT_PORTAL));
+ when(inboundMessage.getPayloadAsString()).thenReturn(jsonString(scanCommandMessage));
+ when(inboundMessage.getProperty(MOPConstants.MOP_MSG_META_DECODER)).thenReturn(
+ ScanCommandMessage.class.getCanonicalName()
+ );
+ when(scanManager.scan(any())).thenReturn("scanId");
+ when(scanManager.isScanComplete("scanId")).thenReturn(true);
+ solacePersistentMessageHandler.onMessage(inboundMessage);
+ // we have to wait now for a second as there is no observer being notified
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ verify(scanCommandMessageProcessor, times(1)).processMessage(any());
+
+
+ }
+
+ @Test
+ void testPersistentMessageHandlerScanCommandMsgAcked() {
+ ScanCommandMessage scanCommandMessage =
+ new ScanCommandMessage("messagingServiceId",
+ "scanId", List.of(SOLACE_ALL), List.of(EVENT_PORTAL));
+ when(inboundMessage.getPayloadAsString()).thenReturn(jsonString(scanCommandMessage));
+ when(inboundMessage.getProperty(MOPConstants.MOP_MSG_META_DECODER)).thenReturn(
+ ScanCommandMessage.class.getCanonicalName()
+ );
+ when(scanManager.scan(any())).thenReturn("scanId");
+ when(scanManager.isScanComplete("scanId")).thenReturn(true);
+ solacePersistentMessageHandler.onMessage(inboundMessage);
+
+ //happy path - the message should be processed and acked
+ await().atMost(5, SECONDS).until(() -> messageHandlerObserver.hasAcknowledgedMessage(inboundMessage));
+
+ assertThat(messageHandlerObserver.hasReceivedMessage(inboundMessage)).isTrue();
+ assertThat(messageHandlerObserver.hasInitiatedMessageProcessing(inboundMessage)).isTrue();
+ assertThat(messageHandlerObserver.hasCompletedMessageProcessing(inboundMessage)).isTrue();
+ assertThat(messageHandlerObserver.hasFailedMessage(inboundMessage)).isFalse();
+
+ // the scan command message processor should be called once by the persistent message handler
+ verify(scanCommandMessageProcessor, times(1)).processMessage(any());
+ // if the EMA is managed, the waitForScanCompletion method should be called
+ verify(scanCommandMessageProcessor, atLeastOnce()).waitForScanCompletion(any());
+ // the message should be acked after the scan is complete
+ verify(solacePersistentMessageHandler.getPersistentMessageReceiver(), times(1)).ack(inboundMessage);
+ }
+
+ @Test
+ void testPersistentMessageHandlerScanCommandTimeoutMsgAcked() {
+ ScanCommandMessage scanCommandMessage =
+ new ScanCommandMessage("messagingServiceId",
+ "scanId", List.of(SOLACE_ALL), List.of(EVENT_PORTAL));
+ when(inboundMessage.getPayloadAsString()).thenReturn(jsonString(scanCommandMessage));
+ when(inboundMessage.getProperty(MOPConstants.MOP_MSG_META_DECODER)).thenReturn(
+ ScanCommandMessage.class.getCanonicalName()
+ );
+ when(scanManager.scan(any())).thenReturn("scanId");
+ // the scan is not complete and will never be ;-)
+ // the waitForScanCompletion method will throw an exception after the timeout
+ when(scanManager.isScanComplete("scanId")).thenReturn(false);
+ solacePersistentMessageHandler.onMessage(inboundMessage);
+ // sleep for a while to allow the scan complete poll interval to pass
+
+ await().atMost(eventPortalProperties.getWaitAckScanCompleteTimeoutSec()+5, SECONDS).until(()
+ -> messageHandlerObserver.hasAcknowledgedMessage(inboundMessage));
+ assertThat(messageHandlerObserver.hasReceivedMessage(inboundMessage)).isTrue();
+ assertThat(messageHandlerObserver.hasInitiatedMessageProcessing(inboundMessage)).isTrue();
+ // timeout should be logged but the message should be still acked
+ // timeout error handling is ultimately the responsibility of Event Portal
+ assertThat(messageHandlerObserver.hasFailedMessage(inboundMessage)).isFalse();
+ assertThat(messageHandlerObserver.hasCompletedMessageProcessing(inboundMessage)).isTrue();
+
+ // the scan command message processor should be called once by the persistent message handler
+ verify(scanCommandMessageProcessor, times(1)).processMessage(any());
+ // if the EMA is managed, the waitForScanCompletion method should be called
+ verify(scanCommandMessageProcessor, atLeastOnce()).waitForScanCompletion(any());
+ // the message should be acked after the scan is complete, even though it is timed out
+ verify(solacePersistentMessageHandler.getPersistentMessageReceiver(), times(1)).ack(inboundMessage);
+ List logs = listAppendersCanCommandMessageProcessor.list;
+ assertThat(logs.get(logs.size() - 1).getFormattedMessage())
+ .isEqualTo("Scan with id scanId did not complete within the expected time");
+ }
+
+ @Test
+ void testPersistentMessageHandlerScanCommandExceptionThrownMsgAcked() {
+ ScanCommandMessage scanCommandMessage =
+ new ScanCommandMessage("messagingServiceId",
+ "scanId", List.of(SOLACE_ALL), List.of(EVENT_PORTAL));
+ when(inboundMessage.getPayloadAsString()).thenReturn(jsonString(scanCommandMessage));
+ when(inboundMessage.getProperty(MOPConstants.MOP_MSG_META_DECODER)).thenReturn(
+ ScanCommandMessage.class.getCanonicalName()
+ );
+ when(scanManager.scan(any())).thenThrow(new RuntimeException("Test exception thrown on purpose"));
+ solacePersistentMessageHandler.onMessage(inboundMessage);
+ // sleep for a while to allow the scan complete poll interval to pass
+ await().atMost(eventPortalProperties.getWaitAckScanCompleteTimeoutSec()+2, SECONDS).until(()
+ -> messageHandlerObserver.hasAcknowledgedMessage(inboundMessage));
+ assertThat(messageHandlerObserver.hasReceivedMessage(inboundMessage)).isTrue();
+ assertThat(messageHandlerObserver.hasInitiatedMessageProcessing(inboundMessage)).isTrue();
+ // timeout should be logged but the message should be still acked
+ // timeout error handling is ultimately the responsibility of Event Portal
+ assertThat(messageHandlerObserver.hasFailedMessage(inboundMessage)).isTrue();
+ assertThat(messageHandlerObserver.hasCompletedMessageProcessing(inboundMessage)).isFalse();
+
+ List logs = listAppenderPersistentMessageHandler.list;
+ assertThat(logs.get(logs.size() - 1).getFormattedMessage())
+ .isEqualTo("Error while processing inbound message from queue for mopMessageSubclass: "
+ + ScanCommandMessage.class.getCanonicalName());
+
+ // the scan command message processor should be called once by the persistent message handler
+ verify(scanCommandMessageProcessor, times(1)).processMessage(any());
+ verify(scanCommandMessageProcessor, atLeastOnce()).onFailure(any(), any());
+ // the message should be acked after the scan is complete
+ verify(solacePersistentMessageHandler.getPersistentMessageReceiver(), times(1)).ack(inboundMessage);
+ }
+
+
+
+ private String jsonString(Object object) {
+ try {
+ return objectMapper.writeValueAsString(object);
+ } catch (JsonProcessingException e) {
+ throw new IllegalArgumentException(e);
+ }
+
+ }
+
+}
diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/TestingSupportSolacePersistentMessageHandlerObserver.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/TestingSupportSolacePersistentMessageHandlerObserver.java
new file mode 100644
index 000000000..85b5b6c53
--- /dev/null
+++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/TestingSupportSolacePersistentMessageHandlerObserver.java
@@ -0,0 +1,61 @@
+package com.solace.maas.ep.event.management.agent.subscriber;
+
+import com.solace.messaging.receiver.InboundMessage;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+public class TestingSupportSolacePersistentMessageHandlerObserver implements SolacePersistentMessageHandlerObserver {
+
+
+ private final Set receivedMessages = Collections.synchronizedSet(new HashSet<>());
+ private final Set initiatedMessages = Collections.synchronizedSet(new HashSet<>());
+ private final Set completedMessages = Collections.synchronizedSet(new HashSet<>());
+ private final Set acknowledgedMessages = Collections.synchronizedSet(new HashSet<>());
+ private final Set failedMessages =Collections.synchronizedSet(new HashSet<>());
+
+ @Override
+ public void onPhaseChange(InboundMessage message, PersistentMessageHandlerObserverPhase phase) {
+ switch (phase) {
+ case RECEIVED:
+ receivedMessages.add(message);
+ break;
+ case INITIATED:
+ initiatedMessages.add(message);
+ break;
+ case COMPLETED:
+ completedMessages.add(message);
+ break;
+ case ACKNOWLEDGED:
+ acknowledgedMessages.add(message);
+ break;
+ case FAILED:
+ failedMessages.add(message);
+ break;
+ default:
+ break;
+ }
+ }
+
+ public boolean hasReceivedMessage(InboundMessage message) {
+ return receivedMessages.contains(message);
+ }
+
+ public boolean hasInitiatedMessageProcessing(InboundMessage message) {
+ return initiatedMessages.contains(message);
+ }
+
+ public boolean hasCompletedMessageProcessing(InboundMessage message) {
+ return completedMessages.contains(message);
+ }
+
+ public boolean hasAcknowledgedMessage(InboundMessage message) {
+ return acknowledgedMessages.contains(message);
+ }
+
+ public boolean hasFailedMessage(InboundMessage message) {
+ return failedMessages.contains(message);
+ }
+
+}