Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DATAGO-78976: Quality | EMA metrics for number of ongoing jobs + health check metrics #199

Merged
merged 12 commits into from
Aug 30, 2024
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ public ScanCommandMessage(String messagingServiceId,
this(messagingServiceId, scanId, scanTypes, destinations, null);
}



@Override
public String toLog() {
return null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.solace.maas.ep.common.metrics;

public class ObservabilityConstants {
public static final String MAAS_EMA_SCAN_EVENT_SENT = "maas.ema.scan_event.sent";
public static final String MAAS_EMA_CONFIG_PUSH_EVENT_SENT = "maas.ema.config_push_event.sent";
public static final String MAAS_EMA_HEARTBEAT_EVENT_SENT = "maas.ema.heartbeat_event.sent";

public static final String MAAS_EMA_SCAN_EVENT_RECEIVED = "maas.ema.scan_event.received";
public static final String MAAS_EMA_CONFIG_PUSH_EVENT_RECEIVED = "maas.ema.config_push_event.received";

public static final String MAAS_EMA_CONFIG_PUSH_EVENT_CYCLE_TIME = "maas.ema.config_push_event.cycle_time";

public static final String STATUS_TAG = "status";
public static final String ORG_ID_TAG = "org_id";
public static final String SCAN_ID_TAG = "scan_id";

private ObservabilityConstants() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import com.solace.maas.ep.event.management.agent.processor.CommandLogStreamingProcessor;
import com.solace.maas.ep.event.management.agent.publisher.CommandPublisher;
import com.solace.maas.ep.event.management.agent.util.MdcTaskDecorator;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
Expand All @@ -24,14 +26,21 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import static com.solace.maas.ep.common.metrics.ObservabilityConstants.MAAS_EMA_CONFIG_PUSH_EVENT_CYCLE_TIME;
import static com.solace.maas.ep.common.metrics.ObservabilityConstants.MAAS_EMA_CONFIG_PUSH_EVENT_SENT;
import static com.solace.maas.ep.common.metrics.ObservabilityConstants.ORG_ID_TAG;
import static com.solace.maas.ep.common.metrics.ObservabilityConstants.STATUS_TAG;
import static com.solace.maas.ep.event.management.agent.constants.Command.COMMAND_CORRELATION_ID;
import static com.solace.maas.ep.event.management.agent.plugin.constants.RouteConstants.ACTOR_ID;
import static com.solace.maas.ep.event.management.agent.plugin.constants.RouteConstants.TRACE_ID;
Expand All @@ -49,18 +58,21 @@ public class CommandManager {
private final EventPortalProperties eventPortalProperties;
private final ThreadPoolTaskExecutor configPushPool;
private final Optional<CommandLogStreamingProcessor> commandLogStreamingProcessorOpt;
private final MeterRegistry meterRegistry;

public CommandManager(TerraformManager terraformManager,
CommandMapper commandMapper,
CommandPublisher commandPublisher,
MessagingServiceDelegateService messagingServiceDelegateService,
EventPortalProperties eventPortalProperties,
Optional<CommandLogStreamingProcessor> commandLogStreamingProcessorOpt) {
Optional<CommandLogStreamingProcessor> commandLogStreamingProcessorOpt,
MeterRegistry meterRegistry) {
this.terraformManager = terraformManager;
this.commandMapper = commandMapper;
this.commandPublisher = commandPublisher;
this.messagingServiceDelegateService = messagingServiceDelegateService;
this.eventPortalProperties = eventPortalProperties;
this.meterRegistry = meterRegistry;
configPushPool = new ThreadPoolTaskExecutor();
configPushPool.setCorePoolSize(eventPortalProperties.getCommandThreadPoolMinSize());
configPushPool.setMaxPoolSize(eventPortalProperties.getCommandThreadPoolMaxSize());
Expand All @@ -73,6 +85,7 @@ public CommandManager(TerraformManager terraformManager,

public void execute(CommandMessage request) {
CommandRequest requestBO = commandMapper.map(request);
requestBO.setCreatedTime(Instant.now());
CompletableFuture.runAsync(() -> configPush(requestBO), configPushPool)
.exceptionally(e -> {
log.error("Error running command", e);
Expand Down Expand Up @@ -221,6 +234,14 @@ private void finalizeAndSendResponse(CommandRequest request) {
response.setTraceId(MDC.get(TRACE_ID));
response.setActorId(MDC.get(ACTOR_ID));
commandPublisher.sendCommandResponse(response, topicVars);
meterRegistry.counter(MAAS_EMA_CONFIG_PUSH_EVENT_SENT, ORG_ID_TAG, response.getOrgId(),
STATUS_TAG, response.getStatus().name()).increment();
Timer jobCycleTime = Timer
.builder(MAAS_EMA_CONFIG_PUSH_EVENT_CYCLE_TIME)
.tag(ORG_ID_TAG, response.getOrgId())
.tag(STATUS_TAG, request.getStatus().name())
.register(meterRegistry);
jobCycleTime.record(request.getLifetime(ChronoUnit.MILLIS), TimeUnit.MILLISECONDS);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,21 @@
import com.solace.maas.ep.event.management.agent.config.eventPortal.EventPortalProperties;
import com.solace.maas.ep.event.management.agent.plugin.jacoco.ExcludeFromJacocoGeneratedReport;
import com.solace.maas.ep.event.management.agent.plugin.publisher.SolacePublisher;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.info.BuildProperties;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

import static com.solace.maas.ep.common.metrics.ObservabilityConstants.MAAS_EMA_HEARTBEAT_EVENT_SENT;
import static com.solace.maas.ep.common.metrics.ObservabilityConstants.ORG_ID_TAG;

@ExcludeFromJacocoGeneratedReport
@Component
Expand All @@ -23,21 +31,33 @@ public class HeartbeatGenerator {
private final String runtimeAgentId;
private final String topic;
private final String runtimeAgentVersion;
private final MeterRegistry meterRegistry;

public HeartbeatGenerator(SolaceConfiguration solaceConfiguration,
EventPortalProperties eventPortalProperties,
SolacePublisher solacePublisher,
BuildProperties buildProperties) {
BuildProperties buildProperties,
MeterRegistry meterRegistry) {
this.solacePublisher = solacePublisher;
this.runtimeAgentId = eventPortalProperties.getRuntimeAgentId();
topic = solaceConfiguration.getTopicPrefix() + "heartbeat/v1";
this.runtimeAgentVersion = getFormattedVersion(buildProperties.getVersion());
this.meterRegistry = meterRegistry;
}

@Scheduled(fixedRate = 5000)
public void sendHeartbeat() {
HeartbeatMessage message = new HeartbeatMessage(runtimeAgentId, Instant.now().toString(), runtimeAgentVersion);
solacePublisher.publish(message, topic);
boolean result = solacePublisher.publish(message, topic);
logHealthMetric(message, result);
}

private void logHealthMetric(HeartbeatMessage message, boolean isHealthy) {
List<Tag> tags = new ArrayList<>();
if (Objects.nonNull(message.getOrgId())) {
tags.add(Tag.of(ORG_ID_TAG, message.getOrgId()));
}
meterRegistry.gauge(MAAS_EMA_HEARTBEAT_EVENT_SENT, tags, isHealthy ? 1 : 0);
}

private String getFormattedVersion(String version) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,30 @@
package com.solace.maas.ep.event.management.agent.publisher;

import com.solace.maas.ep.event.management.agent.plugin.constants.ScanStatus;
import com.solace.maas.ep.event.management.agent.plugin.mop.MOPMessage;
import com.solace.maas.ep.event.management.agent.plugin.publisher.SolacePublisher;
import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

import java.util.Map;

import static com.solace.maas.ep.common.metrics.ObservabilityConstants.MAAS_EMA_SCAN_EVENT_SENT;
import static com.solace.maas.ep.common.metrics.ObservabilityConstants.ORG_ID_TAG;
import static com.solace.maas.ep.common.metrics.ObservabilityConstants.SCAN_ID_TAG;
import static com.solace.maas.ep.common.metrics.ObservabilityConstants.STATUS_TAG;

@Component
@ConditionalOnProperty(name = "event-portal.gateway.messaging.standalone", havingValue = "false")
public class ScanDataPublisher {

private final SolacePublisher solacePublisher;
private final MeterRegistry meterRegistry;

public ScanDataPublisher(SolacePublisher solacePublisher) {
public ScanDataPublisher(SolacePublisher solacePublisher,
MeterRegistry meterRegistry) {
this.solacePublisher = solacePublisher;
this.meterRegistry = meterRegistry;
}

/**
Expand Down Expand Up @@ -43,6 +53,11 @@ public void sendScanData(MOPMessage message, Map<String, String> topicDetails) {
topicDetails.get("scanId"),
topicDetails.get("scanType"));

solacePublisher.publish(message, topicString);
boolean isSuccessful = solacePublisher.publish(message, topicString);

meterRegistry.counter(MAAS_EMA_SCAN_EVENT_SENT,
STATUS_TAG, isSuccessful ? ScanStatus.COMPLETE.name() : ScanStatus.FAILED.name(),
SCAN_ID_TAG, topicDetails.get("scanId"),
ORG_ID_TAG, topicDetails.get("orgId")).increment();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.solace.maas.ep.event.management.agent.plugin.publisher.SolacePublisher;
import com.solace.maas.ep.event.management.agent.plugin.route.exceptions.ScanOverallStatusException;
import com.solace.maas.ep.event.management.agent.plugin.route.exceptions.ScanStatusException;
import io.micrometer.core.instrument.MeterRegistry;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
Expand All @@ -14,15 +15,23 @@
import java.util.List;
import java.util.Map;

import static com.solace.maas.ep.common.metrics.ObservabilityConstants.MAAS_EMA_SCAN_EVENT_SENT;
import static com.solace.maas.ep.common.metrics.ObservabilityConstants.ORG_ID_TAG;
import static com.solace.maas.ep.common.metrics.ObservabilityConstants.SCAN_ID_TAG;
import static com.solace.maas.ep.common.metrics.ObservabilityConstants.STATUS_TAG;

@Slf4j
@Component
@ConditionalOnProperty(name = "event-portal.gateway.messaging.standalone", havingValue = "false")
public class ScanStatusPublisher {

private final SolacePublisher solacePublisher;
private final MeterRegistry meterRegistry;

public ScanStatusPublisher(SolacePublisher solacePublisher) {
public ScanStatusPublisher(SolacePublisher solacePublisher,
MeterRegistry meterRegistry) {
this.solacePublisher = solacePublisher;
this.meterRegistry = meterRegistry;
}

/**
Expand All @@ -46,6 +55,9 @@ public void sendOverallScanStatus(ScanStatusMessage message, Map<String, String>
} catch (Exception e) {
throw new ScanOverallStatusException("Over all status exception: " + e.getMessage(),
Map.of(scanId, List.of(e)), "Overall status", Arrays.asList(scanType.split(",")), ScanStatus.valueOf(status));
} finally {
meterRegistry.counter(MAAS_EMA_SCAN_EVENT_SENT, STATUS_TAG, status, SCAN_ID_TAG, scanId,
ORG_ID_TAG, topicDetails.get("orgId")).increment();
}
}

Expand All @@ -72,6 +84,9 @@ public void sendScanDataStatus(ScanDataStatusMessage message, Map<String, String
} catch (Exception e) {
throw new ScanStatusException("Route status exception: " + e.getMessage(),
Map.of(scanId, List.of(e)), "Route status", List.of(scanType), ScanStatus.valueOf(status));
} finally {
meterRegistry.counter(MAAS_EMA_SCAN_EVENT_SENT, STATUS_TAG, status, SCAN_ID_TAG, scanId,
ORG_ID_TAG, topicDetails.get("orgId")).increment();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,9 @@ public String scan(ScanRequestBO scanRequestBO) {
brokerScanType, e.getKey()))
.filter(Objects::nonNull)
.filter(list -> !list.isEmpty())
.collect(Collectors.toList()).stream()
.toList().stream()
)
.collect(Collectors.toList()).stream().flatMap(List::stream).collect(Collectors.toList());
.toList().stream().flatMap(List::stream).toList();

return scanService.singleScan(routes, groupId, scanId, traceId, actorId, messagingServiceEntity, runtimeAgentId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.solace.maas.ep.event.management.agent.scanManager.model.ScanItemBO;
import com.solace.maas.ep.event.management.agent.scanManager.model.ScanTypeBO;
import com.solace.maas.ep.event.management.agent.util.IDGenerator;
import io.micrometer.core.instrument.MeterRegistry;
import lombok.extern.slf4j.Slf4j;
import net.logstash.logback.encoder.org.apache.commons.lang3.StringUtils;
import org.apache.camel.Exchange;
Expand All @@ -40,6 +41,10 @@
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static com.solace.maas.ep.common.metrics.ObservabilityConstants.MAAS_EMA_SCAN_EVENT_SENT;
import static com.solace.maas.ep.common.metrics.ObservabilityConstants.SCAN_ID_TAG;
import static com.solace.maas.ep.common.metrics.ObservabilityConstants.STATUS_TAG;

/**
* Responsible for initiating and managing Messaging Service scans.
*/
Expand All @@ -62,11 +67,17 @@ public class ScanService {

private final IDGenerator idGenerator;

private final MeterRegistry meterRegistry;

public ScanService(ScanRepository repository,
ScanRecipientHierarchyRepository scanRecipientHierarchyRepository,
ScanTypeRepository scanTypeRepository, ScanStatusRepository scanStatusRepository, ScanRouteService scanRouteService,
RouteService routeService, ProducerTemplate producerTemplate,
IDGenerator idGenerator) {
ScanTypeRepository scanTypeRepository,
ScanStatusRepository scanStatusRepository,
ScanRouteService scanRouteService,
RouteService routeService,
ProducerTemplate producerTemplate,
IDGenerator idGenerator,
MeterRegistry meterRegistry) {
this.repository = repository;
Comment on lines +74 to 81
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

this.scanRecipientHierarchyRepository = scanRecipientHierarchyRepository;
this.scanTypeRepository = scanTypeRepository;
Expand All @@ -75,6 +86,7 @@ public ScanService(ScanRepository repository,
this.routeService = routeService;
this.producerTemplate = producerTemplate;
this.idGenerator = idGenerator;
this.meterRegistry = meterRegistry;
}

/**
Expand Down Expand Up @@ -284,6 +296,7 @@ public void sendScanStatus(String groupId, String scanId, String traceId, String
exchange.getIn().setHeader(RouteConstants.SCAN_STATUS, status);
exchange.getIn().setHeader(RouteConstants.SCAN_STATUS_DESC, "");
});
meterRegistry.counter(MAAS_EMA_SCAN_EVENT_SENT, STATUS_TAG, status.name(), SCAN_ID_TAG, scanId).increment();
}

protected CompletableFuture<Exchange> scanAsync(String groupId, String scanId, String traceId, String actorId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ public class CommandMessageHandler extends SolaceDirectMessageHandler<CommandMes

public CommandMessageHandler(
SolaceConfiguration solaceConfiguration,
SolaceSubscriber solaceSubscriber, CommandMessageProcessor commandMessageProcessor) {
SolaceSubscriber solaceSubscriber,
CommandMessageProcessor commandMessageProcessor) {
super(solaceConfiguration.getTopicPrefix() + "command/v1/>", solaceSubscriber);
this.commandMessageProcessor = commandMessageProcessor;
}
Expand Down
Loading
Loading