Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DATAGO-64298: send config push logs to eVMR #138

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.solace.maas.ep.common.messages;

import com.solace.maas.ep.event.management.agent.plugin.mop.MOPMessage;
import com.solace.maas.ep.event.management.agent.plugin.mop.MOPMessageType;
import com.solace.maas.ep.event.management.agent.plugin.mop.MOPProtocol;
import com.solace.maas.ep.event.management.agent.plugin.mop.MOPUHFlag;
import lombok.Data;

@Data
public class CommandLogMessage extends MOPMessage {
String orgId;

String commandCorrelationId;

String level;

String log;

Long timestamp;

public CommandLogMessage(String orgId, String commandCorrelationId, String traceId, String actorId, String level, String log, Long timestamp) {
super();
withMessageType(MOPMessageType.generic)
.withProtocol(MOPProtocol.epConfigPush)
.withVersion("1")
.withUhFlag(MOPUHFlag.ignore);

this.orgId = orgId;
this.commandCorrelationId = commandCorrelationId;
this.level = level;
this.log = log;
this.timestamp = timestamp;
setTraceId(traceId);
setActorId(actorId);
}

@Override
public String toLog() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,51 @@ public class StreamingAppender extends AppenderBase<ILoggingEvent> {
protected void append(ILoggingEvent event) {
if (!standalone) {
if (StringUtils.isNotEmpty(event.getMDCPropertyMap().get(RouteConstants.SCAN_ID))) {
sendLogsAsync(event,
sendScanLogsAsync(event,
event.getMDCPropertyMap().get(RouteConstants.SCAN_ID),
event.getMDCPropertyMap().get(RouteConstants.TRACE_ID),
event.getMDCPropertyMap().get(RouteConstants.ACTOR_ID),
event.getMDCPropertyMap().get(RouteConstants.SCAN_TYPE),
event.getMDCPropertyMap().get(RouteConstants.SCHEDULE_ID),
moodiRealist marked this conversation as resolved.
Show resolved Hide resolved
event.getMDCPropertyMap().get(RouteConstants.MESSAGING_SERVICE_ID));
} else if (StringUtils.isNotEmpty(event.getMDCPropertyMap().get(RouteConstants.COMMAND_CORRELATION_ID))) {
log.trace("This is a placeholder for DATAGO-64298");
sendCommandLogsAsync(event,
event.getMDCPropertyMap().get(RouteConstants.COMMAND_CORRELATION_ID),
event.getMDCPropertyMap().get(RouteConstants.TRACE_ID),
event.getMDCPropertyMap().get(RouteConstants.ACTOR_ID),
event.getMDCPropertyMap().get(RouteConstants.MESSAGING_SERVICE_ID));
}
}
}

public void sendLogsAsync(ILoggingEvent event, String scanId, String traceId, String actorId,
String scanType, String groupId, String messagingServiceId) {
private void sendCommandLogsAsync(ILoggingEvent event, String commandCorrelationId, String traceId,
String actorId, String messagingServiceId) {


RouteEntity route = RouteEntity.builder()
.id("seda:commandLogsPublisher")
.active(true)
.build();

producerTemplate.asyncSend(route.getId(), exchange -> {
// Need to set headers to let the Route have access to the Correlation ID, Group ID, and Messaging Service ID.
exchange.getIn().setHeader(RouteConstants.COMMAND_CORRELATION_ID, commandCorrelationId);
exchange.getIn().setHeader(RouteConstants.TRACE_ID, traceId);
exchange.getIn().setHeader(RouteConstants.ACTOR_ID, actorId);
exchange.getIn().setHeader(RouteConstants.MESSAGING_SERVICE_ID, messagingServiceId);

exchange.getIn().setBody(event);

MDC.clear();
}).whenComplete((exchange, exception) -> {
if (exception != null) {
log.error("Exception occurred while executing route publishLogs for commandCorrelation {}.", commandCorrelationId, exception);
}
});
}

private void sendScanLogsAsync(ILoggingEvent event, String scanId, String traceId, String actorId,
String scanType, String groupId, String messagingServiceId) {
RouteEntity route = creatLoggingRoute(scanType, messagingServiceId);

producerTemplate.asyncSend(route.getId(), exchange -> {
Expand All @@ -64,7 +94,7 @@ public void sendLogsAsync(ILoggingEvent event, String scanId, String traceId, St
});
}

protected RouteEntity creatLoggingRoute(String scanType, String messagingServiceId) {
private RouteEntity creatLoggingRoute(String scanType, String messagingServiceId) {
MessagingServiceRouteDelegate scanDelegate =
PluginLoader.findPlugin("SCAN_LOGS");

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package com.solace.maas.ep.event.management.agent.processor;

import ch.qos.logback.classic.spi.ILoggingEvent;
import com.solace.maas.ep.common.messages.CommandLogMessage;
import com.solace.maas.ep.event.management.agent.config.eventPortal.EventPortalProperties;
import com.solace.maas.ep.event.management.agent.plugin.constants.RouteConstants;
import com.solace.maas.ep.event.management.agent.publisher.CommandLogsPublisher;
import lombok.extern.slf4j.Slf4j;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

@Slf4j
@Component
@ConditionalOnProperty(name = "event-portal.gateway.messaging.standalone", havingValue = "false")
public class CommandLogsProcessor implements Processor {
private final String orgId;
private final String runtimeAgentId;

private final CommandLogsPublisher logDataPublisher;

@Autowired
public CommandLogsProcessor(CommandLogsPublisher logDataPublisher, EventPortalProperties eventPortalProperties) {
super();

this.logDataPublisher = logDataPublisher;

orgId = eventPortalProperties.getOrganizationId();
runtimeAgentId = eventPortalProperties.getRuntimeAgentId();
}

@Override
public void process(Exchange exchange) throws Exception {
Map<String, String> topicDetails = new HashMap<>();

Map<String, Object> properties = exchange.getIn().getHeaders();
ILoggingEvent event = (ILoggingEvent) exchange.getIn().getBody();
String commandCorrelationId = (String) properties.get(RouteConstants.COMMAND_CORRELATION_ID);
String traceId = (String) properties.get(RouteConstants.TRACE_ID);
String actorId = (String) properties.get(RouteConstants.ACTOR_ID);
String messagingServiceId = (String) properties.get(RouteConstants.MESSAGING_SERVICE_ID);

CommandLogMessage logDataMessage = new CommandLogMessage(orgId, commandCorrelationId, traceId, actorId, event.getLevel().toString(),
String.format("%s%s", event.getFormattedMessage(), "\n"), event.getTimeStamp());

topicDetails.put("orgId", orgId);
topicDetails.put("runtimeAgentId", runtimeAgentId);
topicDetails.put("messagingServiceId", messagingServiceId);
topicDetails.put("commandCorrelationId", commandCorrelationId);

logDataPublisher.sendCommandLogData(logDataMessage, topicDetails);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.solace.maas.ep.event.management.agent.publisher;

import com.solace.maas.ep.common.messages.CommandLogMessage;
import com.solace.maas.ep.event.management.agent.config.SolaceConfiguration;
import com.solace.maas.ep.event.management.agent.plugin.publisher.SolacePublisher;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

import java.util.Map;


@Component
@Slf4j
@ConditionalOnProperty(name = "event-portal.gateway.messaging.standalone", havingValue = "false")
public class CommandLogsPublisher {

private final SolacePublisher solacePublisher;
private final SolaceConfiguration solaceConfiguration;

public CommandLogsPublisher(SolacePublisher solacePublisher, SolaceConfiguration solaceConfiguration) {

this.solacePublisher = solacePublisher;
this.solaceConfiguration = solaceConfiguration;
}

public void sendCommandLogData(CommandLogMessage message, Map<String, String> topicDetails) {
String topicString = solaceConfiguration.getTopicPrefix() +
String.format("commandLogs/v1/%s", topicDetails.get("commandCorrelationId"));

solacePublisher.publish(message, topicString);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.solace.maas.ep.event.management.agent.route.ep;

import com.solace.maas.ep.event.management.agent.processor.CommandLogsProcessor;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

@Component
@ConditionalOnProperty(name = "event-portal.gateway.messaging.standalone", havingValue = "false")
public class CommandLogsPublisherRouteBuilder extends RouteBuilder {
private final CommandLogsProcessor commandLogsProcessor;

@Autowired
public CommandLogsPublisherRouteBuilder(CommandLogsProcessor processor) {
super();
commandLogsProcessor = processor;
}

@Override
public void configure() throws Exception {
from("seda:commandLogsPublisher?blockWhenFull=true&size=1000000")
.routeId("commandLogsPublisher")
.throttle(100)
.process(commandLogsProcessor);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package com.solace.maas.ep.event.management.agent.plugin.route.handler;

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.classic.spi.LoggingEvent;
import com.solace.maas.ep.common.messages.CommandLogMessage;
import com.solace.maas.ep.event.management.agent.config.SolaceConfiguration;
import com.solace.maas.ep.event.management.agent.config.eventPortal.EventPortalProperties;
import com.solace.maas.ep.event.management.agent.plugin.constants.RouteConstants;
import com.solace.maas.ep.event.management.agent.plugin.mop.MOPProtocol;
import com.solace.maas.ep.event.management.agent.plugin.publisher.SolacePublisher;
import com.solace.maas.ep.event.management.agent.processor.CommandLogsProcessor;
import com.solace.maas.ep.event.management.agent.publisher.CommandLogsPublisher;
import com.solace.maas.ep.event.management.agent.route.ep.CommandLogsPublisherRouteBuilder;
import lombok.SneakyThrows;
import org.apache.camel.CamelContext;
import org.apache.camel.EndpointInject;
import org.apache.camel.Exchange;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.AdviceWith;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.support.DefaultExchange;
import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
import org.junit.jupiter.api.Test;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.test.context.ActiveProfiles;

import java.time.Instant;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;

@CamelSpringBootTest
@EnableAutoConfiguration
@SpringBootTest(
properties = {"camel.springboot.name=routeHandlerTest"}
)
@ActiveProfiles("TEST")
class CommandLogsPublisherRouteBuilderTests {

@Autowired
private ProducerTemplate producerTemplate;

@Autowired
private CamelContext camelContext;

@EndpointInject("mock:direct:result")
private MockEndpoint mockResult;


@Test
@SneakyThrows
void testMockRoute() throws Exception {
Logger logger = (Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
ILoggingEvent event = new LoggingEvent(null, logger, Level.DEBUG,
"test message", new Throwable("throwable message"), null);

Exchange exchange = new DefaultExchange(camelContext);

exchange.getIn().setHeader(RouteConstants.COMMAND_CORRELATION_ID, "commandCorrelation1");
exchange.getIn().setHeader(RouteConstants.TRACE_ID, "traceId");
exchange.getIn().setHeader(RouteConstants.MESSAGING_SERVICE_ID, "messagingService");
exchange.getIn().setHeader(RouteConstants.TOPIC, "test/ep/v1");

exchange.getIn().setBody(event);

AdviceWith.adviceWith(camelContext, "commandLogsPublisher",
route -> {
route.replaceFromWith("direct:commandLogsPublisher");
route.weaveAddLast().to("mock:direct:result");
});

mockResult.expectedMessageCount(1);
producerTemplate.send("direct:commandLogsPublisher", exchange);
mockResult.assertIsSatisfied();
}

@Test
void testCommandLogMessageMOPProtocol() {
CommandLogMessage commandLogMessage = new CommandLogMessage(
"orgId",
"commandId",
"traceId",
"actorId",
"level",
"log", Instant.now().toEpochMilli());

assertThat(commandLogMessage.getMopProtocol()).isEqualTo(MOPProtocol.epConfigPush);
}


@Configuration
static class TestConfig {
@Bean
@Primary
public static RoutesBuilder createRouteBuilder() {
SolacePublisher solacePublisher = mock(SolacePublisher.class);
EventPortalProperties eventPortalProperties = mock(EventPortalProperties.class);
SolaceConfiguration solaceConfiguration = mock(SolaceConfiguration.class);

CommandLogsPublisher commandLogsPublisher = new CommandLogsPublisher(solacePublisher, solaceConfiguration);
CommandLogsProcessor commandLogsProcessor = new CommandLogsProcessor(commandLogsPublisher, eventPortalProperties);
return new CommandLogsPublisherRouteBuilder(commandLogsProcessor);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import com.solace.maas.ep.event.management.agent.plugin.jacoco.ExcludeFromJacocoGeneratedReport;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;

import java.io.BufferedReader;
import java.io.File;
Expand Down Expand Up @@ -64,13 +65,21 @@ final class ProcessLauncher {
}

void setOutputListener(Consumer<String> listener) {
Map<String, String> mdcContext = MDC.getCopyOfContextMap();
assert process == null;
outputListener = listener;
outputListener = (log) -> {
MDC.setContextMap(mdcContext);
listener.accept(log);
};
}

void setErrorListener(Consumer<String> listener) {
Map<String, String> mdcContext = MDC.getCopyOfContextMap();
assert process == null;
errorListener = listener;
errorListener = (error) -> {
MDC.setContextMap(mdcContext);
listener.accept(error);
};
}

void setInheritIO(boolean inheritIO) {
Expand Down
Loading
Loading