From 8a41b5a507e42909fef4193a5f93871a6b4fe9fc Mon Sep 17 00:00:00 2001 From: rudraneel-chakraborty Date: Thu, 4 Jul 2024 16:42:27 -0400 Subject: [PATCH] DATAGO-79772 wip --- .../agent/command/CommandManager.java | 46 +++++- .../CommandLogStreamingProcessor.java | 27 +-- .../ep/event/management/agent/TestConfig.java | 20 +-- .../ManagedAgentMessageHandlerBeansTests.java | 11 ++ ...fManagedAgentMessageHandlerBeansTests.java | 10 ++ .../CloudManagedEMACommandManagerTests.java | 119 ++++++++++++++ .../CommandManagerTestHelper.java | 97 +++++++++++ .../commandManager/CommandManagerTests.java | 154 +++++++++--------- .../CommandLogStreamProcessorTest.java | 60 ------- .../terraform/manager/TerraformManager.java | 8 +- 10 files changed, 365 insertions(+), 187 deletions(-) create mode 100644 service/application/src/test/java/com/solace/maas/ep/event/management/agent/commandManager/CloudManagedEMACommandManagerTests.java create mode 100644 service/application/src/test/java/com/solace/maas/ep/event/management/agent/commandManager/CommandManagerTestHelper.java diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/command/CommandManager.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/command/CommandManager.java index e312728c..6aef42e3 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/command/CommandManager.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/command/CommandManager.java @@ -21,12 +21,15 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Service; +import java.io.IOException; +import java.nio.file.Files; import java.nio.file.Path; import java.time.OffsetDateTime; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import static com.solace.maas.ep.event.management.agent.constants.Command.COMMAND_CORRELATION_ID; @@ -45,14 +48,14 @@ public class CommandManager { private final MessagingServiceDelegateService messagingServiceDelegateService; private final EventPortalProperties eventPortalProperties; private final ThreadPoolTaskExecutor configPushPool; - private final CommandLogStreamingProcessor commandLogStreamingProcessor; + private final Optional commandLogStreamingProcessorOpt; public CommandManager(TerraformManager terraformManager, CommandMapper commandMapper, CommandPublisher commandPublisher, MessagingServiceDelegateService messagingServiceDelegateService, EventPortalProperties eventPortalProperties, - CommandLogStreamingProcessor commandLogStreamingProcessor) { + Optional commandLogStreamingProcessorOpt) { this.terraformManager = terraformManager; this.commandMapper = commandMapper; this.commandPublisher = commandPublisher; @@ -65,7 +68,7 @@ public CommandManager(TerraformManager terraformManager, configPushPool.setThreadNamePrefix("config-push-pool-"); configPushPool.setTaskDecorator(new MdcTaskDecorator()); configPushPool.initialize(); - this.commandLogStreamingProcessor = commandLogStreamingProcessor; + this.commandLogStreamingProcessorOpt = commandLogStreamingProcessorOpt; } public void execute(CommandMessage request) { @@ -106,7 +109,10 @@ public void configPush(CommandRequest request) { for (Command command : bundle.getCommands()) { Path executionLog = executeCommand(request, command, envVars); if (executionLog != null) { - streamCommandExecutionLogToEpCore(request, command, executionLog); + if (commandLogStreamingProcessorOpt.isPresent()) { + streamCommandExecutionLogToEpCore(request, command, executionLog); + } + executionLogFilesToClean.add(executionLog); } if (exitEarlyOnFailedCommand(bundle, command)) { @@ -153,17 +159,41 @@ private Path executeCommand(CommandRequest request, private void cleanup(List listOfExecutionLogFiles) { try { - commandLogStreamingProcessor.deleteExecutionLogFiles(listOfExecutionLogFiles); + deleteExecutionLogFiles(listOfExecutionLogFiles); } catch (Exception e) { log.error("Error while deleting execution log.", e); } } - private void streamCommandExecutionLogToEpCore(CommandRequest request, Command command, Path executionLog) { + public void deleteExecutionLogFiles(List listOfExecutionLogFiles) { + boolean allFilesDeleted = listOfExecutionLogFiles + .stream() + .allMatch(this::deleteExecutionLogFile); + if (!allFilesDeleted) { + throw new IllegalArgumentException("Some of the execution log files were not deleted. Please check the logs"); + } + } + + private boolean deleteExecutionLogFile(Path path) { + try { + if (Files.exists(path)) { + Files.delete(path); + } + } catch (IOException e) { + log.warn("Error while deleting execution log at {}", path, e); + return false; + } + return true; + } + + public void streamCommandExecutionLogToEpCore(CommandRequest request, Command command, Path executionLog) { + if (commandLogStreamingProcessorOpt.isEmpty()) { + throw new UnsupportedOperationException("Streaming logs to ep is not supported for this event management agent type"); + } try { - commandLogStreamingProcessor.streamLogsToEP(request, command, executionLog); + commandLogStreamingProcessorOpt.get().streamLogsToEP(request, command, executionLog); } catch (Exception e) { - log.error("Error sending logs to ep-core for command with commandCorrelationId", + log.error("Error sending logs to ep-core for command with commandCorrelationId {}", request.getCommandCorrelationId(), e); } } diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/processor/CommandLogStreamingProcessor.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/processor/CommandLogStreamingProcessor.java index f08e161e..54462dbb 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/processor/CommandLogStreamingProcessor.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/processor/CommandLogStreamingProcessor.java @@ -12,10 +12,9 @@ import com.solace.maas.ep.event.management.agent.publisher.CommandLogsPublisher; import lombok.extern.slf4j.Slf4j; import org.slf4j.MDC; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Component; -import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.time.Instant; @@ -33,7 +32,7 @@ @Component @Slf4j -@ConditionalOnProperty(name = "event-portal.gateway.messaging.standalone", havingValue = "false") +@ConditionalOnExpression("${event-portal.gateway.messaging.standalone:false}== false && ${event-portal.managed:false} == false") public class CommandLogStreamingProcessor { public static final String ANY = "*"; @@ -50,28 +49,6 @@ public CommandLogStreamingProcessor(CommandLogsPublisher commandLogsPublisher, } - public void deleteExecutionLogFiles(List listOfExecutionLogFiles) { - boolean allFilesDeleted = listOfExecutionLogFiles - .stream() - .allMatch(this::deleteExecutionLogFile); - if (!allFilesDeleted) { - throw new IllegalArgumentException("Some of the execution log files were not deleted. Please check the logs"); - } - } - - private boolean deleteExecutionLogFile(Path path) { - try { - if (Files.exists(path)) { - Files.delete(path); - } - } catch (IOException e) { - log.warn("Error while deleting execution log at {}", path, e); - return false; - } - return true; - } - - public void streamLogsToEP(CommandRequest request, Command executedCommand, Path commandExecutionLog) { if (executedCommand.getIgnoreResult()) { diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/TestConfig.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/TestConfig.java index b248c5e2..c9821678 100644 --- a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/TestConfig.java +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/TestConfig.java @@ -1,6 +1,5 @@ package com.solace.maas.ep.event.management.agent; -import com.fasterxml.jackson.databind.ObjectMapper; import com.solace.maas.ep.event.management.agent.command.CommandManager; import com.solace.maas.ep.event.management.agent.command.mapper.CommandMapper; import com.solace.maas.ep.event.management.agent.config.SolaceConfiguration; @@ -31,13 +30,13 @@ import org.apache.camel.ProducerTemplate; import org.apache.camel.support.DefaultExchange; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Import; import org.springframework.context.annotation.Primary; import org.springframework.context.annotation.Profile; +import java.util.Optional; import java.util.Properties; import java.util.Random; import java.util.concurrent.TimeUnit; @@ -161,21 +160,6 @@ public CommandLogsPublisher getComaCommandLogsPublisher() { return mock(CommandLogsPublisher.class); } - @Bean - @Qualifier("realCommandLogStreamingProcessor") - public CommandLogStreamingProcessor realCommandLogStreamingProcessor(CommandLogsPublisher commandLogsPublisher, - EventPortalProperties eventPortalProperties, - ObjectMapper objectMapper) { - return new CommandLogStreamingProcessor(commandLogsPublisher, eventPortalProperties, objectMapper); - } - - @Bean - @Qualifier("mockedCommandLogStreamingProcessor") - @Primary - public CommandLogStreamingProcessor mockedCommandLogStreamingProcessor() { - return mock(CommandLogStreamingProcessor.class); - } - @Bean @Primary public CommandManager getCommandManager(TerraformManager terraformManager, @@ -183,7 +167,7 @@ public CommandManager getCommandManager(TerraformManager terraformManager, CommandPublisher commandPublisher, MessagingServiceDelegateService messagingServiceDelegateService, EventPortalProperties eventPortalProperties, - CommandLogStreamingProcessor commandLogStreamingProcessor) { + Optional commandLogStreamingProcessor) { return new CommandManager( terraformManager, commandMapper, diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/applicationContextTests/ManagedAgentMessageHandlerBeansTests.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/applicationContextTests/ManagedAgentMessageHandlerBeansTests.java index c8a06f24..ceb3a9df 100644 --- a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/applicationContextTests/ManagedAgentMessageHandlerBeansTests.java +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/applicationContextTests/ManagedAgentMessageHandlerBeansTests.java @@ -59,4 +59,15 @@ void testDirectMessageHandlerBeansAreNotLoaded() { ); } + + @Test + void testCommandLogStreamingProcessorBeanIsNotLoaded() { + String[] allBeanNames = applicationContext.getBeanDefinitionNames(); + assertThat( + Arrays.stream(allBeanNames) + .map(StringUtils::lowerCase) + .collect(Collectors.toSet())) + .doesNotContain(StringUtils.lowerCase("commandLogStreamingProcessor")); + + } } diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/applicationContextTests/SelfManagedAgentMessageHandlerBeansTests.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/applicationContextTests/SelfManagedAgentMessageHandlerBeansTests.java index 9b6751e3..3d11ea37 100644 --- a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/applicationContextTests/SelfManagedAgentMessageHandlerBeansTests.java +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/applicationContextTests/SelfManagedAgentMessageHandlerBeansTests.java @@ -55,4 +55,14 @@ void testDirectMessageHandlerBeansAreLoaded() { ); } + + @Test + void testCommandLogStreamingProcessorBeanIsLoaded() { + String[] allBeanNames = applicationContext.getBeanDefinitionNames(); + assertThat( + Arrays.stream(allBeanNames) + .map(StringUtils::lowerCase) + .collect(Collectors.toSet())).contains(StringUtils.lowerCase("commandLogStreamingProcessor")); + + } } diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/commandManager/CloudManagedEMACommandManagerTests.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/commandManager/CloudManagedEMACommandManagerTests.java new file mode 100644 index 00000000..11af0ed7 --- /dev/null +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/commandManager/CloudManagedEMACommandManagerTests.java @@ -0,0 +1,119 @@ +package com.solace.maas.ep.event.management.agent.commandManager; + +import com.solace.maas.ep.common.messages.CommandMessage; +import com.solace.maas.ep.event.management.agent.command.CommandManager; +import com.solace.maas.ep.event.management.agent.config.eventPortal.EventPortalProperties; +import com.solace.maas.ep.event.management.agent.plugin.command.model.Command; +import com.solace.maas.ep.event.management.agent.plugin.command.model.JobStatus; +import com.solace.maas.ep.event.management.agent.plugin.service.MessagingServiceDelegateService; +import com.solace.maas.ep.event.management.agent.plugin.solace.processor.semp.SempClient; +import com.solace.maas.ep.event.management.agent.plugin.solace.processor.semp.SolaceHttpSemp; +import com.solace.maas.ep.event.management.agent.plugin.terraform.manager.TerraformManager; +import com.solace.maas.ep.event.management.agent.publisher.CommandPublisher; +import lombok.extern.slf4j.Slf4j; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.mockito.ArgumentCaptor; +import org.mockito.stubbing.Answer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.SpyBean; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.ActiveProfiles; + +import java.nio.file.Path; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.awaitility.Awaitility.await; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.springframework.test.annotation.DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD; + +@Slf4j +@ActiveProfiles("TEST") +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, properties = { + "eventPortal.gateway.messaging.standalone=false", + "eventPortal.managed=true", + "eventPortal.incomingRequestQueueName = ep_core_ema_requests_123456_123123", + "event-portal.gateway.messaging.rto-session=false" +}) +@DirtiesContext(classMode = BEFORE_EACH_TEST_METHOD) +//CPD-OFF +class CloudManagedEMACommandManagerTests { + + @SpyBean + private CommandManager commandManager; + + @Autowired + private TerraformManager terraformManager; + + @Autowired + private CommandPublisher commandPublisher; + + @Autowired + private MessagingServiceDelegateService messagingServiceDelegateService; + + @Autowired + private EventPortalProperties eventPortalProperties; + + private static final String MESSAGING_SERVICE_ID = "myMessagingServiceId"; + + private ArgumentCaptor> executionLogFileCaptor; + private ArgumentCaptor> topicArgCaptor; + private ArgumentCaptor> envArgCaptor; + private ArgumentCaptor responseCaptor; + + private CommandMessage message; + + @BeforeEach + void setUp() { + message = CommandManagerTestHelper.buildCommandMessageForConfigPush(eventPortalProperties.getOrganizationId(), MESSAGING_SERVICE_ID); + executionLogFileCaptor = ArgumentCaptor.forClass(List.class); + topicArgCaptor = ArgumentCaptor.forClass(Map.class); + doNothing().when(commandPublisher).sendCommandResponse(any(), any()); + when(messagingServiceDelegateService.getMessagingServiceClient(any())).thenReturn( + new SolaceHttpSemp(SempClient.builder() + .username("myUsername") + .password("myPassword") + .connectionUrl("myConnectionUrl") + .build())); + envArgCaptor = ArgumentCaptor.forClass(Map.class); + responseCaptor = ArgumentCaptor.forClass(CommandMessage.class); + } + + @Test + void noLogsStreamingToEP(@TempDir Path basePath) { + doAnswer((Answer) invocation -> { + Command command = (Command) invocation.getArgument(1); + return CommandManagerTestHelper.setCommandStatusAndReturnExecutionLog(command, JobStatus.success, true, basePath); + }).when(terraformManager).execute(any(), any(), any()); + + commandManager.execute(message); + + // Wait for the command thread to complete + await().atMost(5, TimeUnit.SECONDS).until(() -> CommandManagerTestHelper.verifyCommandPublisherIsInvoked(commandPublisher, 1)); + + verify(terraformManager, times(4)).execute(any(), any(), envArgCaptor.capture()); + verify(commandPublisher, times(1)).sendCommandResponse(responseCaptor.capture(), topicArgCaptor.capture()); + + //Logs will not be streamed as EMA is cloud managed + verify(commandManager, times(0)).streamCommandExecutionLogToEpCore(any(), any(), any()); + + //Logs will be cleaned up anyway + verify(commandManager, times(1)).deleteExecutionLogFiles(executionLogFileCaptor.capture()); + Assertions.assertThat(executionLogFileCaptor.getValue()) + .containsExactlyInAnyOrder( + basePath.resolve("apply"), + basePath.resolve("write_HCL"), + basePath.resolve("write_HCL"), + basePath.resolve("sync")); + } +} \ No newline at end of file diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/commandManager/CommandManagerTestHelper.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/commandManager/CommandManagerTestHelper.java new file mode 100644 index 00000000..a850cf57 --- /dev/null +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/commandManager/CommandManagerTestHelper.java @@ -0,0 +1,97 @@ +package com.solace.maas.ep.event.management.agent.commandManager; + +import com.solace.maas.ep.common.messages.CommandMessage; +import com.solace.maas.ep.event.management.agent.plugin.command.model.Command; +import com.solace.maas.ep.event.management.agent.plugin.command.model.CommandBundle; +import com.solace.maas.ep.event.management.agent.plugin.command.model.CommandResult; +import com.solace.maas.ep.event.management.agent.plugin.command.model.CommandType; +import com.solace.maas.ep.event.management.agent.plugin.command.model.ExecutionType; +import com.solace.maas.ep.event.management.agent.plugin.command.model.JobStatus; +import com.solace.maas.ep.event.management.agent.plugin.mop.MOPSvcType; +import com.solace.maas.ep.event.management.agent.publisher.CommandPublisher; +import org.mockito.Mockito; + +import java.nio.file.Path; +import java.util.List; +import java.util.Map; + +import static com.solace.maas.ep.event.management.agent.plugin.mop.MOPMessageType.generic; + +public final class CommandManagerTestHelper { + + private CommandManagerTestHelper() { + throw new UnsupportedOperationException(); + } + + public static Boolean verifyCommandPublisherIsInvoked(CommandPublisher commandPublisher, int numberOfExpectedInvocations) { + return Mockito.mockingDetails(commandPublisher).getInvocations().size() == numberOfExpectedInvocations; + } + + + public static Path setCommandStatusAndReturnExecutionLog(Command targetCommand, + JobStatus targetStatus, + boolean ignoreResult, + Path basePath) { + + if (targetStatus == JobStatus.success) { + targetCommand.setResult(CommandResult.builder() + .status(JobStatus.success) + .result(Map.of()).build()); + return basePath.resolve(targetCommand.getCommand()); + } else { + //simulating a failed command + targetCommand.setResult(null); + targetCommand.setIgnoreResult(ignoreResult); + return null; + } + + } + + public static CommandMessage buildCommandMessageForConfigPush(String targetOrgId, + String targetMessagingServiceId) { + + List commands = List.of( + Command.builder() + .commandType(CommandType.terraform) + .ignoreResult(false) + .body("asdfasdfadsf") + .command("write_HCL") + .build(), + Command.builder() + .commandType(CommandType.terraform) + .ignoreResult(false) + .body("asdfasdfadsf") + .command("write_HCL") + .build(), + Command.builder() + .commandType(CommandType.terraform) + .ignoreResult(true) + .body("asdfasdfadsf") + .command("sync") + .build(), + + Command.builder() + .commandType(CommandType.terraform) + .ignoreResult(false) + .body("asdfasdfadsf") + .command("apply") + .build()); + + CommandMessage message = new CommandMessage(); + message.setOrigType(MOPSvcType.maasEventMgmt); + message.withMessageType(generic); + message.setContext("abc"); + message.setServiceId(targetMessagingServiceId); + message.setActorId("myActorId"); + message.setOrgId(targetOrgId); + message.setTraceId("myTraceId"); + message.setCommandCorrelationId("myCorrelationIdabc"); + message.setCommandBundles(List.of( + CommandBundle.builder() + .executionType(ExecutionType.serial) + .exitOnFailure(true) + .commands(commands) + .build())); + return message; + } +} diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/commandManager/CommandManagerTests.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/commandManager/CommandManagerTests.java index c838f200..8a49b027 100644 --- a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/commandManager/CommandManagerTests.java +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/commandManager/CommandManagerTests.java @@ -26,11 +26,9 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.mockito.ArgumentCaptor; -import org.mockito.Mockito; import org.mockito.stubbing.Answer; import org.slf4j.MDC; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.mock.mockito.SpyBean; @@ -38,6 +36,8 @@ import org.springframework.test.context.ActiveProfiles; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.LinkOption; import java.nio.file.Path; import java.util.ArrayList; import java.util.List; @@ -46,6 +46,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.IntStream; +import java.util.stream.Stream; import static com.solace.maas.ep.event.management.agent.constants.Command.COMMAND_CORRELATION_ID; import static com.solace.maas.ep.event.management.agent.plugin.constants.RouteConstants.TRACE_ID; @@ -81,9 +82,8 @@ class CommandManagerTests { @Autowired private CommandPublisher commandPublisher; - @Autowired - @Qualifier("mockedCommandLogStreamingProcessor") - private CommandLogStreamingProcessor mockedCommandLogStreamingProcessor; + @SpyBean + private CommandLogStreamingProcessor commandLogStreamingProcessor; @Autowired private MessagingServiceDelegateService messagingServiceDelegateService; @@ -100,7 +100,7 @@ public void cleanup() { reset(commandPublisher); reset(commandManager); reset(messagingServiceDelegateService); - reset(mockedCommandLogStreamingProcessor); + reset(commandLogStreamingProcessor); } @Test @@ -139,7 +139,7 @@ void testMultiThreadedCommandManager() throws InterruptedException { CompletableFuture.runAsync(() -> commandManager.execute(messageList.get(i - 1)), testThreadPool)); // Wait for all the threads to complete (add a timeout just in case) - await().atMost(10, TimeUnit.SECONDS).until(() -> commandPublisherIsInvoked(commandThreadPoolQueueSize)); + await().atMost(10, TimeUnit.SECONDS).until(() -> CommandManagerTestHelper.verifyCommandPublisherIsInvoked(commandPublisher, 1)); // Verify terraform manager is called ArgumentCaptor> envArgCaptor = ArgumentCaptor.forClass(Map.class); @@ -175,7 +175,7 @@ void failSendingResponseBackToEp() { commandManager.execute(message); // Wait for the command thread to complete - await().atMost(10, TimeUnit.SECONDS).until(() -> commandPublisherIsInvoked(2)); + await().atMost(10, TimeUnit.SECONDS).until(() -> CommandManagerTestHelper.verifyCommandPublisherIsInvoked(commandPublisher, 1)); ArgumentCaptor messageArgCaptor = ArgumentCaptor.forClass(CommandMessage.class); verify(commandPublisher, times(2)).sendCommandResponse(messageArgCaptor.capture(), any()); @@ -201,7 +201,7 @@ void failSettingBrokerSpecificEnvironmentVariables() { .when(messagingServiceDelegateService).getMessagingServiceClient(MESSAGING_SERVICE_ID); commandManager.execute(message); - await().atMost(10, TimeUnit.SECONDS).until(() -> commandPublisherIsInvoked(1)); + await().atMost(10, TimeUnit.SECONDS).until(() -> CommandManagerTestHelper.verifyCommandPublisherIsInvoked(commandPublisher, 1)); ArgumentCaptor messageArgCaptor = ArgumentCaptor.forClass(CommandMessage.class); verify(commandPublisher, times(1)).sendCommandResponse(messageArgCaptor.capture(), any()); @@ -218,7 +218,7 @@ void failConfigPushCommand() { doThrow(new RuntimeException("Error running command.")).when(commandManager).configPush(commandMapper.map(message)); commandManager.execute(message); - await().atMost(10, TimeUnit.SECONDS).until(() -> commandPublisherIsInvoked(1)); + await().atMost(10, TimeUnit.SECONDS).until(() -> CommandManagerTestHelper.verifyCommandPublisherIsInvoked(commandPublisher, 1)); ArgumentCaptor messageArgCaptor = ArgumentCaptor.forClass(CommandMessage.class); verify(commandPublisher, times(1)).sendCommandResponse(messageArgCaptor.capture(), any()); @@ -245,7 +245,7 @@ void testCommandManager() { commandManager.execute(message); // Wait for the command thread to complete - await().atMost(10, TimeUnit.SECONDS).until(() -> commandPublisherIsInvoked(1)); + await().atMost(10, TimeUnit.SECONDS).until(() -> CommandManagerTestHelper.verifyCommandPublisherIsInvoked(commandPublisher, 1)); // Verify terraform manager is called ArgumentCaptor> envArgCaptor = ArgumentCaptor.forClass(Map.class); @@ -356,7 +356,7 @@ void verifyMDCIsSetInCommandManagerThread() { commandManager.execute(message); // Wait for the command thread to complete - await().atMost(10, TimeUnit.SECONDS).until(() -> commandPublisherIsInvoked(1)); + await().atMost(10, TimeUnit.SECONDS).until(() -> CommandManagerTestHelper.verifyCommandPublisherIsInvoked(commandPublisher, 1)); assertTrue(mdcIsSet.get()); } @@ -394,58 +394,6 @@ private static Command buildCommand(boolean ignoreResult) { .build(); } - - private CommandMessage buildCommandMessageForConfigPush() { - - List commands = List.of( - Command.builder() - .commandType(CommandType.terraform) - .ignoreResult(false) - .body("asdfasdfadsf") - .command("write_HCL") - .build(), - Command.builder() - .commandType(CommandType.terraform) - .ignoreResult(false) - .body("asdfasdfadsf") - .command("write_HCL") - .build(), - Command.builder() - .commandType(CommandType.terraform) - .ignoreResult(true) - .body("asdfasdfadsf") - .command("sync") - .build(), - - Command.builder() - .commandType(CommandType.terraform) - .ignoreResult(false) - .body("asdfasdfadsf") - .command("apply") - .build()); - - CommandMessage message = new CommandMessage(); - message.setOrigType(MOPSvcType.maasEventMgmt); - message.withMessageType(generic); - message.setContext("abc"); - message.setServiceId(MESSAGING_SERVICE_ID); - message.setActorId("myActorId"); - message.setOrgId(eventPortalProperties.getOrganizationId()); - message.setTraceId("myTraceId"); - message.setCommandCorrelationId("myCorrelationIdabc"); - message.setCommandBundles(List.of( - CommandBundle.builder() - .executionType(ExecutionType.serial) - .exitOnFailure(true) - .commands(commands) - .build())); - return message; - } - - private Boolean commandPublisherIsInvoked(int numberOfExpectedInvocations) { - return Mockito.mockingDetails(commandPublisher).getInvocations().size() == numberOfExpectedInvocations; - } - private ArgumentCaptor executeCommandAndGetResponseMessage(CommandMessage message) { ArgumentCaptor mopMessageCaptor = ArgumentCaptor.forClass(MOPMessage.class); @@ -460,7 +408,7 @@ private ArgumentCaptor executeCommandAndGetResponseMessage(CommandMe commandManager.execute(message); // Wait for the command thread to complete - await().atMost(10, TimeUnit.SECONDS).until(() -> commandPublisherIsInvoked(1)); + await().atMost(10, TimeUnit.SECONDS).until(() -> CommandManagerTestHelper.verifyCommandPublisherIsInvoked(commandPublisher, 1)); return mopMessageCaptor; } @@ -475,7 +423,7 @@ class LogStreamingToEpTest { @BeforeEach void setUp() { - message = buildCommandMessageForConfigPush(); + message = CommandManagerTestHelper.buildCommandMessageForConfigPush(eventPortalProperties.getOrganizationId(), MESSAGING_SERVICE_ID); executionLogFileCaptor = ArgumentCaptor.forClass(List.class); topicArgCaptor = ArgumentCaptor.forClass(Map.class); doNothing().when(commandPublisher).sendCommandResponse(any(), any()); @@ -503,7 +451,7 @@ void testLogStreamingToEP(@TempDir Path basePath) throws IOException { */ executeCommandsAndVerify(4, 4); - verify(mockedCommandLogStreamingProcessor, times(1)).deleteExecutionLogFiles(executionLogFileCaptor.capture()); + verify(commandManager, times(1)).deleteExecutionLogFiles(executionLogFileCaptor.capture()); Assertions.assertThat(executionLogFileCaptor.getValue()) .containsExactlyInAnyOrder( basePath.resolve("apply"), @@ -532,7 +480,7 @@ void testExecutionLogCleanupWhenOneOfTheExecutionLogPathIsNull(@TempDir Path bas however only 3 log files will be streamed + cleaned */ executeCommandsAndVerify(4, 3); - verify(mockedCommandLogStreamingProcessor, times(1)).deleteExecutionLogFiles(executionLogFileCaptor.capture()); + verify(commandManager, times(1)).deleteExecutionLogFiles(executionLogFileCaptor.capture()); Assertions.assertThat(executionLogFileCaptor.getValue()) .containsExactlyInAnyOrder( basePath.resolve("apply"), @@ -567,7 +515,7 @@ void testExecutionLogCleanupWhenExitEarlyOnFailedCommand(@TempDir Path basePath) so we expect 3 commands to be executed and 2 log files to be streamed + cleaned */ executeCommandsAndVerify(3, 2); - verify(mockedCommandLogStreamingProcessor, times(1)).deleteExecutionLogFiles(executionLogFileCaptor.capture()); + verify(commandManager, times(1)).deleteExecutionLogFiles(executionLogFileCaptor.capture()); Assertions.assertThat(executionLogFileCaptor.getValue()) .containsExactlyInAnyOrder( basePath.resolve("write_HCL"), @@ -582,16 +530,16 @@ void testExecutionLogCleanupWhenLogStreamingToEpFails(@TempDir Path basePath) { return setCommandStatusAndReturnExecutionLog(command, JobStatus.success, true, basePath); }).when(terraformManager).execute(any(), any(), any()); - doThrow(new IllegalArgumentException("fake")).when(mockedCommandLogStreamingProcessor).streamLogsToEP(any(), any(), any()); + doThrow(new IllegalArgumentException("fake")).when(commandLogStreamingProcessor).streamLogsToEP(any(), any(), any()); commandManager.execute(message); // Wait for the command thread to complete - await().atMost(5, TimeUnit.SECONDS).until(() -> commandPublisherIsInvoked(1)); + await().atMost(5, TimeUnit.SECONDS).until(() -> CommandManagerTestHelper.verifyCommandPublisherIsInvoked(commandPublisher, 1)); - verify(mockedCommandLogStreamingProcessor, times(4)).streamLogsToEP(any(), any(), any()); + verify(commandLogStreamingProcessor, times(4)).streamLogsToEP(any(), any(), any()); //we still expect cleanup to occur even though log streaming to ep fails - verify(mockedCommandLogStreamingProcessor, times(1)).deleteExecutionLogFiles(executionLogFileCaptor.capture()); + verify(commandManager, times(1)).deleteExecutionLogFiles(executionLogFileCaptor.capture()); Assertions.assertThat(executionLogFileCaptor.getValue()) .containsExactlyInAnyOrder( basePath.resolve("apply"), @@ -600,16 +548,70 @@ void testExecutionLogCleanupWhenLogStreamingToEpFails(@TempDir Path basePath) { basePath.resolve("sync")); } + @Test + void testExecutionLogDeletionSuccessFlow(@TempDir Path logPath) throws IOException { + Path commandLog1 = logPath.resolve("log1"); + Path commandLog2 = logPath.resolve("log2"); + Path commandLog3 = logPath.resolve("log3"); + Path commandLog4 = logPath.resolve("log4"); + + Files.writeString(commandLog1, "log 1"); + Files.writeString(commandLog2, "log 2"); + Files.writeString(commandLog3, "log 3"); + Files.writeString(commandLog4, "log 4"); + List allLogs = List.of(commandLog1, commandLog2, commandLog3, commandLog4); + + Assertions.assertThat( + allLogs.stream().allMatch(path -> Files.exists(path, LinkOption.NOFOLLOW_LINKS)) + ).isTrue(); + commandManager.deleteExecutionLogFiles( + List.of(commandLog1, commandLog2, commandLog3, commandLog4) + ); + + Assertions.assertThat( + allLogs.stream().noneMatch(path -> Files.exists(path, LinkOption.NOFOLLOW_LINKS)) + ).isTrue(); + } + + @Test + void testExecutionLogDeletionWhenSomeLogFilesDontExist(@TempDir Path logPath) throws IOException { + Path commandLog1 = logPath.resolve("log1"); + Path commandLog2 = logPath.resolve("log2"); + Path commandLog3 = logPath.resolve("log3"); + Path commandLog4 = logPath.resolve("log4"); + + Files.writeString(commandLog1, "log 1"); + Files.writeString(commandLog2, "log 2"); + + List allLogs = List.of(commandLog1, commandLog2, commandLog3, commandLog4); + + // Only 2 of the log files exist + Assertions.assertThat( + Stream.of(commandLog1, commandLog2).allMatch(path -> Files.exists(path, LinkOption.NOFOLLOW_LINKS)) + ).isTrue(); + /* Although only 2 out of 4 log files exist, the 2 log files will be deleted anyway + and the errors will be handled gracefully + */ + commandManager.deleteExecutionLogFiles( + List.of(commandLog1, commandLog2, commandLog3, commandLog4) + ); + + Assertions.assertThat( + allLogs.stream().noneMatch(path -> Files.exists(path, LinkOption.NOFOLLOW_LINKS)) + ).isTrue(); + + } + private void executeCommandsAndVerify(int expectedNumberOfCommandExecutions, int expectedNumberOfLogFilesStreamed) { commandManager.execute(message); // Wait for the command thread to complete - await().atMost(5, TimeUnit.SECONDS).until(() -> commandPublisherIsInvoked(1)); + await().atMost(5, TimeUnit.SECONDS).until(() -> CommandManagerTestHelper.verifyCommandPublisherIsInvoked(commandPublisher, 1)); verify(terraformManager, times(expectedNumberOfCommandExecutions)).execute(any(), any(), envArgCaptor.capture()); verify(commandPublisher, times(1)).sendCommandResponse(responseCaptor.capture(), topicArgCaptor.capture()); - verify(mockedCommandLogStreamingProcessor, times(expectedNumberOfLogFilesStreamed)).streamLogsToEP(any(), any(), any()); + verify(commandLogStreamingProcessor, times(expectedNumberOfLogFilesStreamed)).streamLogsToEP(any(), any(), any()); } private Path setCommandStatusAndReturnExecutionLog(Command targetCommand, @@ -631,4 +633,6 @@ private Path setCommandStatusAndReturnExecutionLog(Command targetCommand, } } + + } diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/processor/CommandLogStreamProcessorTest.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/processor/CommandLogStreamProcessorTest.java index 426f1860..0e5618c3 100644 --- a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/processor/CommandLogStreamProcessorTest.java +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/processor/CommandLogStreamProcessorTest.java @@ -16,10 +16,8 @@ import org.assertj.core.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; import org.mockito.ArgumentCaptor; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.ActiveProfiles; @@ -27,11 +25,8 @@ import java.io.File; import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.LinkOption; import java.nio.file.Path; import java.util.List; -import java.util.stream.Stream; import static com.solace.maas.ep.event.management.agent.plugin.mop.MOPMessageType.generic; import static org.junit.Assert.assertThrows; @@ -52,7 +47,6 @@ public class CommandLogStreamProcessorTest { private EventPortalProperties eventPortalProperties; @Autowired - @Qualifier("realCommandLogStreamingProcessor") private CommandLogStreamingProcessor realCommandLogStreamingProcessor; @Autowired @@ -166,60 +160,6 @@ void testStreamLogsToEPErrorCase() throws IOException { } - @Test - void testExecutionLogDeletionSuccessFlow(@TempDir Path logPath) throws IOException { - Path commandLog1 = logPath.resolve("log1"); - Path commandLog2 = logPath.resolve("log2"); - Path commandLog3 = logPath.resolve("log3"); - Path commandLog4 = logPath.resolve("log4"); - - Files.writeString(commandLog1, "log 1"); - Files.writeString(commandLog2, "log 2"); - Files.writeString(commandLog3, "log 3"); - Files.writeString(commandLog4, "log 4"); - List allLogs = List.of(commandLog1, commandLog2, commandLog3, commandLog4); - - Assertions.assertThat( - allLogs.stream().allMatch(path -> Files.exists(path, LinkOption.NOFOLLOW_LINKS)) - ).isTrue(); - realCommandLogStreamingProcessor.deleteExecutionLogFiles( - List.of(commandLog1, commandLog2, commandLog3, commandLog4) - ); - - Assertions.assertThat( - allLogs.stream().noneMatch(path -> Files.exists(path, LinkOption.NOFOLLOW_LINKS)) - ).isTrue(); - } - - @Test - void testExecutionLogDeletionWhenSomeLogFilesDontExist(@TempDir Path logPath) throws IOException { - Path commandLog1 = logPath.resolve("log1"); - Path commandLog2 = logPath.resolve("log2"); - Path commandLog3 = logPath.resolve("log3"); - Path commandLog4 = logPath.resolve("log4"); - - Files.writeString(commandLog1, "log 1"); - Files.writeString(commandLog2, "log 2"); - - List allLogs = List.of(commandLog1, commandLog2, commandLog3, commandLog4); - - // Only 2 of the log files exist - Assertions.assertThat( - Stream.of(commandLog1, commandLog2).allMatch(path -> Files.exists(path, LinkOption.NOFOLLOW_LINKS)) - ).isTrue(); - /* Although only 2 out of 4 log files exist, the 2 log files will be deleted anyway - and the errors will be handled gracefully - */ - realCommandLogStreamingProcessor.deleteExecutionLogFiles( - List.of(commandLog1, commandLog2, commandLog3, commandLog4) - ); - - Assertions.assertThat( - allLogs.stream().noneMatch(path -> Files.exists(path, LinkOption.NOFOLLOW_LINKS)) - ).isTrue(); - - } - private CommandMessage buildCommandMessageForConfigPush(List commands) { CommandMessage message = new CommandMessage(); diff --git a/service/terraform-plugin/src/main/java/com/solace/maas/ep/event/management/agent/plugin/terraform/manager/TerraformManager.java b/service/terraform-plugin/src/main/java/com/solace/maas/ep/event/management/agent/plugin/terraform/manager/TerraformManager.java index 73a78922..b87d2b9c 100644 --- a/service/terraform-plugin/src/main/java/com/solace/maas/ep/event/management/agent/plugin/terraform/manager/TerraformManager.java +++ b/service/terraform-plugin/src/main/java/com/solace/maas/ep/event/management/agent/plugin/terraform/manager/TerraformManager.java @@ -7,6 +7,7 @@ import com.solace.maas.ep.event.management.agent.plugin.command.model.CommandResult; import com.solace.maas.ep.event.management.agent.plugin.command.model.JobStatus; import com.solace.maas.ep.event.management.agent.plugin.constants.RouteConstants; +import com.solace.maas.ep.event.management.agent.plugin.messagingService.event.EventProperty; import com.solace.maas.ep.event.management.agent.plugin.terraform.client.TerraformClient; import com.solace.maas.ep.event.management.agent.plugin.terraform.client.TerraformClientFactory; import com.solace.maas.ep.event.management.agent.plugin.terraform.configuration.TerraformProperties; @@ -37,6 +38,9 @@ public class TerraformManager { private final TerraformProperties terraformProperties; private final TerraformClientFactory terraformClientFactory; + + private boolean isManagedAgent; + public TerraformManager(TerraformLogProcessingService terraformLogProcessingService, TerraformProperties terraformProperties, TerraformClientFactory terraformClientFactory) { @@ -45,7 +49,9 @@ public TerraformManager(TerraformLogProcessingService terraformLogProcessingServ this.terraformClientFactory = terraformClientFactory; } - public Path execute(CommandRequest request, Command command, Map envVars) { + public Path execute(CommandRequest request, + Command command, + Map envVars) { MDC.put(RouteConstants.COMMAND_CORRELATION_ID, request.getCommandCorrelationId()); MDC.put(RouteConstants.MESSAGING_SERVICE_ID, request.getServiceId()); setEnvVarsFromParameters(command, envVars);