diff --git a/service/application/pom.xml b/service/application/pom.xml index 6c632cf97..b0cdd9af0 100644 --- a/service/application/pom.xml +++ b/service/application/pom.xml @@ -280,6 +280,13 @@ 2.2 test + + org.awaitility + awaitility + 4.2.0 + test + + com.solacesystems solclientj diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/command/CommandManager.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/command/CommandManager.java index 8d111c927..035465345 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/command/CommandManager.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/command/CommandManager.java @@ -12,14 +12,17 @@ import com.solace.maas.ep.event.management.agent.plugin.solace.processor.semp.SolaceHttpSemp; import com.solace.maas.ep.event.management.agent.plugin.terraform.manager.TerraformManager; import com.solace.maas.ep.event.management.agent.publisher.CommandPublisher; +import com.solace.maas.ep.event.management.agent.util.MdcTaskDecorator; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Service; import java.time.OffsetDateTime; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import static com.solace.maas.ep.event.management.agent.constants.Command.COMMAND_CORRELATION_ID; import static com.solace.maas.ep.event.management.agent.plugin.terraform.manager.TerraformManager.LOG_LEVEL_ERROR; @@ -34,6 +37,7 @@ public class CommandManager { private final CommandPublisher commandPublisher; private final MessagingServiceDelegateService messagingServiceDelegateService; private final EventPortalProperties eventPortalProperties; + private final ThreadPoolTaskExecutor configPushPool; public CommandManager(TerraformManager terraformManager, CommandMapper commandMapper, CommandPublisher commandPublisher, MessagingServiceDelegateService messagingServiceDelegateService, @@ -43,9 +47,28 @@ public CommandManager(TerraformManager terraformManager, CommandMapper commandMa this.commandPublisher = commandPublisher; this.messagingServiceDelegateService = messagingServiceDelegateService; this.eventPortalProperties = eventPortalProperties; + configPushPool = new ThreadPoolTaskExecutor(); + configPushPool.setCorePoolSize(eventPortalProperties.getCommandThreadPoolMinSize()); + configPushPool.setMaxPoolSize(eventPortalProperties.getCommandThreadPoolMaxSize()); + configPushPool.setQueueCapacity(eventPortalProperties.getCommandThreadPoolQueueSize()); + configPushPool.setThreadNamePrefix("config-push-pool-"); + configPushPool.setTaskDecorator(new MdcTaskDecorator()); + configPushPool.initialize(); } public void execute(CommandMessage request) { + + CompletableFuture.runAsync(() -> configPush(request), configPushPool) + .exceptionally(e -> { + log.error("Error running command", e); + Command firstCommand = request.getCommandBundles().get(0).getCommands().get(0); + setCommandError(firstCommand, (Exception) e); + sendResponse(request); + return null; + }); + } + + public void configPush(CommandMessage request) { Map envVars; try { envVars = setBrokerSpecificEnvVars(request.getServiceId()); diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/config/eventPortal/EventPortalProperties.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/config/eventPortal/EventPortalProperties.java index a2976b96b..e7cba43da 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/config/eventPortal/EventPortalProperties.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/config/eventPortal/EventPortalProperties.java @@ -18,6 +18,10 @@ public class EventPortalProperties { private String topicPrefix; + private int commandThreadPoolMinSize = 5; + private int commandThreadPoolMaxSize = 10; + private int commandThreadPoolQueueSize = 1_000; + private GatewayProperties gateway = new GatewayProperties("standalone", "standalone", new GatewayMessagingProperties(true, false, List.of())); } diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/util/MdcTaskDecorator.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/util/MdcTaskDecorator.java new file mode 100644 index 000000000..b44fa90b4 --- /dev/null +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/util/MdcTaskDecorator.java @@ -0,0 +1,25 @@ +package com.solace.maas.ep.event.management.agent.util; + +import org.slf4j.MDC; +import org.springframework.core.task.TaskDecorator; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +public class MdcTaskDecorator implements TaskDecorator { + + @Override + public Runnable decorate(Runnable runnable) { + Map contextMap = Optional.ofNullable(MDC.getCopyOfContextMap()).orElse(new HashMap<>()); + return () -> { + try { + MDC.setContextMap(contextMap); + runnable.run(); + } finally { + MDC.clear(); + } + + }; + } +} diff --git a/service/application/src/main/resources/application-TEST.yml b/service/application/src/main/resources/application-TEST.yml index 55e3ed2ca..df723660c 100644 --- a/service/application/src/main/resources/application-TEST.yml +++ b/service/application/src/main/resources/application-TEST.yml @@ -24,6 +24,9 @@ eventPortal: runtimeAgentId: ${EP_RUNTIME_AGENT_ID:1234567} organizationId: ${EP_ORGANIZATION_ID:myOrg123} topicPrefix: ${EP_TOPIC_PREFIX:sc/ep/runtime} + commandThreadPoolMinSize: 5 + commandThreadPoolMaxSize: 5 + commandThreadPoolQueueSize: 10 gateway: id: decal5 name: evmr1 diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/commandManager/CommandManagerTests.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/commandManager/CommandManagerTests.java index 68d003ba6..06d9b29ed 100644 --- a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/commandManager/CommandManagerTests.java +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/commandManager/CommandManagerTests.java @@ -8,26 +8,42 @@ import com.solace.maas.ep.event.management.agent.plugin.command.model.CommandBundle; import com.solace.maas.ep.event.management.agent.plugin.command.model.CommandType; import com.solace.maas.ep.event.management.agent.plugin.command.model.ExecutionType; +import com.solace.maas.ep.event.management.agent.plugin.command.model.JobStatus; import com.solace.maas.ep.event.management.agent.plugin.mop.MOPSvcType; import com.solace.maas.ep.event.management.agent.plugin.service.MessagingServiceDelegateService; import com.solace.maas.ep.event.management.agent.plugin.solace.processor.semp.SempClient; import com.solace.maas.ep.event.management.agent.plugin.solace.processor.semp.SolaceHttpSemp; import com.solace.maas.ep.event.management.agent.plugin.terraform.manager.TerraformManager; import com.solace.maas.ep.event.management.agent.publisher.CommandPublisher; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; +import org.slf4j.MDC; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.test.context.ActiveProfiles; +import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.IntStream; import static com.solace.maas.ep.event.management.agent.constants.Command.COMMAND_CORRELATION_ID; +import static com.solace.maas.ep.event.management.agent.plugin.constants.RouteConstants.TRACE_ID; import static com.solace.maas.ep.event.management.agent.plugin.mop.MOPMessageType.generic; +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -38,43 +54,117 @@ public class CommandManagerTests { @Autowired - CommandManager commandManager; + private CommandManager commandManager; @Autowired - TerraformManager terraformManager; + private TerraformManager terraformManager; @Autowired - CommandPublisher commandPublisher; + private CommandPublisher commandPublisher; @Autowired - MessagingServiceDelegateService messagingServiceDelegateService; + private MessagingServiceDelegateService messagingServiceDelegateService; @Autowired - EventPortalProperties eventPortalProperties; + private EventPortalProperties eventPortalProperties; + + private final static ThreadPoolTaskExecutor testThreadPool = new ThreadPoolTaskExecutor(); + + @BeforeEach + public void cleanup() { + reset(terraformManager); + reset(commandPublisher); + } @Test - public void testCommandManager() { - // Create a command request message - CommandMessage message = new CommandMessage(); - message.setOrigType(MOPSvcType.maasEventMgmt); - message.withMessageType(generic); - message.setContext("abc"); - message.setActorId("myActorId"); - message.setOrgId(eventPortalProperties.getOrganizationId()); - message.setTraceId("myTraceId"); - message.setCommandCorrelationId("myCorrelationId"); - message.setCommandBundles(List.of( - CommandBundle.builder() - .executionType(ExecutionType.serial) - .exitOnFailure(false) - .commands(List.of( - Command.builder() - .commandType(CommandType.terraform) - .body("asdfasdfadsf") - .command("apply") - .build())) + void testMultiThreadedCommandManager() throws InterruptedException { + + // Set up the thread pool + int commandThreadPoolQueueSize = eventPortalProperties.getCommandThreadPoolQueueSize(); + testThreadPool.setCorePoolSize(commandThreadPoolQueueSize); + testThreadPool.initialize(); + + + // Build enough requests to fill the command thread pool queue + List messageList = new ArrayList<>(); + for (int i = 0; i < commandThreadPoolQueueSize; i++) { + messageList.add(getCommandMessage(Integer.toString(i))); + } + + doNothing().when(commandPublisher).sendCommandResponse(any(), any()); + doAnswer(invocation -> { + // Simulate the time spent for a SEMP command to complete + TimeUnit.SECONDS.sleep(1); + return null; + }).when(terraformManager).execute(any(), any(), any()); + + ArgumentCaptor> topicArgCaptor = ArgumentCaptor.forClass(Map.class); + + when(messagingServiceDelegateService.getMessagingServiceClient(any())).thenReturn( + new SolaceHttpSemp(SempClient.builder() + .username("myUsername") + .password("myPassword") + .connectionUrl("myConnectionUrl") .build())); + // Execute all the commands in parallel to fill the command thread pool queue + IntStream.rangeClosed(1, commandThreadPoolQueueSize).parallel().forEach(i -> + CompletableFuture.runAsync(() -> commandManager.execute(messageList.get(i - 1)), testThreadPool)); + + // Wait for all the threads to complete (add a timeout just in case) + await().atMost(10, TimeUnit.SECONDS).until(() -> commandPublisherIsInvoked(commandThreadPoolQueueSize)); + + // Verify terraform manager is called + ArgumentCaptor> envArgCaptor = ArgumentCaptor.forClass(Map.class); + verify(terraformManager, times(commandThreadPoolQueueSize)).execute(any(), any(), envArgCaptor.capture()); + + // Verify the env vars are set with the terraform manager is called + Map envVars = envArgCaptor.getValue(); + assert envVars.get("TF_VAR_password").equals("myPassword"); + assert envVars.get("TF_VAR_username").equals("myUsername"); + assert envVars.get("TF_VAR_url").equals("myConnectionUrl"); + + ArgumentCaptor messageArgCaptor = ArgumentCaptor.forClass(CommandMessage.class); + verify(commandPublisher, times(commandThreadPoolQueueSize)).sendCommandResponse(messageArgCaptor.capture(), topicArgCaptor.capture()); + + Map topicVars = topicArgCaptor.getValue(); + assert topicVars.get("orgId").equals(eventPortalProperties.getOrganizationId()); + assert topicVars.get("runtimeAgentId").equals(eventPortalProperties.getRuntimeAgentId()); + + // Make sure we get all 10 correlation ids in the response messages + List receivedCorrelationIds = messageArgCaptor.getAllValues().stream().map(CommandMessage::getCommandCorrelationId).toList(); + List expectedCorrelationIds = IntStream.range(0, commandThreadPoolQueueSize).mapToObj(i -> "myCorrelationId" + i).toList(); + assertTrue(receivedCorrelationIds.size() == expectedCorrelationIds.size() && + receivedCorrelationIds.containsAll(expectedCorrelationIds) && expectedCorrelationIds.containsAll(receivedCorrelationIds)); + } + + @Test + void failSendingResponseBackToEp() { + // Create a command request message + CommandMessage message = getCommandMessage("1"); + + doNothing().when(terraformManager).execute(any(), any(), any()); + doThrow(new RuntimeException("Error sending response back to EP")).when(commandPublisher).sendCommandResponse(any(), any()); + commandManager.execute(message); + + // Wait for the command thread to complete + await().atMost(10, TimeUnit.SECONDS).until(() -> commandPublisherIsInvoked(2)); + + ArgumentCaptor messageArgCaptor = ArgumentCaptor.forClass(CommandMessage.class); + verify(commandPublisher, times(2)).sendCommandResponse(messageArgCaptor.capture(), any()); + + // Check that we attempted to set Error in the response message + messageArgCaptor.getAllValues().forEach(commandMessage -> { + assert commandMessage.getCommandCorrelationId().equals(message.getCommandCorrelationId()); + assert commandMessage.getCommandBundles().get(0).getCommands().get(0).getResult().getStatus().equals(JobStatus.error); + }); + } + + @Test + void testCommandManager() { + // Create a command request message + CommandMessage message = getCommandMessage("1"); + doNothing().when(terraformManager).execute(any(), any(), any()); ArgumentCaptor> topicArgCaptor = ArgumentCaptor.forClass(Map.class); @@ -85,8 +175,12 @@ public void testCommandManager() { .password("myPassword") .connectionUrl("myConnectionUrl") .build())); + commandManager.execute(message); + // Wait for the command thread to complete + await().atMost(10, TimeUnit.SECONDS).until(() -> commandPublisherIsInvoked(1)); + // Verify terraform manager is called ArgumentCaptor> envArgCaptor = ArgumentCaptor.forClass(Map.class); verify(terraformManager, times(1)).execute(any(), any(), envArgCaptor.capture()); @@ -104,4 +198,65 @@ public void testCommandManager() { assert topicVars.get("runtimeAgentId").equals(eventPortalProperties.getRuntimeAgentId()); assert topicVars.get(COMMAND_CORRELATION_ID).equals(message.getCommandCorrelationId()); } + + @Test + void verifyMDCIsSetInCommandManagerThread() { + // Create a command request message + CommandMessage message = getCommandMessage("1"); + AtomicBoolean mdcIsSet = new AtomicBoolean(false); + doAnswer(invocation -> { + Map mdcContextMap = MDC.getCopyOfContextMap(); + String commandCorrelationId = mdcContextMap.get(COMMAND_CORRELATION_ID); + String traceId = mdcContextMap.get(TRACE_ID); + if (commandCorrelationId.equals(message.getCommandCorrelationId()) && + traceId.equals(message.getTraceId())) { + mdcIsSet.set(true); + } + return null; + }).when(terraformManager).execute(any(), any(), any()); + + doNothing().when(commandPublisher).sendCommandResponse(any(), any()); + when(messagingServiceDelegateService.getMessagingServiceClient(any())).thenReturn( + new SolaceHttpSemp(SempClient.builder() + .username("myUsername") + .password("myPassword") + .connectionUrl("myConnectionUrl") + .build())); + + MDC.put(COMMAND_CORRELATION_ID, message.getCommandCorrelationId()); + MDC.put(TRACE_ID, message.getTraceId()); + commandManager.execute(message); + + // Wait for the command thread to complete + await().atMost(10, TimeUnit.SECONDS).until(() -> commandPublisherIsInvoked(1)); + + assertTrue(mdcIsSet.get()); + } + + private CommandMessage getCommandMessage(String suffix) { + CommandMessage message = new CommandMessage(); + message.setOrigType(MOPSvcType.maasEventMgmt); + message.withMessageType(generic); + message.setContext("abc"); + message.setActorId("myActorId"); + message.setOrgId(eventPortalProperties.getOrganizationId()); + message.setTraceId("myTraceId"); + message.setCommandCorrelationId("myCorrelationId" + suffix); + message.setCommandBundles(List.of( + CommandBundle.builder() + .executionType(ExecutionType.serial) + .exitOnFailure(false) + .commands(List.of( + Command.builder() + .commandType(CommandType.terraform) + .body("asdfasdfadsf") + .command("apply") + .build())) + .build())); + return message; + } + + private Boolean commandPublisherIsInvoked(int numberOfExpectedInvocations) { + return Mockito.mockingDetails(commandPublisher).getInvocations().size() == numberOfExpectedInvocations; + } } diff --git a/service/terraform-plugin/src/test/java/com/solace/maas/ep/event/management/agent/plugin/terraform/real/TerraformClientRealTests.java b/service/terraform-plugin/src/test/java/com/solace/maas/ep/event/management/agent/plugin/terraform/real/TerraformClientRealTests.java index f46ebfc98..0c46aeb34 100644 --- a/service/terraform-plugin/src/test/java/com/solace/maas/ep/event/management/agent/plugin/terraform/real/TerraformClientRealTests.java +++ b/service/terraform-plugin/src/test/java/com/solace/maas/ep/event/management/agent/plugin/terraform/real/TerraformClientRealTests.java @@ -28,7 +28,9 @@ import java.util.concurrent.Future; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotSame; +import static org.junit.jupiter.api.Assertions.fail; @Slf4j @ActiveProfiles("TEST") @@ -47,17 +49,17 @@ public class TerraformClientRealTests { ); @Test - public void planCreateNewQueue() { + void planCreateNewQueue() { executeTerraformCommand("addQueue.tf", "plan"); } @Test - public void createNewQueue() { + void createNewQueue() { executeTerraformCommand("addQueue.tf", "apply"); } @Test - public void create2Queues() { + void create2DifferentQueues() { ExecutorService executorService = Executors.newFixedThreadPool(10); Future> future1 = executorService.submit(() -> executeTerraformCommand("addQueue.tf", "apply")); @@ -73,24 +75,44 @@ public void create2Queues() { } @Test - public void delete2Queues() { + void create2OfTheSameQueue() { + ExecutorService executorService = Executors.newFixedThreadPool(10); + Future> future1 = executorService.submit(() -> + executeTerraformCommand("addQueue.tf", "apply", "app123-consumer", JobStatus.success)); + Future> future2 = executorService.submit(() -> + executeTerraformCommand("addQueue.tf", "apply", "app123-consumer", JobStatus.error)); + // wait for the futures to complete + try { + List command1Bundles = future1.get(); + List command2Bundles = future2.get(); + // We expect the first one to succeed and the second one to fail + assertEquals(JobStatus.success, command1Bundles.get(0).getCommands().get(0).getResult().getStatus()); + assertEquals(JobStatus.error, command2Bundles.get(0).getCommands().get(0).getResult().getStatus()); + } catch (Exception e) { + log.error("Error waiting for futures to complete", e); + fail(); + } + } + + @Test + void delete2Queues() { executeTerraformCommand("deleteQueue.tf", "apply"); executeTerraformCommand("deleteQueue2.tf", "apply", "app123-consumer2"); } @Test - public void deleteNewQueue() { + void deleteNewQueue() { executeTerraformCommand("deleteQueue.tf", "apply"); } @Test - public void updateNewQueue() { + void updateNewQueue() { executeTerraformCommand("updateQueue.tf", "apply"); } @Test - public void importResource() { + void importResource() { String newQueueTf = asString(resourceLoader.getResource("classpath:realTfFiles" + File.separator + "addQueue.tf")); Command commandRequest1 = Command.builder() @@ -141,6 +163,10 @@ private List executeTerraformCommand(String hclFileName, String t } private List executeTerraformCommand(String hclFileName, String tfVerb, String context) { + return executeTerraformCommand(hclFileName, tfVerb, context, JobStatus.success); + } + + private List executeTerraformCommand(String hclFileName, String tfVerb, String context, JobStatus expectedJobStatus) { String terraformString = asString(resourceLoader.getResource("classpath:realTfFiles" + File.separator + hclFileName)); Command commandRequest = Command.builder() @@ -168,7 +194,7 @@ private List executeTerraformCommand(String hclFileName, String t for (Command command : commandBundle.getCommands()) { CommandResult result = command.getResult(); System.out.println("Logs " + result.getLogs()); - assertNotSame(JobStatus.error, result.getStatus()); + assertEquals(expectedJobStatus, result.getStatus()); } }