Skip to content

Commit

Permalink
Merge pull request #138 from SolaceProducts/moodiRealist/DATAGO-64298…
Browse files Browse the repository at this point in the history
…-send-configPush-logs-to-ep

DATAGO-64298: send config push logs to eVMR
  • Loading branch information
moodiRealist authored Dec 8, 2023
2 parents 3f96420 + d26ead3 commit 7d1879c
Show file tree
Hide file tree
Showing 8 changed files with 324 additions and 18 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.solace.maas.ep.common.messages;

import com.solace.maas.ep.event.management.agent.plugin.mop.MOPMessage;
import com.solace.maas.ep.event.management.agent.plugin.mop.MOPMessageType;
import com.solace.maas.ep.event.management.agent.plugin.mop.MOPProtocol;
import com.solace.maas.ep.event.management.agent.plugin.mop.MOPUHFlag;
import lombok.Data;

@Data
public class CommandLogMessage extends MOPMessage {
String orgId;

String commandCorrelationId;

String level;

String log;

Long timestamp;

public CommandLogMessage(String orgId, String commandCorrelationId, String traceId, String actorId, String level, String log, Long timestamp) {
super();
withMessageType(MOPMessageType.generic)
.withProtocol(MOPProtocol.epConfigPush)
.withVersion("1")
.withUhFlag(MOPUHFlag.ignore);

this.orgId = orgId;
this.commandCorrelationId = commandCorrelationId;
this.level = level;
this.log = log;
this.timestamp = timestamp;
setTraceId(traceId);
setActorId(actorId);
}

@Override
public String toLog() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,51 @@ public class StreamingAppender extends AppenderBase<ILoggingEvent> {
protected void append(ILoggingEvent event) {
if (!standalone) {
if (StringUtils.isNotEmpty(event.getMDCPropertyMap().get(RouteConstants.SCAN_ID))) {
sendLogsAsync(event,
sendScanLogsAsync(event,
event.getMDCPropertyMap().get(RouteConstants.SCAN_ID),
event.getMDCPropertyMap().get(RouteConstants.TRACE_ID),
event.getMDCPropertyMap().get(RouteConstants.ACTOR_ID),
event.getMDCPropertyMap().get(RouteConstants.SCAN_TYPE),
event.getMDCPropertyMap().get(RouteConstants.SCHEDULE_ID),
event.getMDCPropertyMap().get(RouteConstants.MESSAGING_SERVICE_ID));
} else if (StringUtils.isNotEmpty(event.getMDCPropertyMap().get(RouteConstants.COMMAND_CORRELATION_ID))) {
log.trace("This is a placeholder for DATAGO-64298");
sendCommandLogsAsync(event,
event.getMDCPropertyMap().get(RouteConstants.COMMAND_CORRELATION_ID),
event.getMDCPropertyMap().get(RouteConstants.TRACE_ID),
event.getMDCPropertyMap().get(RouteConstants.ACTOR_ID),
event.getMDCPropertyMap().get(RouteConstants.MESSAGING_SERVICE_ID));
}
}
}

public void sendLogsAsync(ILoggingEvent event, String scanId, String traceId, String actorId,
String scanType, String groupId, String messagingServiceId) {
private void sendCommandLogsAsync(ILoggingEvent event, String commandCorrelationId, String traceId,
String actorId, String messagingServiceId) {


RouteEntity route = RouteEntity.builder()
.id("seda:commandLogsPublisher")
.active(true)
.build();

producerTemplate.asyncSend(route.getId(), exchange -> {
// Need to set headers to let the Route have access to the Correlation ID, Group ID, and Messaging Service ID.
exchange.getIn().setHeader(RouteConstants.COMMAND_CORRELATION_ID, commandCorrelationId);
exchange.getIn().setHeader(RouteConstants.TRACE_ID, traceId);
exchange.getIn().setHeader(RouteConstants.ACTOR_ID, actorId);
exchange.getIn().setHeader(RouteConstants.MESSAGING_SERVICE_ID, messagingServiceId);

exchange.getIn().setBody(event);

MDC.clear();
}).whenComplete((exchange, exception) -> {
if (exception != null) {
log.error("Exception occurred while executing route publishLogs for commandCorrelation {}.", commandCorrelationId, exception);
}
});
}

private void sendScanLogsAsync(ILoggingEvent event, String scanId, String traceId, String actorId,
String scanType, String groupId, String messagingServiceId) {
RouteEntity route = creatLoggingRoute(scanType, messagingServiceId);

producerTemplate.asyncSend(route.getId(), exchange -> {
Expand All @@ -64,7 +94,7 @@ public void sendLogsAsync(ILoggingEvent event, String scanId, String traceId, St
});
}

protected RouteEntity creatLoggingRoute(String scanType, String messagingServiceId) {
private RouteEntity creatLoggingRoute(String scanType, String messagingServiceId) {
MessagingServiceRouteDelegate scanDelegate =
PluginLoader.findPlugin("SCAN_LOGS");

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package com.solace.maas.ep.event.management.agent.processor;

import ch.qos.logback.classic.spi.ILoggingEvent;
import com.solace.maas.ep.common.messages.CommandLogMessage;
import com.solace.maas.ep.event.management.agent.config.eventPortal.EventPortalProperties;
import com.solace.maas.ep.event.management.agent.plugin.constants.RouteConstants;
import com.solace.maas.ep.event.management.agent.publisher.CommandLogsPublisher;
import lombok.extern.slf4j.Slf4j;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

@Slf4j
@Component
@ConditionalOnProperty(name = "event-portal.gateway.messaging.standalone", havingValue = "false")
public class CommandLogsProcessor implements Processor {
private final String orgId;
private final String runtimeAgentId;

private final CommandLogsPublisher logDataPublisher;

@Autowired
public CommandLogsProcessor(CommandLogsPublisher logDataPublisher, EventPortalProperties eventPortalProperties) {
super();

this.logDataPublisher = logDataPublisher;

orgId = eventPortalProperties.getOrganizationId();
runtimeAgentId = eventPortalProperties.getRuntimeAgentId();
}

@Override
public void process(Exchange exchange) throws Exception {
Map<String, String> topicDetails = new HashMap<>();

Map<String, Object> properties = exchange.getIn().getHeaders();
ILoggingEvent event = (ILoggingEvent) exchange.getIn().getBody();
String commandCorrelationId = (String) properties.get(RouteConstants.COMMAND_CORRELATION_ID);
String traceId = (String) properties.get(RouteConstants.TRACE_ID);
String actorId = (String) properties.get(RouteConstants.ACTOR_ID);
String messagingServiceId = (String) properties.get(RouteConstants.MESSAGING_SERVICE_ID);

CommandLogMessage logDataMessage = new CommandLogMessage(orgId, commandCorrelationId, traceId, actorId, event.getLevel().toString(),
String.format("%s%s", event.getFormattedMessage(), "\n"), event.getTimeStamp());

topicDetails.put("orgId", orgId);
topicDetails.put("runtimeAgentId", runtimeAgentId);
topicDetails.put("messagingServiceId", messagingServiceId);
topicDetails.put("commandCorrelationId", commandCorrelationId);

logDataPublisher.sendCommandLogData(logDataMessage, topicDetails);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.solace.maas.ep.event.management.agent.publisher;

import com.solace.maas.ep.common.messages.CommandLogMessage;
import com.solace.maas.ep.event.management.agent.config.SolaceConfiguration;
import com.solace.maas.ep.event.management.agent.plugin.publisher.SolacePublisher;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

import java.util.Map;


@Component
@Slf4j
@ConditionalOnProperty(name = "event-portal.gateway.messaging.standalone", havingValue = "false")
public class CommandLogsPublisher {

private final SolacePublisher solacePublisher;
private final SolaceConfiguration solaceConfiguration;

public CommandLogsPublisher(SolacePublisher solacePublisher, SolaceConfiguration solaceConfiguration) {

this.solacePublisher = solacePublisher;
this.solaceConfiguration = solaceConfiguration;
}

public void sendCommandLogData(CommandLogMessage message, Map<String, String> topicDetails) {
String topicString = solaceConfiguration.getTopicPrefix() +
String.format("commandLogs/v1/%s", topicDetails.get("commandCorrelationId"));

solacePublisher.publish(message, topicString);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.solace.maas.ep.event.management.agent.route.ep;

import com.solace.maas.ep.event.management.agent.processor.CommandLogsProcessor;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

@Component
@ConditionalOnProperty(name = "event-portal.gateway.messaging.standalone", havingValue = "false")
public class CommandLogsPublisherRouteBuilder extends RouteBuilder {
private final CommandLogsProcessor commandLogsProcessor;

@Autowired
public CommandLogsPublisherRouteBuilder(CommandLogsProcessor processor) {
super();
commandLogsProcessor = processor;
}

@Override
public void configure() throws Exception {
from("seda:commandLogsPublisher?blockWhenFull=true&size=1000000")
.routeId("commandLogsPublisher")
.throttle(100)
.process(commandLogsProcessor);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package com.solace.maas.ep.event.management.agent.plugin.route.handler;

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.classic.spi.LoggingEvent;
import com.solace.maas.ep.common.messages.CommandLogMessage;
import com.solace.maas.ep.event.management.agent.config.SolaceConfiguration;
import com.solace.maas.ep.event.management.agent.config.eventPortal.EventPortalProperties;
import com.solace.maas.ep.event.management.agent.plugin.constants.RouteConstants;
import com.solace.maas.ep.event.management.agent.plugin.mop.MOPProtocol;
import com.solace.maas.ep.event.management.agent.plugin.publisher.SolacePublisher;
import com.solace.maas.ep.event.management.agent.processor.CommandLogsProcessor;
import com.solace.maas.ep.event.management.agent.publisher.CommandLogsPublisher;
import com.solace.maas.ep.event.management.agent.route.ep.CommandLogsPublisherRouteBuilder;
import lombok.SneakyThrows;
import org.apache.camel.CamelContext;
import org.apache.camel.EndpointInject;
import org.apache.camel.Exchange;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.AdviceWith;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.support.DefaultExchange;
import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
import org.junit.jupiter.api.Test;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.test.context.ActiveProfiles;

import java.time.Instant;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;

@CamelSpringBootTest
@EnableAutoConfiguration
@SpringBootTest(
properties = {"camel.springboot.name=routeHandlerTest"}
)
@ActiveProfiles("TEST")
class CommandLogsPublisherRouteBuilderTests {

@Autowired
private ProducerTemplate producerTemplate;

@Autowired
private CamelContext camelContext;

@EndpointInject("mock:direct:result")
private MockEndpoint mockResult;


@Test
@SneakyThrows
void testMockRoute() throws Exception {
Logger logger = (Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
ILoggingEvent event = new LoggingEvent(null, logger, Level.DEBUG,
"test message", new Throwable("throwable message"), null);

Exchange exchange = new DefaultExchange(camelContext);

exchange.getIn().setHeader(RouteConstants.COMMAND_CORRELATION_ID, "commandCorrelation1");
exchange.getIn().setHeader(RouteConstants.TRACE_ID, "traceId");
exchange.getIn().setHeader(RouteConstants.MESSAGING_SERVICE_ID, "messagingService");
exchange.getIn().setHeader(RouteConstants.TOPIC, "test/ep/v1");

exchange.getIn().setBody(event);

AdviceWith.adviceWith(camelContext, "commandLogsPublisher",
route -> {
route.replaceFromWith("direct:commandLogsPublisher");
route.weaveAddLast().to("mock:direct:result");
});

mockResult.expectedMessageCount(1);
producerTemplate.send("direct:commandLogsPublisher", exchange);
mockResult.assertIsSatisfied();
}

@Test
void testCommandLogMessageMOPProtocol() {
CommandLogMessage commandLogMessage = new CommandLogMessage(
"orgId",
"commandId",
"traceId",
"actorId",
"level",
"log", Instant.now().toEpochMilli());

assertThat(commandLogMessage.getMopProtocol()).isEqualTo(MOPProtocol.epConfigPush);
}


@Configuration
static class TestConfig {
@Bean
@Primary
public static RoutesBuilder createRouteBuilder() {
SolacePublisher solacePublisher = mock(SolacePublisher.class);
EventPortalProperties eventPortalProperties = mock(EventPortalProperties.class);
SolaceConfiguration solaceConfiguration = mock(SolaceConfiguration.class);

CommandLogsPublisher commandLogsPublisher = new CommandLogsPublisher(solacePublisher, solaceConfiguration);
CommandLogsProcessor commandLogsProcessor = new CommandLogsProcessor(commandLogsPublisher, eventPortalProperties);
return new CommandLogsPublisherRouteBuilder(commandLogsProcessor);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import com.solace.maas.ep.event.management.agent.plugin.jacoco.ExcludeFromJacocoGeneratedReport;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;

import java.io.BufferedReader;
import java.io.File;
Expand Down Expand Up @@ -64,13 +65,21 @@ final class ProcessLauncher {
}

void setOutputListener(Consumer<String> listener) {
Map<String, String> mdcContext = MDC.getCopyOfContextMap();
assert process == null;
outputListener = listener;
outputListener = (log) -> {
MDC.setContextMap(mdcContext);
listener.accept(log);
};
}

void setErrorListener(Consumer<String> listener) {
Map<String, String> mdcContext = MDC.getCopyOfContextMap();
assert process == null;
errorListener = listener;
errorListener = (error) -> {
MDC.setContextMap(mdcContext);
listener.accept(error);
};
}

void setInheritIO(boolean inheritIO) {
Expand Down
Loading

0 comments on commit 7d1879c

Please sign in to comment.