Skip to content

Commit

Permalink
DATAGO-75198 wip
Browse files Browse the repository at this point in the history
  • Loading branch information
rudraneel-chakraborty committed Jun 16, 2024
1 parent 9b4988c commit 28c349e
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 335 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.solace.maas.ep.common.messages;

import com.solace.maas.ep.common.model.CommandMessageWithResources;
import com.solace.maas.ep.common.model.EventBrokerResourceConfiguration;
import com.solace.maas.ep.event.management.agent.plugin.command.model.CommandBundle;
import com.solace.maas.ep.event.management.agent.plugin.command.model.JobStatus;
import com.solace.maas.ep.event.management.agent.plugin.mop.MOPMessage;
Expand All @@ -11,13 +13,14 @@
import java.util.List;

@Data
public class CommandMessage extends MOPMessage {
public class CommandMessage extends MOPMessage implements CommandMessageWithResources {

private String commandCorrelationId;
private String context;
private String serviceId;
private JobStatus status;
private List<CommandBundle> commandBundles;
private List<EventBrokerResourceConfiguration> resources;

public CommandMessage() {
super();
Expand All @@ -40,6 +43,25 @@ public CommandMessage(String serviceId,
this.commandBundles = commandBundles;
}

public CommandMessage(String serviceId,
String commandCorrelationId,
String context,
JobStatus status,
List<CommandBundle> commandBundles,
List<EventBrokerResourceConfiguration> resources) {
super();
withMessageType(MOPMessageType.generic)
.withProtocol(MOPProtocol.epConfigPush)
.withVersion("1")
.withUhFlag(MOPUHFlag.ignore);
this.serviceId = serviceId;
this.commandCorrelationId = commandCorrelationId;
this.context = context;
this.status = status;
this.commandBundles = commandBundles;
this.resources = resources;
}

@Override
public String toLog() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,23 +73,29 @@ public void execute(CommandMessage request) {
CompletableFuture.runAsync(() -> configPush(requestBO), configPushPool)
.exceptionally(e -> {
log.error("Error running command", e);
Command firstCommand = requestBO.getCommandBundles().get(0).getCommands().get(0);
setCommandError(firstCommand, (Exception) e);
finalizeAndSendResponse(requestBO);
handleError((Exception) e, requestBO);
return null;
});
}

public void handleError(Exception e, CommandMessage message) {
handleError(e, commandMapper.map(message));
}

private void handleError(Exception e, CommandRequest requestBO) {
Command firstCommand = requestBO.getCommandBundles().get(0).getCommands().get(0);
setCommandError(firstCommand, e);
finalizeAndSendResponse(requestBO);
}

@SuppressWarnings("PMD")
public void configPush(CommandRequest request) {
Map<String, String> envVars;
try {
envVars = setBrokerSpecificEnvVars(request.getServiceId());
} catch (Exception e) {
log.error("Error getting terraform variables", e);
Command firstCommand = request.getCommandBundles().get(0).getCommands().get(0);
setCommandError(firstCommand, e);
finalizeAndSendResponse(request);
handleError(e, request);
return;
}
List<Path> executionLogFilesToClean = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.solace.messaging.receiver.MessageReceiver;
import com.solace.messaging.receiver.PersistentMessageReceiver;
import com.solace.messaging.resources.Queue;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
Expand All @@ -31,6 +32,7 @@ public class SolacePersistentMessageHandler extends BaseSolaceMessageHandler imp
private final Map<Class, MessageProcessor> messageProcessorsByClassType;
private final MessagingService messagingService;
private final EventPortalProperties eventPortalProperties;
@Getter
private PersistentMessageReceiver persistentMessageReceiver;

protected SolacePersistentMessageHandler(MessagingService messagingService,
Expand All @@ -48,7 +50,8 @@ protected SolacePersistentMessageHandler(MessagingService messagingService,
@Override
public void onMessage(InboundMessage inboundMessage) {
String mopMessageSubclass = "";

MessageProcessor processor = null;
Object message = null;
try {
mopMessageSubclass = inboundMessage.getProperty(MOPConstants.MOP_MSG_META_DECODER);
String messageAsString = inboundMessage.getPayloadAsString();
Expand All @@ -57,26 +60,29 @@ public void onMessage(InboundMessage inboundMessage) {
messageClass = Class.forName(mopMessageSubclass);
cachedJSONDecoders.put(mopMessageSubclass, messageClass);
}
MessageProcessor processor = messageProcessorsByClassType.get(messageClass);
processor = messageProcessorsByClassType.get(messageClass);
if (processor == null) {
throw new UnsupportedOperationException("Could not find message processor for message of class " + messageClass.getCanonicalName());
}
setupMDC(messageAsString, messageClass.getSimpleName());
log.trace("onMessage: {}\n{}", messageClass, messageAsString);
processor.processMessage(processor.castToMessageClass(toMessage(messageAsString, messageClass)));
message = toMessage(messageAsString, messageClass);
processor.processMessage(processor.castToMessageClass(message));

} catch (Exception e) {
log.error("Error while processing inbound message from queue for mopMessageSubclass: {}", mopMessageSubclass);
if (processor != null && message != null) {
log.error("Error while processing inbound message from queue for mopMessageSubclass: {}", mopMessageSubclass);
processor.onFailure(e, processor.castToMessageClass(message));
} else {
log.error("Unsupported message type and/or processor encountered. Skipping processing", e);
}

throw new IllegalArgumentException(e);
} finally {
persistentMessageReceiver.ack(inboundMessage);
}
}

public PersistentMessageReceiver getPersistentMessageReceiver() {
return this.persistentMessageReceiver;
}

private PersistentMessageReceiver buildPersistentMessageReceiver(Queue queue) {
return messagingService
.createPersistentMessageReceiverBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,17 @@ public class CommandMessageProcessor implements MessageProcessor<CommandMessage>

private final CommandManager commandManager;

private final DynamicResourceConfigurationHelper dynamicResourceConfigurationHelper;

public CommandMessageProcessor(CommandManager commandManager) {

public CommandMessageProcessor(CommandManager commandManager,
DynamicResourceConfigurationHelper dynamicResourceConfigurationHelper) {
this.commandManager = commandManager;
this.dynamicResourceConfigurationHelper = dynamicResourceConfigurationHelper;
}

@Override
public void processMessage(CommandMessage message) {
dynamicResourceConfigurationHelper.loadSolaceBrokerResourceConfigurations(message.getResources());
commandManager.execute(message);
}

Expand All @@ -33,4 +36,9 @@ public Class supportedClass() {
public CommandMessage castToMessageClass(Object message) {
return (CommandMessage) message;
}

@Override
public void onFailure(Exception e, CommandMessage message) {
commandManager.handleError(e, message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,6 @@ public interface MessageProcessor<T extends MOPMessage> {
Class supportedClass();

T castToMessageClass(Object message);

void onFailure(Exception e, T message);
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,9 @@ public Class supportedClass() {
public ScanCommandMessage castToMessageClass(Object message) {
return (ScanCommandMessage) message;
}

@Override
public void onFailure(Exception e, ScanCommandMessage message) {
log.debug("Requires implementation");
}
}
Loading

0 comments on commit 28c349e

Please sign in to comment.