Skip to content

Commit

Permalink
DATAGO-84852: C-EMA restart resilience for scan jobs (#208)
Browse files Browse the repository at this point in the history
  • Loading branch information
mynecker authored Oct 24, 2024
1 parent d24ff15 commit 5ad78c3
Show file tree
Hide file tree
Showing 15 changed files with 599 additions and 84 deletions.
12 changes: 6 additions & 6 deletions service/application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
<spring-kafka.version>3.2.4</spring-kafka.version>
<kafka-clients.version>3.5.0</kafka-clients.version>
<jackson.version>2.16.1</jackson.version>
<awaitility.version>4.2.0</awaitility.version>
<dockerfile-maven.version>1.4.13</dockerfile-maven.version>
</properties>
<repositories>
Expand Down Expand Up @@ -290,6 +291,10 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
</dependency>

<!-- for testing -->
<dependency>
Expand All @@ -298,12 +303,7 @@
<version>2.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>4.2.0</version>
<scope>test</scope>
</dependency>


<dependency>
<groupId>com.solacesystems</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ public class EventPortalProperties {
private Boolean managed = false;
private String incomingRequestQueueName;

private int waitAckScanCompleteTimeoutSec = 300;
private int waitAckScanCompletePollIntervalSec = 10;

private GatewayProperties gateway
= new GatewayProperties("standalone", "standalone", new GatewayMessagingProperties(true, false, List.of()));
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package com.solace.maas.ep.event.management.agent.repository.scan;

import com.solace.maas.ep.event.management.agent.repository.model.scan.ScanStatusEntity;
import com.solace.maas.ep.event.management.agent.repository.model.scan.ScanTypeEntity;
import org.springframework.data.repository.CrudRepository;

public interface ScanStatusRepository extends CrudRepository<ScanStatusEntity, String> {

ScanStatusEntity findByScanType(ScanTypeEntity scanType);
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
import org.springframework.data.repository.CrudRepository;
import org.springframework.stereotype.Repository;

import java.util.List;
import java.util.Optional;

@Repository
public interface ScanTypeRepository extends CrudRepository<ScanTypeEntity, String> {
Optional<ScanTypeEntity> findByNameAndScanId(String name, String scanId);
List<ScanTypeEntity> findAllByScanId(String scanId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.solace.maas.ep.event.management.agent.service.MessagingServiceDelegateServiceImpl;
import com.solace.maas.ep.event.management.agent.service.ScanService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
Expand Down Expand Up @@ -100,7 +101,7 @@ public String scan(ScanRequestBO scanRequestBO) {
).stream()
.findFirst()
.orElseThrow())
.collect(Collectors.toUnmodifiableList());
.toList();

List<String> brokerScanTypes = scanRequestBO.getScanTypes();
List<RouteBundle> routes = brokerScanTypes.stream()
Expand All @@ -111,15 +112,14 @@ public String scan(ScanRequestBO scanRequestBO) {
.filter(Objects::nonNull)
.filter(list -> !list.isEmpty())
.toList().stream()
)
.toList().stream().flatMap(List::stream).toList();
).toList().stream().flatMap(List::stream).toList();

return scanService.singleScan(routes, groupId, scanId, traceId, actorId, messagingServiceEntity, runtimeAgentId);
}

public void handleError(Exception e, ScanCommandMessage message){
public void handleError(Exception e, ScanCommandMessage message) {

if( scanStatusPublisherOpt.isEmpty()){
if (scanStatusPublisherOpt.isEmpty()) {
return;
}
ScanStatusPublisher scanStatusPublisher = scanStatusPublisherOpt.get();
Expand All @@ -140,7 +140,7 @@ public void handleError(Exception e, ScanCommandMessage message){
"orgId", orgId,
"runtimeAgentId", runtimeAgentId
);
scanStatusPublisher.sendOverallScanStatus(response,topicVars);
scanStatusPublisher.sendOverallScanStatus(response, topicVars);
}

private MessagingServiceEntity retrieveMessagingServiceEntity(String messagingServiceId) {
Expand All @@ -158,4 +158,12 @@ public Page<ScanItemBO> findAll(Pageable pageable) {
public Page<ScanItemBO> findByMessagingServiceId(String messagingServiceId, Pageable pageable) {
return scanService.findByMessagingServiceId(messagingServiceId, pageable);
}


public boolean isScanComplete(String scanId) {
if (ObjectUtils.isEmpty(scanId)) {
throw new IllegalArgumentException("Scan ID cannot be null or empty");
}
return scanService.isScanComplete(scanId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,19 @@
import net.logstash.logback.encoder.org.apache.commons.lang3.StringUtils;
import org.apache.camel.Exchange;
import org.apache.camel.ProducerTemplate;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.MDC;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -449,4 +452,25 @@ protected RouteBundleHierarchyStore registerRouteRecipients(RouteBundle routeBun
}
return pathStore;
}

public boolean isScanComplete(String scanId) {
if (ObjectUtils.isEmpty(scanId)){
throw new IllegalArgumentException("Scan ID cannot be null or empty");
}
Set<String> completeScanStatuses = Set.of(
ScanStatus.COMPLETE.name(),
ScanStatus.FAILED.name(),
ScanStatus.TIMED_OUT.name()
);


List<ScanTypeEntity> allScanTypes = scanTypeRepository.findAllByScanId(scanId);
if (CollectionUtils.isEmpty(allScanTypes)) {
return false;
}
return allScanTypes.stream()
.map(scanStatusRepository::findByScanType)
.allMatch(status -> completeScanStatuses.contains(status.getStatus()));

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.solace.maas.ep.event.management.agent.subscriber;

public enum PersistentMessageHandlerObserverPhase {
RECEIVED,
INITIATED,
COMPLETED,
ACKNOWLEDGED,
FAILED
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,20 @@
import com.solace.maas.ep.event.management.agent.config.eventPortal.EventPortalProperties;
import com.solace.maas.ep.event.management.agent.plugin.mop.MOPConstants;
import com.solace.maas.ep.event.management.agent.subscriber.messageProcessors.MessageProcessor;
import com.solace.maas.ep.event.management.agent.util.MdcTaskDecorator;
import com.solace.messaging.MessagingService;
import com.solace.messaging.receiver.InboundMessage;
import com.solace.messaging.receiver.MessageReceiver;
import com.solace.messaging.receiver.PersistentMessageReceiver;
import com.solace.messaging.resources.Queue;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;

import java.util.HashMap;
Expand All @@ -32,9 +35,15 @@ public class SolacePersistentMessageHandler extends BaseSolaceMessageHandler imp
private final Map<Class, MessageProcessor> messageProcessorsByClassType;
private final MessagingService messagingService;
private final EventPortalProperties eventPortalProperties;
private final ThreadPoolTaskExecutor executor;
@Getter
@SuppressWarnings("PMD.MutableStaticState")
private PersistentMessageReceiver persistentMessageReceiver;

// only used for testing
@Setter
private SolacePersistentMessageHandlerObserver messageHandlerObserver;

protected SolacePersistentMessageHandler(MessagingService messagingService,
EventPortalProperties eventPortalProperties,
List<MessageProcessor> messageProcessorList) {
Expand All @@ -44,22 +53,38 @@ protected SolacePersistentMessageHandler(MessagingService messagingService,
this.eventPortalProperties = eventPortalProperties;
messageProcessorsByClassType = messageProcessorList.stream()
.collect(Collectors.toMap(MessageProcessor::supportedClass, Function.identity()));
executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(eventPortalProperties.getCommandThreadPoolMinSize());
executor.setMaxPoolSize(eventPortalProperties.getCommandThreadPoolMaxSize());
executor.setQueueCapacity(eventPortalProperties.getCommandThreadPoolQueueSize());
executor.setThreadNamePrefix("solace-persistent-message-handler-pool-");
executor.setTaskDecorator(new MdcTaskDecorator());
executor.initialize();
}


private void notifyPersistentMessageHandlerObserver(PersistentMessageHandlerObserverPhase phase, InboundMessage inboundMessage) {
if (messageHandlerObserver != null) {
messageHandlerObserver.onPhaseChange(inboundMessage, phase);
}
}

@Override
public void onMessage(InboundMessage inboundMessage) {
notifyPersistentMessageHandlerObserver(PersistentMessageHandlerObserverPhase.RECEIVED,inboundMessage);
executor.submit(() -> processMessage(inboundMessage));
}


private void processMessage(InboundMessage inboundMessage) {
notifyPersistentMessageHandlerObserver(PersistentMessageHandlerObserverPhase.INITIATED,inboundMessage);
String mopMessageSubclass = "";
MessageProcessor processor = null;
Object message = null;
try {
mopMessageSubclass = inboundMessage.getProperty(MOPConstants.MOP_MSG_META_DECODER);
String messageAsString = inboundMessage.getPayloadAsString();
Class messageClass = cachedJSONDecoders.get(mopMessageSubclass);
if (messageClass == null) {
messageClass = Class.forName(mopMessageSubclass);
cachedJSONDecoders.put(mopMessageSubclass, messageClass);
}
Class messageClass = cachedJSONDecoders.computeIfAbsent(mopMessageSubclass, this::loadClass);
processor = messageProcessorsByClassType.get(messageClass);
if (processor == null) {
throw new UnsupportedOperationException("Could not find message processor for message of class " + messageClass.getCanonicalName());
Expand All @@ -68,21 +93,40 @@ public void onMessage(InboundMessage inboundMessage) {
log.trace("onMessage: {}\n{}", messageClass, messageAsString);
message = toMessage(messageAsString, messageClass);
processor.processMessage(processor.castToMessageClass(message));

notifyPersistentMessageHandlerObserver(PersistentMessageHandlerObserverPhase.COMPLETED,inboundMessage);
} catch (Exception e) {
if (processor != null && message != null) {
log.error("Error while processing inbound message from queue for mopMessageSubclass: {}", mopMessageSubclass);
try {
processor.onFailure(e, processor.castToMessageClass(message));
} catch (Exception e1) {
log.error("error while handling message processing failure for mopMessageSubclass: {}", mopMessageSubclass, e);
}

} else {
log.error("Unsupported message and/or processor encountered. Skipping processing", e);
handleProcessingError(mopMessageSubclass, processor, message, e);
notifyPersistentMessageHandlerObserver(PersistentMessageHandlerObserverPhase.FAILED,inboundMessage);
} finally {
acknowledgeMessage(inboundMessage);
notifyPersistentMessageHandlerObserver(PersistentMessageHandlerObserverPhase.ACKNOWLEDGED,inboundMessage);
}
}

private Class loadClass(String className) {
try {
return Class.forName(className);
} catch (ClassNotFoundException e) {
log.error("Failed to load class: {}", className, e);
throw new RuntimeException("Failed to load class: " + className, e);
}
}

private void handleProcessingError(String mopMessageSubclass, MessageProcessor processor, Object message, Exception e) {
if (processor != null && message != null) {
log.error("Error while processing inbound message from queue for mopMessageSubclass: {}", mopMessageSubclass, e);
try {
processor.onFailure(e, processor.castToMessageClass(message));
} catch (Exception e1) {
log.error("Error while handling message processing failure for mopMessageSubclass: {}", mopMessageSubclass, e1);
}
} else {
log.error("Unsupported message and/or processor encountered. Skipping processing", e);
}
}

} finally {
private void acknowledgeMessage(InboundMessage inboundMessage) {
synchronized (persistentMessageReceiver) {
persistentMessageReceiver.ack(inboundMessage);
}
}
Expand All @@ -103,6 +147,7 @@ private Queue determineQueue() {
return Queue.durableNonExclusiveQueue(eventPortalProperties.getIncomingRequestQueueName());
}


@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
Queue queue = determineQueue();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.solace.maas.ep.event.management.agent.subscriber;

import com.solace.messaging.receiver.InboundMessage;

/**
* The SolacePersistentMessageHandlerObserver interface defines methods to observe
* the lifecycle of messages handled by a Solace message handler. Implementers can
* use this interface to react to various stages of message processing.
* Primary use case is for testing purposes.
*/
public interface SolacePersistentMessageHandlerObserver {

void onPhaseChange(InboundMessage message, PersistentMessageHandlerObserverPhase phase);

}
Loading

0 comments on commit 5ad78c3

Please sign in to comment.