Skip to content

Commit

Permalink
[DATAGO-64298] Merge branch 'main' into moodiRealist/DATAGO-64298-rem…
Browse files Browse the repository at this point in the history
…ove-old-logs
  • Loading branch information
moodiRealist committed Dec 14, 2023
2 parents 7c48e68 + fad6445 commit c499c2e
Show file tree
Hide file tree
Showing 7 changed files with 276 additions and 33 deletions.
7 changes: 7 additions & 0 deletions service/application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,13 @@
<version>2.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>4.2.0</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.solacesystems</groupId>
<artifactId>solclientj</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,17 @@
import com.solace.maas.ep.event.management.agent.plugin.solace.processor.semp.SolaceHttpSemp;
import com.solace.maas.ep.event.management.agent.plugin.terraform.manager.TerraformManager;
import com.solace.maas.ep.event.management.agent.publisher.CommandPublisher;
import com.solace.maas.ep.event.management.agent.util.MdcTaskDecorator;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;

import java.time.OffsetDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import static com.solace.maas.ep.event.management.agent.constants.Command.COMMAND_CORRELATION_ID;
import static com.solace.maas.ep.event.management.agent.plugin.terraform.manager.TerraformManager.LOG_LEVEL_ERROR;
Expand All @@ -34,6 +37,7 @@ public class CommandManager {
private final CommandPublisher commandPublisher;
private final MessagingServiceDelegateService messagingServiceDelegateService;
private final EventPortalProperties eventPortalProperties;
private final ThreadPoolTaskExecutor configPushPool;

public CommandManager(TerraformManager terraformManager, CommandMapper commandMapper,
CommandPublisher commandPublisher, MessagingServiceDelegateService messagingServiceDelegateService,
Expand All @@ -43,9 +47,28 @@ public CommandManager(TerraformManager terraformManager, CommandMapper commandMa
this.commandPublisher = commandPublisher;
this.messagingServiceDelegateService = messagingServiceDelegateService;
this.eventPortalProperties = eventPortalProperties;
configPushPool = new ThreadPoolTaskExecutor();
configPushPool.setCorePoolSize(eventPortalProperties.getCommandThreadPoolMinSize());
configPushPool.setMaxPoolSize(eventPortalProperties.getCommandThreadPoolMaxSize());
configPushPool.setQueueCapacity(eventPortalProperties.getCommandThreadPoolQueueSize());
configPushPool.setThreadNamePrefix("config-push-pool-");
configPushPool.setTaskDecorator(new MdcTaskDecorator());
configPushPool.initialize();
}

public void execute(CommandMessage request) {

CompletableFuture.runAsync(() -> configPush(request), configPushPool)
.exceptionally(e -> {
log.error("Error running command", e);
Command firstCommand = request.getCommandBundles().get(0).getCommands().get(0);
setCommandError(firstCommand, (Exception) e);
sendResponse(request);
return null;
});
}

public void configPush(CommandMessage request) {
Map<String, String> envVars;
try {
envVars = setBrokerSpecificEnvVars(request.getServiceId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ public class EventPortalProperties {

private String topicPrefix;

private int commandThreadPoolMinSize = 5;
private int commandThreadPoolMaxSize = 10;
private int commandThreadPoolQueueSize = 1_000;

private GatewayProperties gateway
= new GatewayProperties("standalone", "standalone", new GatewayMessagingProperties(true, false, List.of()));
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.solace.maas.ep.event.management.agent.util;

import org.slf4j.MDC;
import org.springframework.core.task.TaskDecorator;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class MdcTaskDecorator implements TaskDecorator {

@Override
public Runnable decorate(Runnable runnable) {
Map<String, String> contextMap = Optional.ofNullable(MDC.getCopyOfContextMap()).orElse(new HashMap<>());
return () -> {
try {
MDC.setContextMap(contextMap);
runnable.run();
} finally {
MDC.clear();
}

};
}
}
3 changes: 3 additions & 0 deletions service/application/src/main/resources/application-TEST.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ eventPortal:
runtimeAgentId: ${EP_RUNTIME_AGENT_ID:1234567}
organizationId: ${EP_ORGANIZATION_ID:myOrg123}
topicPrefix: ${EP_TOPIC_PREFIX:sc/ep/runtime}
commandThreadPoolMinSize: 5
commandThreadPoolMaxSize: 5
commandThreadPoolQueueSize: 10
gateway:
id: decal5
name: evmr1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,42 @@
import com.solace.maas.ep.event.management.agent.plugin.command.model.CommandBundle;
import com.solace.maas.ep.event.management.agent.plugin.command.model.CommandType;
import com.solace.maas.ep.event.management.agent.plugin.command.model.ExecutionType;
import com.solace.maas.ep.event.management.agent.plugin.command.model.JobStatus;
import com.solace.maas.ep.event.management.agent.plugin.mop.MOPSvcType;
import com.solace.maas.ep.event.management.agent.plugin.service.MessagingServiceDelegateService;
import com.solace.maas.ep.event.management.agent.plugin.solace.processor.semp.SempClient;
import com.solace.maas.ep.event.management.agent.plugin.solace.processor.semp.SolaceHttpSemp;
import com.solace.maas.ep.event.management.agent.plugin.terraform.manager.TerraformManager;
import com.solace.maas.ep.event.management.agent.publisher.CommandPublisher;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.test.context.ActiveProfiles;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream;

import static com.solace.maas.ep.event.management.agent.constants.Command.COMMAND_CORRELATION_ID;
import static com.solace.maas.ep.event.management.agent.plugin.constants.RouteConstants.TRACE_ID;
import static com.solace.maas.ep.event.management.agent.plugin.mop.MOPMessageType.generic;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand All @@ -38,43 +54,117 @@
public class CommandManagerTests {

@Autowired
CommandManager commandManager;
private CommandManager commandManager;

@Autowired
TerraformManager terraformManager;
private TerraformManager terraformManager;

@Autowired
CommandPublisher commandPublisher;
private CommandPublisher commandPublisher;

@Autowired
MessagingServiceDelegateService messagingServiceDelegateService;
private MessagingServiceDelegateService messagingServiceDelegateService;

@Autowired
EventPortalProperties eventPortalProperties;
private EventPortalProperties eventPortalProperties;

private final static ThreadPoolTaskExecutor testThreadPool = new ThreadPoolTaskExecutor();

@BeforeEach
public void cleanup() {
reset(terraformManager);
reset(commandPublisher);
}

@Test
public void testCommandManager() {
// Create a command request message
CommandMessage message = new CommandMessage();
message.setOrigType(MOPSvcType.maasEventMgmt);
message.withMessageType(generic);
message.setContext("abc");
message.setActorId("myActorId");
message.setOrgId(eventPortalProperties.getOrganizationId());
message.setTraceId("myTraceId");
message.setCommandCorrelationId("myCorrelationId");
message.setCommandBundles(List.of(
CommandBundle.builder()
.executionType(ExecutionType.serial)
.exitOnFailure(false)
.commands(List.of(
Command.builder()
.commandType(CommandType.terraform)
.body("asdfasdfadsf")
.command("apply")
.build()))
void testMultiThreadedCommandManager() throws InterruptedException {

// Set up the thread pool
int commandThreadPoolQueueSize = eventPortalProperties.getCommandThreadPoolQueueSize();
testThreadPool.setCorePoolSize(commandThreadPoolQueueSize);
testThreadPool.initialize();


// Build enough requests to fill the command thread pool queue
List<CommandMessage> messageList = new ArrayList<>();
for (int i = 0; i < commandThreadPoolQueueSize; i++) {
messageList.add(getCommandMessage(Integer.toString(i)));
}

doNothing().when(commandPublisher).sendCommandResponse(any(), any());
doAnswer(invocation -> {
// Simulate the time spent for a SEMP command to complete
TimeUnit.SECONDS.sleep(1);
return null;
}).when(terraformManager).execute(any(), any(), any());

ArgumentCaptor<Map<String, String>> topicArgCaptor = ArgumentCaptor.forClass(Map.class);

when(messagingServiceDelegateService.getMessagingServiceClient(any())).thenReturn(
new SolaceHttpSemp(SempClient.builder()
.username("myUsername")
.password("myPassword")
.connectionUrl("myConnectionUrl")
.build()));

// Execute all the commands in parallel to fill the command thread pool queue
IntStream.rangeClosed(1, commandThreadPoolQueueSize).parallel().forEach(i ->
CompletableFuture.runAsync(() -> commandManager.execute(messageList.get(i - 1)), testThreadPool));

// Wait for all the threads to complete (add a timeout just in case)
await().atMost(10, TimeUnit.SECONDS).until(() -> commandPublisherIsInvoked(commandThreadPoolQueueSize));

// Verify terraform manager is called
ArgumentCaptor<Map<String, String>> envArgCaptor = ArgumentCaptor.forClass(Map.class);
verify(terraformManager, times(commandThreadPoolQueueSize)).execute(any(), any(), envArgCaptor.capture());

// Verify the env vars are set with the terraform manager is called
Map<String, String> envVars = envArgCaptor.getValue();
assert envVars.get("TF_VAR_password").equals("myPassword");
assert envVars.get("TF_VAR_username").equals("myUsername");
assert envVars.get("TF_VAR_url").equals("myConnectionUrl");

ArgumentCaptor<CommandMessage> messageArgCaptor = ArgumentCaptor.forClass(CommandMessage.class);
verify(commandPublisher, times(commandThreadPoolQueueSize)).sendCommandResponse(messageArgCaptor.capture(), topicArgCaptor.capture());

Map<String, String> topicVars = topicArgCaptor.getValue();
assert topicVars.get("orgId").equals(eventPortalProperties.getOrganizationId());
assert topicVars.get("runtimeAgentId").equals(eventPortalProperties.getRuntimeAgentId());

// Make sure we get all 10 correlation ids in the response messages
List<String> receivedCorrelationIds = messageArgCaptor.getAllValues().stream().map(CommandMessage::getCommandCorrelationId).toList();
List<String> expectedCorrelationIds = IntStream.range(0, commandThreadPoolQueueSize).mapToObj(i -> "myCorrelationId" + i).toList();
assertTrue(receivedCorrelationIds.size() == expectedCorrelationIds.size() &&
receivedCorrelationIds.containsAll(expectedCorrelationIds) && expectedCorrelationIds.containsAll(receivedCorrelationIds));
}

@Test
void failSendingResponseBackToEp() {
// Create a command request message
CommandMessage message = getCommandMessage("1");

doNothing().when(terraformManager).execute(any(), any(), any());
doThrow(new RuntimeException("Error sending response back to EP")).when(commandPublisher).sendCommandResponse(any(), any());
commandManager.execute(message);

// Wait for the command thread to complete
await().atMost(10, TimeUnit.SECONDS).until(() -> commandPublisherIsInvoked(2));

ArgumentCaptor<CommandMessage> messageArgCaptor = ArgumentCaptor.forClass(CommandMessage.class);
verify(commandPublisher, times(2)).sendCommandResponse(messageArgCaptor.capture(), any());

// Check that we attempted to set Error in the response message
messageArgCaptor.getAllValues().forEach(commandMessage -> {
assert commandMessage.getCommandCorrelationId().equals(message.getCommandCorrelationId());
assert commandMessage.getCommandBundles().get(0).getCommands().get(0).getResult().getStatus().equals(JobStatus.error);
});
}

@Test
void testCommandManager() {
// Create a command request message
CommandMessage message = getCommandMessage("1");

doNothing().when(terraformManager).execute(any(), any(), any());

ArgumentCaptor<Map<String, String>> topicArgCaptor = ArgumentCaptor.forClass(Map.class);
Expand All @@ -85,8 +175,12 @@ public void testCommandManager() {
.password("myPassword")
.connectionUrl("myConnectionUrl")
.build()));

commandManager.execute(message);

// Wait for the command thread to complete
await().atMost(10, TimeUnit.SECONDS).until(() -> commandPublisherIsInvoked(1));

// Verify terraform manager is called
ArgumentCaptor<Map<String, String>> envArgCaptor = ArgumentCaptor.forClass(Map.class);
verify(terraformManager, times(1)).execute(any(), any(), envArgCaptor.capture());
Expand All @@ -104,4 +198,65 @@ public void testCommandManager() {
assert topicVars.get("runtimeAgentId").equals(eventPortalProperties.getRuntimeAgentId());
assert topicVars.get(COMMAND_CORRELATION_ID).equals(message.getCommandCorrelationId());
}

@Test
void verifyMDCIsSetInCommandManagerThread() {
// Create a command request message
CommandMessage message = getCommandMessage("1");
AtomicBoolean mdcIsSet = new AtomicBoolean(false);
doAnswer(invocation -> {
Map<String, String> mdcContextMap = MDC.getCopyOfContextMap();
String commandCorrelationId = mdcContextMap.get(COMMAND_CORRELATION_ID);
String traceId = mdcContextMap.get(TRACE_ID);
if (commandCorrelationId.equals(message.getCommandCorrelationId()) &&
traceId.equals(message.getTraceId())) {
mdcIsSet.set(true);
}
return null;
}).when(terraformManager).execute(any(), any(), any());

doNothing().when(commandPublisher).sendCommandResponse(any(), any());
when(messagingServiceDelegateService.getMessagingServiceClient(any())).thenReturn(
new SolaceHttpSemp(SempClient.builder()
.username("myUsername")
.password("myPassword")
.connectionUrl("myConnectionUrl")
.build()));

MDC.put(COMMAND_CORRELATION_ID, message.getCommandCorrelationId());
MDC.put(TRACE_ID, message.getTraceId());
commandManager.execute(message);

// Wait for the command thread to complete
await().atMost(10, TimeUnit.SECONDS).until(() -> commandPublisherIsInvoked(1));

assertTrue(mdcIsSet.get());
}

private CommandMessage getCommandMessage(String suffix) {
CommandMessage message = new CommandMessage();
message.setOrigType(MOPSvcType.maasEventMgmt);
message.withMessageType(generic);
message.setContext("abc");
message.setActorId("myActorId");
message.setOrgId(eventPortalProperties.getOrganizationId());
message.setTraceId("myTraceId");
message.setCommandCorrelationId("myCorrelationId" + suffix);
message.setCommandBundles(List.of(
CommandBundle.builder()
.executionType(ExecutionType.serial)
.exitOnFailure(false)
.commands(List.of(
Command.builder()
.commandType(CommandType.terraform)
.body("asdfasdfadsf")
.command("apply")
.build()))
.build()));
return message;
}

private Boolean commandPublisherIsInvoked(int numberOfExpectedInvocations) {
return Mockito.mockingDetails(commandPublisher).getInvocations().size() == numberOfExpectedInvocations;
}
}
Loading

0 comments on commit c499c2e

Please sign in to comment.