Skip to content

Commit

Permalink
DATAGO-77217 EMA dynamic configuration for scan jobs (#186)
Browse files Browse the repository at this point in the history
  • Loading branch information
mynecker authored Jun 28, 2024
1 parent 8183c18 commit 5ac5534
Show file tree
Hide file tree
Showing 8 changed files with 258 additions and 5 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ hs_err_pid*
**/application-DEV.yml
application.properties
application.yml
**/application-*-local.yml


.idea
.DS_Store
Expand All @@ -46,3 +48,4 @@ application-mysql-*.yml
.env

*dependency-reduced-pom.xml

Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.solace.maas.ep.common.messages;

import com.solace.maas.ep.common.model.CommandMessageWithResources;
import com.solace.maas.ep.common.model.EventBrokerResourceConfiguration;
import com.solace.maas.ep.common.model.ScanDestination;
import com.solace.maas.ep.common.model.ScanType;
import com.solace.maas.ep.event.management.agent.plugin.mop.MOPMessage;
Expand All @@ -11,12 +13,13 @@
import java.util.List;

@Data
public class ScanCommandMessage extends MOPMessage {
public class ScanCommandMessage extends MOPMessage implements CommandMessageWithResources {

private String messagingServiceId;
private String scanId;
private List<ScanType> scanTypes;
private List<ScanDestination> destinations;
private List<EventBrokerResourceConfiguration> resources;

public ScanCommandMessage() {
super();
Expand All @@ -25,7 +28,8 @@ public ScanCommandMessage() {
public ScanCommandMessage(String messagingServiceId,
String scanId,
List<ScanType> scanTypes,
List<ScanDestination> destinations) {
List<ScanDestination> destinations,
List<EventBrokerResourceConfiguration> resources) {
super();
withMessageType(MOPMessageType.generic)
.withProtocol(MOPProtocol.scanDataControl)
Expand All @@ -35,8 +39,18 @@ public ScanCommandMessage(String messagingServiceId,
this.scanId = scanId;
this.scanTypes = scanTypes;
this.destinations = destinations;
this.resources = resources;
}

public ScanCommandMessage(String messagingServiceId,
String scanId,
List<ScanType> scanTypes,
List<ScanDestination> destinations) {
this(messagingServiceId, scanId, scanTypes, destinations, null);
}



@Override
public String toLog() {
return null;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package com.solace.maas.ep.event.management.agent.scanManager;

import com.solace.maas.ep.common.messages.ScanCommandMessage;
import com.solace.maas.ep.common.messages.ScanStatusMessage;
import com.solace.maas.ep.common.model.ScanType;
import com.solace.maas.ep.event.management.agent.config.eventPortal.EventPortalProperties;
import com.solace.maas.ep.event.management.agent.plugin.constants.RouteConstants;
import com.solace.maas.ep.event.management.agent.plugin.constants.ScanStatus;
import com.solace.maas.ep.event.management.agent.plugin.manager.loader.PluginLoader;
import com.solace.maas.ep.event.management.agent.plugin.route.RouteBundle;
import com.solace.maas.ep.event.management.agent.plugin.route.handler.base.MessagingServiceRouteDelegate;
import com.solace.maas.ep.event.management.agent.publisher.ScanStatusPublisher;
import com.solace.maas.ep.event.management.agent.repository.model.mesagingservice.MessagingServiceEntity;
import com.solace.maas.ep.event.management.agent.scanManager.model.ScanItemBO;
import com.solace.maas.ep.event.management.agent.scanManager.model.ScanRequestBO;
Expand All @@ -25,20 +30,27 @@
import java.util.UUID;
import java.util.stream.Collectors;


@Slf4j
@Service
public class ScanManager {

private final MessagingServiceDelegateServiceImpl messagingServiceDelegateService;
private final ScanService scanService;
private final String runtimeAgentId;
private final String orgId;
private final ScanStatusPublisher scanStatusPublisher;

@Autowired
public ScanManager(MessagingServiceDelegateServiceImpl messagingServiceDelegateService,
ScanService scanService, EventPortalProperties eventPortalProperties) {
ScanService scanService,
EventPortalProperties eventPortalProperties,
ScanStatusPublisher scanStatusPublisher) {
this.messagingServiceDelegateService = messagingServiceDelegateService;
this.scanService = scanService;
this.scanStatusPublisher = scanStatusPublisher;
runtimeAgentId = eventPortalProperties.getRuntimeAgentId();
orgId = eventPortalProperties.getOrganizationId();
}

public String scan(ScanRequestBO scanRequestBO) {
Expand Down Expand Up @@ -102,6 +114,27 @@ public String scan(ScanRequestBO scanRequestBO) {
return scanService.singleScan(routes, groupId, scanId, traceId, actorId, messagingServiceEntity, runtimeAgentId);
}

public void handleError(Exception e, ScanCommandMessage message){

List<String> scanTypeNames = message.getScanTypes().stream().map(ScanType::name).toList();

ScanStatusMessage response = new ScanStatusMessage(
message.getOrgId(),
message.getScanId(),
MDC.get(RouteConstants.TRACE_ID),
MDC.get(RouteConstants.ACTOR_ID),
ScanStatus.FAILED.name(),
"Scan failed",
scanTypeNames
);

Map<String, String> topicVars = Map.of(
"orgId", orgId,
"runtimeAgentId", runtimeAgentId
);
scanStatusPublisher.sendOverallScanStatus(response,topicVars);
}

private MessagingServiceEntity retrieveMessagingServiceEntity(String messagingServiceId) {
return messagingServiceDelegateService.getMessagingServiceById(messagingServiceId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.solace.maas.ep.event.management.agent.scanManager.model.ScanRequestBO;
import lombok.extern.slf4j.Slf4j;
import net.logstash.logback.encoder.org.apache.commons.lang3.StringUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.MDC;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
Expand All @@ -20,9 +21,12 @@ public class ScanCommandMessageProcessor implements MessageProcessor<ScanCommand

private static final String DEFAULT_DESTINATION = "FILE_WRITER";
private final ScanManager scanManager;
private final DynamicResourceConfigurationHelper dynamicResourceConfigurationHelper;

public ScanCommandMessageProcessor(ScanManager scanManager) {
public ScanCommandMessageProcessor(ScanManager scanManager,
DynamicResourceConfigurationHelper dynamicResourceConfigurationHelper) {
this.scanManager = scanManager;
this.dynamicResourceConfigurationHelper = dynamicResourceConfigurationHelper;
}

@Override
Expand All @@ -35,6 +39,10 @@ public void processMessage(ScanCommandMessage message) {
log.debug("Received scan command message: {} for event broker: {}, traceId: {}",
message, message.getMessagingServiceId(), message.getTraceId());

if (CollectionUtils.isNotEmpty(message.getResources())) {
dynamicResourceConfigurationHelper.loadSolaceBrokerResourceConfigurations(message.getResources());
}

message.getScanTypes().forEach(scanType -> entityTypes.add(scanType.name()));

if (message.getDestinations() == null) {
Expand Down Expand Up @@ -75,6 +83,6 @@ public ScanCommandMessage castToMessageClass(Object message) {

@Override
public void onFailure(Exception e, ScanCommandMessage message) {
log.debug("Requires implementation");
scanManager.handleError(e,message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.solace.maas.ep.event.management.agent.common.messages;

import com.solace.maas.ep.common.messages.ScanCommandMessage;
import com.solace.maas.ep.common.model.ResourceConfigurationType;
import com.solace.maas.ep.common.model.ScanDestination;
import com.solace.maas.ep.common.model.ScanType;
import com.solace.maas.ep.event.management.agent.plugin.mop.MOPMessageType;
import com.solace.maas.ep.event.management.agent.plugin.mop.MOPProtocol;
import com.solace.maas.ep.event.management.agent.plugin.mop.MOPUHFlag;
import com.solace.maas.ep.event.management.agent.subscriber.messageProcessors.EventBrokerResourceConfigTestHelper;
import org.junit.jupiter.api.Test;

import java.util.List;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;


public class ScanCommandMessageTests {

private void assertMopMessageProperties(ScanCommandMessage scanCommandMessage) {
assertNotNull(scanCommandMessage);
assertEquals(MOPMessageType.generic, scanCommandMessage.getMopMsgType());
assertEquals(MOPProtocol.scanDataControl, scanCommandMessage.getMopProtocol());
assertEquals("1", scanCommandMessage.getMopVer());
assertEquals(MOPUHFlag.ignore, scanCommandMessage.getMsgUh());
}

@Test
void instantiateScanCommandMessage() {
ScanCommandMessage scanCommandMessage = new ScanCommandMessage(
"messagingServiceId1",
"scanId1",
List.of(ScanType.SOLACE_ALL),
List.of(ScanDestination.EVENT_PORTAL, ScanDestination.FILE_WRITER)
);
assertMopMessageProperties(scanCommandMessage);
assertEquals("messagingServiceId1", scanCommandMessage.getMessagingServiceId());
assertEquals("scanId1", scanCommandMessage.getScanId());
assertEquals(List.of(ScanType.SOLACE_ALL), scanCommandMessage.getScanTypes());
assertEquals(List.of(ScanDestination.EVENT_PORTAL, ScanDestination.FILE_WRITER), scanCommandMessage.getDestinations());
assertNull(scanCommandMessage.getResources());
}

@Test
void instantiateScanCommandMessageWithResources() {
ScanCommandMessage scanCommandMessage = new ScanCommandMessage(
"messagingServiceId1",
"scanId1",
List.of(ScanType.SOLACE_ALL),
List.of(ScanDestination.EVENT_PORTAL, ScanDestination.FILE_WRITER),
List.of(EventBrokerResourceConfigTestHelper.buildResourceConfiguration(ResourceConfigurationType.SOLACE))
);
assertMopMessageProperties(scanCommandMessage);
assertEquals("messagingServiceId1", scanCommandMessage.getMessagingServiceId());
assertEquals("scanId1", scanCommandMessage.getScanId());
assertEquals(List.of(ScanType.SOLACE_ALL), scanCommandMessage.getScanTypes());
assertEquals(List.of(ScanDestination.EVENT_PORTAL, ScanDestination.FILE_WRITER), scanCommandMessage.getDestinations());
assertNotNull(scanCommandMessage.getResources());
assertEquals(1, scanCommandMessage.getResources().size());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package com.solace.maas.ep.event.management.agent.scanManager;

import com.solace.maas.ep.common.messages.ScanCommandMessage;
import com.solace.maas.ep.common.model.ScanDestination;
import com.solace.maas.ep.common.model.ScanType;
import com.solace.maas.ep.event.management.agent.TestConfig;
import com.solace.maas.ep.event.management.agent.config.eventPortal.EventPortalProperties;
import com.solace.maas.ep.event.management.agent.publisher.ScanStatusPublisher;
import com.solace.maas.ep.event.management.agent.service.MessagingServiceDelegateServiceImpl;
import com.solace.maas.ep.event.management.agent.service.ScanService;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;

import java.util.List;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@ActiveProfiles("TEST")
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = TestConfig.class)
class ScanManagerHandleErrorTest {

@Mock
EventPortalProperties eventPortalProperties;

@Mock
MessagingServiceDelegateServiceImpl messagingServiceDelegateService;

@Mock
private ScanService scanService;

@Mock
private ScanStatusPublisher scanStatusPublisher;

@Test
void testScanManagerHandleError(){
when(eventPortalProperties.getOrganizationId()).thenReturn("orgId");
when(eventPortalProperties.getRuntimeAgentId()).thenReturn("runtimeAgentId");

RuntimeException mockEx = new RuntimeException("Mock Exception");

ScanManager scanManagerUnderTest = new ScanManager(
messagingServiceDelegateService,
scanService,
eventPortalProperties,
scanStatusPublisher
);
scanManagerUnderTest.handleError(mockEx,createScanCommandMessage());
verify(scanStatusPublisher, times(1)).sendOverallScanStatus(any(),any());
}



private ScanCommandMessage createScanCommandMessage(){
return new ScanCommandMessage(
"messageServiceId",
"scanId",
List.of(ScanType.SOLACE_ALL),
List.of(ScanDestination.EVENT_PORTAL),
null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class ScanManagerTest {
@Mock
private ScanService scanService;


@Test
@SneakyThrows
void testScanManagerExceptions() {
Expand Down Expand Up @@ -237,4 +238,6 @@ private List<RouteBundle> getKafkaRoutes(List<RouteBundle> destinations, String
.build()
);
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.solace.maas.ep.event.management.agent.subscriber.messageProcessors;

import com.solace.maas.ep.common.messages.ScanCommandMessage;
import com.solace.maas.ep.common.model.EventBrokerResourceConfiguration;
import com.solace.maas.ep.common.model.ResourceConfigurationType;
import com.solace.maas.ep.common.model.ScanDestination;
import com.solace.maas.ep.common.model.ScanType;
import com.solace.maas.ep.event.management.agent.TestConfig;
import com.solace.maas.ep.event.management.agent.scanManager.ScanManager;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.test.context.ActiveProfiles;

import java.util.List;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = TestConfig.class)
@ActiveProfiles("TEST")
class ScanCommandMessageProcessorTests {

@MockBean
private ScanManager scanManager;

@SpyBean
private ScanCommandMessageProcessor scanCommandMessageProcessor;

@MockBean
private DynamicResourceConfigurationHelper dynamicResourceConfigurationHelper;

@Test
void processMessageWithoutResourceConfiguration(){
ScanCommandMessage message = buildScanCommandMessage(null);
scanCommandMessageProcessor.processMessage(message);
verifyNoInteractions(dynamicResourceConfigurationHelper);
verify(scanManager,times(1)).scan(any());
}

@Test
void processMessageWithResourceConfiguration(){
ScanCommandMessage message = buildScanCommandMessage(List.of(
EventBrokerResourceConfigTestHelper.buildResourceConfiguration(ResourceConfigurationType.SOLACE))
);
scanCommandMessageProcessor.processMessage(message);
verify(dynamicResourceConfigurationHelper, times(1)).loadSolaceBrokerResourceConfigurations(any());
verify(scanManager,times(1)).scan(any());
}


private ScanCommandMessage buildScanCommandMessage(List<EventBrokerResourceConfiguration> resources){
return new ScanCommandMessage(
"messageServiceId",
"scanId",
List.of(ScanType.SOLACE_ALL),
List.of(ScanDestination.EVENT_PORTAL),
resources);
}
}

0 comments on commit 5ac5534

Please sign in to comment.