Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DATAGO-57805: Add command line functionality #153

Merged
merged 12 commits into from
Jan 19, 2024
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
package com.solace.maas.ep.event.management.agent.cli;

import com.solace.maas.ep.event.management.agent.repository.model.mesagingservice.MessagingServiceEntity;
import com.solace.maas.ep.event.management.agent.repository.model.scan.ScanStatusEntity;
import com.solace.maas.ep.event.management.agent.scanManager.ScanManager;
import com.solace.maas.ep.event.management.agent.scanManager.model.ScanRequestBO;
import com.solace.maas.ep.event.management.agent.scanManager.model.ZipRequestBO;
import com.solace.maas.ep.event.management.agent.service.DataCollectionFileService;
import com.solace.maas.ep.event.management.agent.service.ImportService;
import com.solace.maas.ep.event.management.agent.service.MessagingServiceDelegateServiceImpl;
import com.solace.maas.ep.event.management.agent.service.ScanStatusService;
import com.solace.maas.ep.event.management.agent.util.IDGenerator;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

@Component
@Order(1)
@Slf4j
public class EmaCommandLine implements CommandLineRunner {
private final ScanManager scanManager;
private final ImportService importService;
private final ScanStatusService scanStatusService;
private final IDGenerator idGenerator;
private final DataCollectionFileService dataCollectionFileService;
private final MessagingServiceDelegateServiceImpl messagingServiceDelegateService;

public EmaCommandLine(ScanManager scanManager,
ImportService importService,
ScanStatusService scanStatusService,
IDGenerator idGenerator,
DataCollectionFileService dataCollectionFileService,
MessagingServiceDelegateServiceImpl messagingServiceDelegateService) {
this.scanManager = scanManager;
this.importService = importService;
this.scanStatusService = scanStatusService;
this.idGenerator = idGenerator;
this.dataCollectionFileService = dataCollectionFileService;
this.messagingServiceDelegateService = messagingServiceDelegateService;
}

@Override
public void run(String... args) throws Exception {
if (args.length > 2) {

String type = args[0];

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now, only handle the scan command.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment for next ticket, let's change this var to commandType or command or something else.

if ("scan".equals(type)) {
String messagingServiceId = args[1];
String filePathAndName = args[2];

runScan(messagingServiceId, filePathAndName);
} else {
log.error("Unknown command: {}", type);
}
} else {
log.error("Not enough arguments passed to the application.");
}
}

private void runScan(String messagingServiceId, String filePathAndName) throws InterruptedException, IOException {
MessagingServiceEntity messagingServiceEntity = messagingServiceDelegateService.getMessagingServiceById(messagingServiceId);

ScanRequestBO scanRequestBO = new ScanRequestBO();
scanRequestBO.setMessagingServiceId(messagingServiceId);
scanRequestBO.setScanId(idGenerator.generateRandomUniqueId());
gregmeldrum marked this conversation as resolved.
Show resolved Hide resolved
setScanType(messagingServiceEntity, scanRequestBO, messagingServiceId);
scanRequestBO.setDestinations(List.of("FILE_WRITER"));

log.info("Scan request [{}]: Received, request details: {}", scanRequestBO.getScanId(), scanRequestBO);
String scanId = scanManager.scan(scanRequestBO);

log.info("Scan request [{}]: Scan started.", scanId);
waitForScanToComplete(scanId);

if (isCompletedSuccessfully(scanId)) {
writeScanToZipFile(filePathAndName, scanId);
} else {
log.error("Scan request [{}]: Scan did not complete successfully.", scanId);
}
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It completed successfully if there are dataCollectionFileEntitites for each scan type, and none of them are marked as "FAILED"

private boolean isCompletedSuccessfully(String scanId) {
List<ScanStatusEntity> statuses = scanStatusService.getScanStatuses(scanId);
boolean completedSuccessfully = false;
if (dataCollectionFileService.findAllByScanId(scanId).size() >=
statuses.size() && !anyScanStatusesInDesiredState("FAILED", statuses)) {
log.info("Scan request [{}]: Scan completed successfully.", scanId);
completedSuccessfully = true;
}
return completedSuccessfully;
}

private void setScanType(MessagingServiceEntity messagingServiceEntity, ScanRequestBO scanRequestBO, String messagingServiceId) {
switch (messagingServiceEntity.getType().toLowerCase()) {
case "solace":
scanRequestBO.setScanTypes(List.of("SOLACE_ALL"));
break;
case "kafka":
messagingServiceDelegateService.getMessagingServicesRelations(messagingServiceId).stream()
.findFirst()
.ifPresentOrElse(messagingServiceEntity1 -> scanRequestBO.setScanTypes(List.of("KAFKA_ALL", "CONFLUENT_SCHEMA_REGISTRY_SCHEMA")),
() -> scanRequestBO.setScanTypes(List.of("KAFKA_ALL")));
break;
case "confluent_schema_registry":
scanRequestBO.setScanTypes(List.of("CONFLUENT_SCHEMA_REGISTRY_SCHEMA"));
break;
default:
throw new RuntimeException("Unsupported messaging service type: " + messagingServiceEntity.getType());
}
}

public void waitForScanToComplete(String scanId) throws InterruptedException {
AtomicBoolean scanCompleted = new AtomicBoolean(false);

// The hard timeout is set to 30 minutes to prevent the scan from running forever
long hardTimeoutMillis = System.currentTimeMillis() + 30 * 60 * 1000;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the "paranoia" timeout just so the EMA doesn't run forever. It's currently set to 30 minutes since large configs can take a long time to scan.


// The initiated timout is set to 2 minutes to shortcut a scan that is stuck in the INITIATED state
// This typically occurs if there is an error in the broker URL or credentials
long initiatedTimeoutMillis = System.currentTimeMillis() + 2 * 60 * 1000;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This timeout is triggered if all of the scans are in "INITIATED" state for more than 2 minutes. It's meant to catch cases where we had trouble creating the client or running the command but the exception was not captured.


while (System.currentTimeMillis() < hardTimeoutMillis && !scanCompleted.get()) {
Thread.sleep(2000);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this Thread.sleep increases CPU usage by a lot. Can we use Timer and TimerTask instead to run a scanComplete or hardTimeoutMillis check every 2 minutes?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I'm guessing wrong on the cpu consumption, let's keep the code as is.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Everything I've read online says that the thread is released and does not consume CPU for a .sleep and .wait() so out of simplicity I'd like to keep it as is.


List<ScanStatusEntity> statuses = scanStatusService.getScanStatuses(scanId);

if (scanCompleted(statuses)) {
scanCompleted.set(true);
} else if (System.currentTimeMillis() > initiatedTimeoutMillis && scanInitiated(statuses)) {
throw new RuntimeException("Scan is stuck in INITIATED state. Check broker URL and credentials.");
} else {
log.debug("Waiting for scan to complete...");
}
}
}

private void writeScanToZipFile(String filePathAndName, String scanId) throws IOException {
// Use the import service to receive the scan stream and write
// it to a zip file
ZipRequestBO zipRequestBO = new ZipRequestBO();
zipRequestBO.setScanId(scanId);
log.info("Received zip request for scan id: {}", zipRequestBO.getScanId());
try (InputStream fileStream = importService.zip(zipRequestBO)) {

File file = new File(filePathAndName);
writeInputStreamToFile(fileStream, file);
}
}

private static void writeInputStreamToFile(InputStream inputStream, File file) {
try {
Files.copy(inputStream, file.toPath(), StandardCopyOption.REPLACE_EXISTING);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

private boolean scanCompleted(List<ScanStatusEntity> statuses) {
return scanSucceeded(statuses) || scanFailed(statuses);
}

private boolean scanSucceeded(List<ScanStatusEntity> statuses) {
return allScanStatusesInDesiredState("COMPLETE", statuses);
}

private boolean scanFailed(List<ScanStatusEntity> statuses) {
return anyScanStatusesInDesiredState("FAILED", statuses);
}

private boolean scanInitiated(List<ScanStatusEntity> statuses) {
return allScanStatusesInDesiredState("INITIATED", statuses);
}

private boolean allScanStatusesInDesiredState(String desiredState, List<ScanStatusEntity> statuses) {
// Check that all the scan types have the desired state
AtomicBoolean allScanTypesDesired = new AtomicBoolean(true);
statuses.forEach(scanStatusEntity -> {
if (!desiredState.equals(scanStatusEntity.getStatus())) {
allScanTypesDesired.set(false);
}
});

return allScanTypesDesired.get();
}

private boolean anyScanStatusesInDesiredState(String desiredState, List<ScanStatusEntity> statuses) {
// Check that all the scan types have the desired state
AtomicBoolean anyScanTypesDesired = new AtomicBoolean(false);
statuses.forEach(scanStatusEntity -> {
if (desiredState.equals(scanStatusEntity.getStatus())) {
anyScanTypesDesired.set(true);
}
});

return anyScanTypesDesired.get();
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.context.annotation.PropertySource;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;

import java.util.List;
import java.util.Objects;
Expand All @@ -24,11 +26,13 @@
@ConfigurationProperties(prefix = "plugins")
@Slf4j
@Profile("!TEST")
@Order(Ordered.HIGHEST_PRECEDENCE)
public class ResourceConfig implements ApplicationRunner {
private final MessagingServiceDelegateServiceImpl messagingServiceDelegateService;
private List<MessagingServicePluginProperties> resources;
private final MessagingServicePluginPropertyToEventConverter configToEventConverter;


@Override
public void run(ApplicationArguments args) {
if (Objects.nonNull(resources)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
import org.apache.camel.Exchange;
import org.springframework.stereotype.Component;

import java.util.List;

@Slf4j
@Component
public class RouteCompleteProcessorImpl extends RouteCompleteProcessor {
Expand All @@ -23,18 +21,8 @@ public RouteCompleteProcessorImpl(ScanStatusService scanStatusService) {
@Override
public void process(Exchange exchange) throws Exception {
exchange.getIn().setHeader(RouteConstants.SCAN_STATUS, ScanStatus.COMPLETE);

String scanId = (String) exchange.getIn().getHeader(RouteConstants.SCAN_ID);

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pulled this code into a shared abstract class called RouteStateProcessor that's used by RouteCompleteProcessor and RouteFailedProcessor.

String scanType;

if (exchange.getIn().getHeader(RouteConstants.SCAN_TYPE) instanceof List<?>) {
scanType = (String) exchange.getIn().getBody();
exchange.getIn().setHeader(RouteConstants.SCAN_TYPE, scanType);
} else {
scanType = (String) exchange.getIn().getHeader(RouteConstants.SCAN_TYPE);
}

scanStatusService.save(scanType, scanId);
String scanType = getScanType(exchange);
scanStatusService.save(scanType, scanId, ScanStatus.COMPLETE);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.solace.maas.ep.event.management.agent.processor;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New processor to set the scan status for a scan type. Before it always stayed Initiating in the DB. Now we set it to failed so the command line runner knows that the command failed.


import com.solace.maas.ep.event.management.agent.plugin.constants.RouteConstants;
import com.solace.maas.ep.event.management.agent.plugin.constants.ScanStatus;
import com.solace.maas.ep.event.management.agent.plugin.processor.RouteFailedProcessor;
import com.solace.maas.ep.event.management.agent.service.ScanStatusService;
import lombok.extern.slf4j.Slf4j;
import org.apache.camel.Exchange;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class RouteFailedProcessorImpl extends RouteFailedProcessor {
private final ScanStatusService scanStatusService;

public RouteFailedProcessorImpl(ScanStatusService scanStatusService) {
super();
this.scanStatusService = scanStatusService;
}

@Override
public void process(Exchange exchange) throws Exception {
exchange.getIn().setHeader(RouteConstants.SCAN_STATUS, ScanStatus.COMPLETE);

String scanId = (String) exchange.getIn().getHeader(RouteConstants.SCAN_ID);
String scanType = getScanType(exchange);
scanStatusService.save(scanType, scanId, ScanStatus.FAILED);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.solace.maas.ep.event.management.agent.route.ep;

import com.solace.maas.ep.event.management.agent.plugin.processor.ScanTypeDescendentsProcessor;
import com.solace.maas.ep.event.management.agent.plugin.route.handler.base.AbstractRouteBuilder;
import com.solace.maas.ep.event.management.agent.processor.RouteFailedProcessorImpl;
import org.springframework.stereotype.Component;

@Component
public class ScanStatusFailedPersistenceRouteBuilder extends AbstractRouteBuilder {
private final RouteFailedProcessorImpl routeFailedProcessor;

public ScanStatusFailedPersistenceRouteBuilder(RouteFailedProcessorImpl routeFailedProcessor,
ScanTypeDescendentsProcessor scanTypeDescendentsProcessor) {
super(scanTypeDescendentsProcessor);
this.routeFailedProcessor = routeFailedProcessor;
}

@Override
public void configure() throws Exception {
super.configure();

from("direct:processScanStatusAsFailed")
.process(routeFailedProcessor);
}
}
Original file line number Diff line number Diff line change
@@ -1,46 +1,60 @@
package com.solace.maas.ep.event.management.agent.service;

import com.solace.maas.ep.event.management.agent.plugin.constants.ScanStatus;
import com.solace.maas.ep.event.management.agent.repository.model.scan.ScanEntity;
import com.solace.maas.ep.event.management.agent.repository.model.scan.ScanStatusEntity;
import com.solace.maas.ep.event.management.agent.repository.model.scan.ScanTypeEntity;
import com.solace.maas.ep.event.management.agent.repository.scan.ScanStatusRepository;
import com.solace.maas.ep.event.management.agent.util.IDGenerator;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;
import java.util.Objects;

@Service
public class ScanStatusService {
private final ScanStatusRepository repository;

private final ScanTypeService scanTypeService;
private final ScanService scanService;

private final IDGenerator idGenerator;

public ScanStatusService(ScanStatusRepository repository, ScanTypeService scanTypeService, IDGenerator idGenerator) {
public ScanStatusService(ScanStatusRepository repository, ScanTypeService scanTypeService, ScanService scanService, IDGenerator idGenerator) {
this.repository = repository;
this.scanTypeService = scanTypeService;
this.scanService = scanService;
this.idGenerator = idGenerator;
}

@Transactional
public ScanStatusEntity save(String name, String scanId) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding the ability to set any state instead of hardcoding to COMPLETE.

public ScanStatusEntity save(String name, String scanId, ScanStatus scanStatus) {
ScanTypeEntity scanType = scanTypeService.findByNameAndScanId(name, scanId)
.orElseThrow(() -> new RuntimeException("Can't apply Scan Status to Scan that doesn't exist!"));

ScanStatusEntity scanStatusEntity = scanType.getStatus();

if (Objects.nonNull(scanStatusEntity)) {
scanStatusEntity.setStatus(ScanStatus.COMPLETE.name());
scanStatusEntity.setStatus(scanStatus.name());
} else {
scanStatusEntity = ScanStatusEntity.builder()
.id(idGenerator.generateRandomUniqueId())
.scanType(scanType)
.status(ScanStatus.COMPLETE.name())
.status(scanStatus.name())
.build();
}

return repository.save(scanStatusEntity);
}

@Transactional
public List<ScanStatusEntity> getScanStatuses(String scanId) {
ScanEntity scanEntity = scanService.findById(scanId)
.orElseThrow(() -> new RuntimeException("ScanId " + scanId + " not found!"));
List<ScanStatusEntity> scanStatuses = scanEntity.getScanTypes().stream()
.map(ScanTypeEntity::getStatus)
.toList();
return scanStatuses;
}
}
Loading
Loading