Skip to content

Commit

Permalink
Added message publisher. Changed CommandResponse to only have logs, n…
Browse files Browse the repository at this point in the history
…ot errors and logs. Error logs are determined by the log level.
  • Loading branch information
gregmeldrum committed Nov 28, 2023
1 parent a96d82d commit dc2b308
Show file tree
Hide file tree
Showing 14 changed files with 339 additions and 57 deletions.
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package com.solace.maas.ep.event.management.agent.command;

import com.solace.maas.ep.common.messages.CommandMessage;
import com.solace.maas.ep.event.management.agent.command.mapper.CommandMapper;
import com.solace.maas.ep.event.management.agent.config.eventPortal.EventPortalProperties;
import com.solace.maas.ep.event.management.agent.plugin.command.model.Command;
import com.solace.maas.ep.event.management.agent.plugin.command.model.CommandBundle;
import com.solace.maas.ep.event.management.agent.plugin.command.model.CommandRequest;
import com.solace.maas.ep.event.management.agent.plugin.service.MessagingServiceDelegateService;
import com.solace.maas.ep.event.management.agent.plugin.solace.processor.semp.SempClient;
import com.solace.maas.ep.event.management.agent.plugin.solace.processor.semp.SolaceHttpSemp;
import com.solace.maas.ep.event.management.agent.plugin.terraform.manager.TerraformManager;
import com.solace.maas.ep.event.management.agent.publisher.CommandPublisher;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

Expand All @@ -17,28 +21,43 @@
@Service
public class CommandManager {
private final TerraformManager terraformManager;

private final CommandMapper commandMapper;
private final CommandPublisher commandPublisher;
private final MessagingServiceDelegateService messagingServiceDelegateService;
private final EventPortalProperties eventPortalProperties;

public CommandManager(TerraformManager terraformManager, MessagingServiceDelegateService messagingServiceDelegateService) {
public CommandManager(TerraformManager terraformManager, CommandMapper commandMapper,
CommandPublisher commandPublisher, MessagingServiceDelegateService messagingServiceDelegateService,
EventPortalProperties eventPortalProperties) {
this.terraformManager = terraformManager;
this.commandMapper = commandMapper;
this.commandPublisher = commandPublisher;
this.messagingServiceDelegateService = messagingServiceDelegateService;
this.eventPortalProperties = eventPortalProperties;
}

public void execute(CommandRequest request) {
public void execute(CommandMessage request) {
Map<String, String> envVars = setBrokerSpecificEnvVars(request.getServiceId());
CommandRequest commandRequest = commandMapper.map(request);
for (CommandBundle bundle : request.getCommandBundles()) {
// For now everything is run serially
for (Command command : bundle.getCommands()) {
switch (command.getCommandType()) {
case terraform:
terraformManager.execute(request, command, envVars);
terraformManager.execute(commandRequest, command, envVars);
break;
default:
throw new IllegalStateException("Unexpected value: " + command.getCommandType());
}
}
}

Map<String, String> topicVars = Map.of(
"orgId", request.getOrgId(),
"runtimeAgentId", eventPortalProperties.getRuntimeAgentId(),
"correlationId", request.getCorrelationId()
);
commandPublisher.sendCommandResponse(request, topicVars);
}

private Map<String, String> setBrokerSpecificEnvVars(String messagingServiceId) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.solace.maas.ep.event.management.agent.command.mapper;

import com.solace.maas.ep.common.messages.CommandMessage;
import com.solace.maas.ep.event.management.agent.plugin.command.model.CommandRequest;
import org.mapstruct.Mapper;

@Mapper(componentModel = "spring")
public interface CommandMapper {

CommandRequest map(CommandMessage input);

CommandMessage map(CommandRequest input);
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.solace.maas.ep.event.management.agent.publisher;

import com.solace.maas.ep.event.management.agent.plugin.mop.MOPMessage;
import com.solace.maas.ep.event.management.agent.plugin.publisher.SolacePublisher;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
@ConditionalOnProperty(name = "event-portal.gateway.messaging.standalone", havingValue = "false")
public class CommandPublisher {

private final SolacePublisher solacePublisher;

public CommandPublisher(SolacePublisher solacePublisher) {
this.solacePublisher = solacePublisher;
}

/**
* Sends the command response to EP.
* <p>
* The topic for command response:
* sc/ep/runtime/{orgId}/{runtimeAgentId}/commandResponse/v1/{correlationId}
*/

public void sendCommandResponse(MOPMessage message, Map<String, String> topicDetails) {

String topicString =
String.format("sc/ep/runtime/%s/%s/commandResponse/v1/%s",
topicDetails.get("orgId"),
topicDetails.get("runtimeAgentId"),
topicDetails.get("correlationId"));

solacePublisher.publish(message, topicString);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.solace.maas.ep.event.management.agent.subscriber;

import com.solace.maas.ep.common.messages.CommandMessage;
import com.solace.maas.ep.event.management.agent.command.CommandManager;
import com.solace.maas.ep.event.management.agent.config.SolaceConfiguration;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

/**
* This is here for developers to test that messages are flowing without having to test with ep-core.
* <p>
* This should normally be disabled.
*/
@Slf4j
@Component
@ConditionalOnProperty(name = "event-portal.gateway.messaging.standalone", havingValue = "false")
public class CommandMessageHandler extends SolaceMessageHandler<CommandMessage> {

private final CommandManager commandManager;

public CommandMessageHandler(
SolaceConfiguration solaceConfiguration,
SolaceSubscriber solaceSubscriber, CommandManager commandManager) {
super(solaceConfiguration.getTopicPrefix() + "command/v1/>", solaceSubscriber);
this.commandManager = commandManager;
}

@Override
public void receiveMessage(String destinationName, CommandMessage message) {
log.debug("receiveMessage {}\n{}", destinationName, message);
commandManager.execute(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.jboss.logging.MDC;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Slf4j
Expand Down Expand Up @@ -69,17 +70,32 @@ public void onMessage(InboundMessage inboundMessage) {

String receivedClassName = messageClass.getSimpleName();

if ("ScanCommandMessage".equals(receivedClassName) || "ScanDataImportMessage".equals(receivedClassName)) {
Map<String, Object> map = objectMapper.readValue(messageAsString, Map.class);
String scanId = (String) map.get("scanId");
List<String> scanClassNames = List.of("ScanCommandMessage", "ScanDataImportMessage");
List<String> commandClassNames = List.of("CommandMessage");
List<String> expectedClassNames = List.of(scanClassNames, commandClassNames).stream().flatMap(List::stream).toList();

Map<String, Object> map = objectMapper.readValue(messageAsString, Map.class);

if (expectedClassNames.contains(receivedClassName)) {
String traceId = (String) map.get("traceId");
String actorId = (String) map.get("actorId");
String messagingServiceId = (String) map.get("messagingServiceId");

MDC.clear();
MDC.put(RouteConstants.SCAN_ID, scanId);
MDC.put(RouteConstants.TRACE_ID, traceId);
MDC.put(RouteConstants.ACTOR_ID, actorId);
}

if (scanClassNames.contains(receivedClassName)) {
String scanId = (String) map.get("scanId");
String messagingServiceId = (String) map.get("messagingServiceId");
MDC.put(RouteConstants.SCAN_ID, scanId);
MDC.put(RouteConstants.MESSAGING_SERVICE_ID, messagingServiceId);
}

if (commandClassNames.contains(receivedClassName)) {
String correlationId = (String) map.get("correlationId");
String messagingServiceId = (String) map.get("serviceId");
MDC.put(RouteConstants.COMMAND_CORRELATION_ID, correlationId);
MDC.put(RouteConstants.MESSAGING_SERVICE_ID, messagingServiceId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@
import com.solace.maas.ep.event.management.agent.plugin.manager.client.kafkaClient.KafkaClientReconnection;
import com.solace.maas.ep.event.management.agent.plugin.manager.client.kafkaClient.KafkaClientReconnectionConfig;
import com.solace.maas.ep.event.management.agent.plugin.messagingService.RtoMessageBuilder;
import com.solace.maas.ep.event.management.agent.plugin.service.MessagingServiceDelegateService;
import com.solace.maas.ep.event.management.agent.plugin.terraform.manager.TerraformManager;
import com.solace.maas.ep.event.management.agent.plugin.vmr.VmrProcessor;
import com.solace.maas.ep.event.management.agent.publisher.CommandPublisher;
import com.solace.maas.ep.event.management.agent.testConfigs.MessagingServiceTestConfig;
import com.solace.maas.ep.event.management.agent.testConfigs.PublisherTestConfig;
import com.solace.maas.ep.event.management.agent.util.IDGenerator;
Expand Down Expand Up @@ -79,6 +82,27 @@ public VmrProcessor getVmrProcessor() {
return processor;
}

@Bean
@Primary
public MessagingServiceDelegateService getMessagingServiceDelegateService() {
return mock(MessagingServiceDelegateService.class);
}

@Bean
@Primary
public TerraformManager getTerraformManager() {
return mock(TerraformManager.class);
}

// @Bean
// @Primary
// public EventPortalProperties getEventPortalProperties() {
// EventPortalProperties eventPortalProperties = new EventPortalProperties();
// eventPortalProperties.setOrganizationId("myOrgId");
// eventPortalProperties.setRuntimeAgentId("myRuntimeAgentId");
// return eventPortalProperties;
// }

@Bean
@Primary
public OutboundMessageBuilder outboundMessageBuilder() {
Expand Down Expand Up @@ -119,6 +143,12 @@ public IDGenerator idGenerator() {
return idGenerator;
}

@Bean
@Primary
public CommandPublisher getCommandPublisher() {
return mock(CommandPublisher.class);
}

@Bean
@Primary
KafkaClientConnection kafkaClientConnection() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package com.solace.maas.ep.event.management.agent.commandManager;

import com.solace.maas.ep.common.messages.CommandMessage;
import com.solace.maas.ep.event.management.agent.TestConfig;
import com.solace.maas.ep.event.management.agent.command.CommandManager;
import com.solace.maas.ep.event.management.agent.config.eventPortal.EventPortalProperties;
import com.solace.maas.ep.event.management.agent.plugin.command.model.Command;
import com.solace.maas.ep.event.management.agent.plugin.command.model.CommandBundle;
import com.solace.maas.ep.event.management.agent.plugin.command.model.CommandType;
import com.solace.maas.ep.event.management.agent.plugin.command.model.ExecutionType;
import com.solace.maas.ep.event.management.agent.plugin.mop.MOPSvcType;
import com.solace.maas.ep.event.management.agent.plugin.service.MessagingServiceDelegateService;
import com.solace.maas.ep.event.management.agent.plugin.solace.processor.semp.SempClient;
import com.solace.maas.ep.event.management.agent.plugin.solace.processor.semp.SolaceHttpSemp;
import com.solace.maas.ep.event.management.agent.plugin.terraform.manager.TerraformManager;
import com.solace.maas.ep.event.management.agent.publisher.CommandPublisher;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;

import java.util.List;
import java.util.Map;

import static com.solace.maas.ep.event.management.agent.plugin.mop.MOPMessageType.generic;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@ActiveProfiles("TEST")
@EnableAutoConfiguration
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = TestConfig.class)
public class CommandManagerTests {

@Autowired
CommandManager commandManager;

@Autowired
TerraformManager terraformManager;

@Autowired
CommandPublisher commandPublisher;

@Autowired
MessagingServiceDelegateService messagingServiceDelegateService;

@Autowired
EventPortalProperties eventPortalProperties;

@Test
public void testCommandManager() {
// Create a command request message
CommandMessage message = new CommandMessage();
message.setOrigType(MOPSvcType.maasEventMgmt);
message.withMessageType(generic);
message.setContext("abc");
message.setActorId("myActorId");
message.setOrgId(eventPortalProperties.getOrganizationId());
message.setTraceId("myTraceId");
message.setCorrelationId("myCorrelationId");
message.setCommandBundles(List.of(
CommandBundle.builder()
.executionType(ExecutionType.serial)
.exitOnFailure(false)
.commands(List.of(
Command.builder()
.commandType(CommandType.terraform)
.body("asdfasdfadsf")
.command("apply")
.build()))
.build()));

doNothing().when(terraformManager).execute(any(), any(), any());

ArgumentCaptor<Map<String, String>> topicArgCaptor = ArgumentCaptor.forClass(Map.class);
doNothing().when(commandPublisher).sendCommandResponse(any(), any());
when(messagingServiceDelegateService.getMessagingServiceClient(any())).thenReturn(
new SolaceHttpSemp(SempClient.builder()
.username("myUsername")
.password("myPassword")
.connectionUrl("myConnectionUrl")
.build()));
commandManager.execute(message);

// Verify terraform manager is called
ArgumentCaptor<Map<String, String>> envArgCaptor = ArgumentCaptor.forClass(Map.class);
verify(terraformManager, times(1)).execute(any(), any(), envArgCaptor.capture());

// Verify the env vars are set with the terraform manager is called
Map<String, String> envVars = envArgCaptor.getValue();
assert envVars.get("TF_VAR_password").equals("myPassword");
assert envVars.get("TF_VAR_username").equals("myUsername");
assert envVars.get("TF_VAR_url").equals("myConnectionUrl");

verify(commandPublisher, times(1)).sendCommandResponse(any(), topicArgCaptor.capture());

Map<String, String> topicVars = topicArgCaptor.getValue();
assert topicVars.get("orgId").equals(eventPortalProperties.getOrganizationId());
assert topicVars.get("runtimeAgentId").equals(eventPortalProperties.getRuntimeAgentId());
assert topicVars.get("correlationId").equals(message.getCorrelationId());
}
}
Loading

0 comments on commit dc2b308

Please sign in to comment.