Skip to content

Commit

Permalink
DATAGO-79772 wip
Browse files Browse the repository at this point in the history
  • Loading branch information
rudraneel-chakraborty committed Jul 4, 2024
1 parent 91bf6c3 commit 8a41b5a
Show file tree
Hide file tree
Showing 10 changed files with 365 additions and 187 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import static com.solace.maas.ep.event.management.agent.constants.Command.COMMAND_CORRELATION_ID;
Expand All @@ -45,14 +48,14 @@ public class CommandManager {
private final MessagingServiceDelegateService messagingServiceDelegateService;
private final EventPortalProperties eventPortalProperties;
private final ThreadPoolTaskExecutor configPushPool;
private final CommandLogStreamingProcessor commandLogStreamingProcessor;
private final Optional<CommandLogStreamingProcessor> commandLogStreamingProcessorOpt;

public CommandManager(TerraformManager terraformManager,
CommandMapper commandMapper,
CommandPublisher commandPublisher,
MessagingServiceDelegateService messagingServiceDelegateService,
EventPortalProperties eventPortalProperties,
CommandLogStreamingProcessor commandLogStreamingProcessor) {
Optional<CommandLogStreamingProcessor> commandLogStreamingProcessorOpt) {
this.terraformManager = terraformManager;
this.commandMapper = commandMapper;
this.commandPublisher = commandPublisher;
Expand All @@ -65,7 +68,7 @@ public CommandManager(TerraformManager terraformManager,
configPushPool.setThreadNamePrefix("config-push-pool-");
configPushPool.setTaskDecorator(new MdcTaskDecorator());
configPushPool.initialize();
this.commandLogStreamingProcessor = commandLogStreamingProcessor;
this.commandLogStreamingProcessorOpt = commandLogStreamingProcessorOpt;
}

public void execute(CommandMessage request) {
Expand Down Expand Up @@ -106,7 +109,10 @@ public void configPush(CommandRequest request) {
for (Command command : bundle.getCommands()) {
Path executionLog = executeCommand(request, command, envVars);
if (executionLog != null) {
streamCommandExecutionLogToEpCore(request, command, executionLog);
if (commandLogStreamingProcessorOpt.isPresent()) {
streamCommandExecutionLogToEpCore(request, command, executionLog);
}

executionLogFilesToClean.add(executionLog);
}
if (exitEarlyOnFailedCommand(bundle, command)) {
Expand Down Expand Up @@ -153,17 +159,41 @@ private Path executeCommand(CommandRequest request,

private void cleanup(List<Path> listOfExecutionLogFiles) {
try {
commandLogStreamingProcessor.deleteExecutionLogFiles(listOfExecutionLogFiles);
deleteExecutionLogFiles(listOfExecutionLogFiles);
} catch (Exception e) {
log.error("Error while deleting execution log.", e);
}
}

private void streamCommandExecutionLogToEpCore(CommandRequest request, Command command, Path executionLog) {
public void deleteExecutionLogFiles(List<Path> listOfExecutionLogFiles) {
boolean allFilesDeleted = listOfExecutionLogFiles
.stream()
.allMatch(this::deleteExecutionLogFile);
if (!allFilesDeleted) {
throw new IllegalArgumentException("Some of the execution log files were not deleted. Please check the logs");
}
}

private boolean deleteExecutionLogFile(Path path) {
try {
if (Files.exists(path)) {
Files.delete(path);
}
} catch (IOException e) {
log.warn("Error while deleting execution log at {}", path, e);
return false;
}
return true;
}

public void streamCommandExecutionLogToEpCore(CommandRequest request, Command command, Path executionLog) {
if (commandLogStreamingProcessorOpt.isEmpty()) {
throw new UnsupportedOperationException("Streaming logs to ep is not supported for this event management agent type");
}
try {
commandLogStreamingProcessor.streamLogsToEP(request, command, executionLog);
commandLogStreamingProcessorOpt.get().streamLogsToEP(request, command, executionLog);
} catch (Exception e) {
log.error("Error sending logs to ep-core for command with commandCorrelationId",
log.error("Error sending logs to ep-core for command with commandCorrelationId {}",
request.getCommandCorrelationId(), e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@
import com.solace.maas.ep.event.management.agent.publisher.CommandLogsPublisher;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
Expand All @@ -33,7 +32,7 @@

@Component
@Slf4j
@ConditionalOnProperty(name = "event-portal.gateway.messaging.standalone", havingValue = "false")
@ConditionalOnExpression("${event-portal.gateway.messaging.standalone:false}== false && ${event-portal.managed:false} == false")
public class CommandLogStreamingProcessor {

public static final String ANY = "*";
Expand All @@ -50,28 +49,6 @@ public CommandLogStreamingProcessor(CommandLogsPublisher commandLogsPublisher,
}


public void deleteExecutionLogFiles(List<Path> listOfExecutionLogFiles) {
boolean allFilesDeleted = listOfExecutionLogFiles
.stream()
.allMatch(this::deleteExecutionLogFile);
if (!allFilesDeleted) {
throw new IllegalArgumentException("Some of the execution log files were not deleted. Please check the logs");
}
}

private boolean deleteExecutionLogFile(Path path) {
try {
if (Files.exists(path)) {
Files.delete(path);
}
} catch (IOException e) {
log.warn("Error while deleting execution log at {}", path, e);
return false;
}
return true;
}


public void streamLogsToEP(CommandRequest request, Command executedCommand, Path commandExecutionLog) {

if (executedCommand.getIgnoreResult()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.solace.maas.ep.event.management.agent;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.solace.maas.ep.event.management.agent.command.CommandManager;
import com.solace.maas.ep.event.management.agent.command.mapper.CommandMapper;
import com.solace.maas.ep.event.management.agent.config.SolaceConfiguration;
Expand Down Expand Up @@ -31,13 +30,13 @@
import org.apache.camel.ProducerTemplate;
import org.apache.camel.support.DefaultExchange;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Primary;
import org.springframework.context.annotation.Profile;

import java.util.Optional;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -161,29 +160,14 @@ public CommandLogsPublisher getComaCommandLogsPublisher() {
return mock(CommandLogsPublisher.class);
}

@Bean
@Qualifier("realCommandLogStreamingProcessor")
public CommandLogStreamingProcessor realCommandLogStreamingProcessor(CommandLogsPublisher commandLogsPublisher,
EventPortalProperties eventPortalProperties,
ObjectMapper objectMapper) {
return new CommandLogStreamingProcessor(commandLogsPublisher, eventPortalProperties, objectMapper);
}

@Bean
@Qualifier("mockedCommandLogStreamingProcessor")
@Primary
public CommandLogStreamingProcessor mockedCommandLogStreamingProcessor() {
return mock(CommandLogStreamingProcessor.class);
}

@Bean
@Primary
public CommandManager getCommandManager(TerraformManager terraformManager,
CommandMapper commandMapper,
CommandPublisher commandPublisher,
MessagingServiceDelegateService messagingServiceDelegateService,
EventPortalProperties eventPortalProperties,
CommandLogStreamingProcessor commandLogStreamingProcessor) {
Optional<CommandLogStreamingProcessor> commandLogStreamingProcessor) {
return new CommandManager(
terraformManager,
commandMapper,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,15 @@ void testDirectMessageHandlerBeansAreNotLoaded() {
);

}

@Test
void testCommandLogStreamingProcessorBeanIsNotLoaded() {
String[] allBeanNames = applicationContext.getBeanDefinitionNames();
assertThat(
Arrays.stream(allBeanNames)
.map(StringUtils::lowerCase)
.collect(Collectors.toSet()))
.doesNotContain(StringUtils.lowerCase("commandLogStreamingProcessor"));

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,14 @@ void testDirectMessageHandlerBeansAreLoaded() {
);

}

@Test
void testCommandLogStreamingProcessorBeanIsLoaded() {
String[] allBeanNames = applicationContext.getBeanDefinitionNames();
assertThat(
Arrays.stream(allBeanNames)
.map(StringUtils::lowerCase)
.collect(Collectors.toSet())).contains(StringUtils.lowerCase("commandLogStreamingProcessor"));

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package com.solace.maas.ep.event.management.agent.commandManager;

import com.solace.maas.ep.common.messages.CommandMessage;
import com.solace.maas.ep.event.management.agent.command.CommandManager;
import com.solace.maas.ep.event.management.agent.config.eventPortal.EventPortalProperties;
import com.solace.maas.ep.event.management.agent.plugin.command.model.Command;
import com.solace.maas.ep.event.management.agent.plugin.command.model.JobStatus;
import com.solace.maas.ep.event.management.agent.plugin.service.MessagingServiceDelegateService;
import com.solace.maas.ep.event.management.agent.plugin.solace.processor.semp.SempClient;
import com.solace.maas.ep.event.management.agent.plugin.solace.processor.semp.SolaceHttpSemp;
import com.solace.maas.ep.event.management.agent.plugin.terraform.manager.TerraformManager;
import com.solace.maas.ep.event.management.agent.publisher.CommandPublisher;
import lombok.extern.slf4j.Slf4j;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.ArgumentCaptor;
import org.mockito.stubbing.Answer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ActiveProfiles;

import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.springframework.test.annotation.DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD;

@Slf4j
@ActiveProfiles("TEST")
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, properties = {
"eventPortal.gateway.messaging.standalone=false",
"eventPortal.managed=true",
"eventPortal.incomingRequestQueueName = ep_core_ema_requests_123456_123123",
"event-portal.gateway.messaging.rto-session=false"
})
@DirtiesContext(classMode = BEFORE_EACH_TEST_METHOD)
//CPD-OFF
class CloudManagedEMACommandManagerTests {

@SpyBean
private CommandManager commandManager;

@Autowired
private TerraformManager terraformManager;

@Autowired
private CommandPublisher commandPublisher;

@Autowired
private MessagingServiceDelegateService messagingServiceDelegateService;

@Autowired
private EventPortalProperties eventPortalProperties;

private static final String MESSAGING_SERVICE_ID = "myMessagingServiceId";

private ArgumentCaptor<List<Path>> executionLogFileCaptor;
private ArgumentCaptor<Map<String, String>> topicArgCaptor;
private ArgumentCaptor<Map<String, String>> envArgCaptor;
private ArgumentCaptor<CommandMessage> responseCaptor;

private CommandMessage message;

@BeforeEach
void setUp() {
message = CommandManagerTestHelper.buildCommandMessageForConfigPush(eventPortalProperties.getOrganizationId(), MESSAGING_SERVICE_ID);
executionLogFileCaptor = ArgumentCaptor.forClass(List.class);
topicArgCaptor = ArgumentCaptor.forClass(Map.class);
doNothing().when(commandPublisher).sendCommandResponse(any(), any());
when(messagingServiceDelegateService.getMessagingServiceClient(any())).thenReturn(
new SolaceHttpSemp(SempClient.builder()
.username("myUsername")
.password("myPassword")
.connectionUrl("myConnectionUrl")
.build()));
envArgCaptor = ArgumentCaptor.forClass(Map.class);
responseCaptor = ArgumentCaptor.forClass(CommandMessage.class);
}

@Test
void noLogsStreamingToEP(@TempDir Path basePath) {
doAnswer((Answer<Path>) invocation -> {
Command command = (Command) invocation.getArgument(1);
return CommandManagerTestHelper.setCommandStatusAndReturnExecutionLog(command, JobStatus.success, true, basePath);
}).when(terraformManager).execute(any(), any(), any());

commandManager.execute(message);

// Wait for the command thread to complete
await().atMost(5, TimeUnit.SECONDS).until(() -> CommandManagerTestHelper.verifyCommandPublisherIsInvoked(commandPublisher, 1));

verify(terraformManager, times(4)).execute(any(), any(), envArgCaptor.capture());
verify(commandPublisher, times(1)).sendCommandResponse(responseCaptor.capture(), topicArgCaptor.capture());

//Logs will not be streamed as EMA is cloud managed
verify(commandManager, times(0)).streamCommandExecutionLogToEpCore(any(), any(), any());

//Logs will be cleaned up anyway
verify(commandManager, times(1)).deleteExecutionLogFiles(executionLogFileCaptor.capture());
Assertions.assertThat(executionLogFileCaptor.getValue())
.containsExactlyInAnyOrder(
basePath.resolve("apply"),
basePath.resolve("write_HCL"),
basePath.resolve("write_HCL"),
basePath.resolve("sync"));
}
}
Loading

0 comments on commit 8a41b5a

Please sign in to comment.