METRICS -- the kickoff PR #376

Merged
@@ -7,6 +7,8 @@
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.opensearch.migrations.coreutils.MetricsAttributeKey;
import org.opensearch.migrations.coreutils.MetricsEvent;
Comment on lines +10 to +11 (Collaborator):
THANK YOU - this will clear a number of lint findings (of reused strings that should have been a constant)

import org.apache.kafka.clients.producer.RecordMetadata;
import org.opensearch.migrations.trafficcapture.CodedOutputStreamHolder;
import org.opensearch.migrations.trafficcapture.IChannelConnectionCaptureSerializer;
@@ -93,18 +95,16 @@
log.debug("Sending Kafka producer record: {} for topic: {}", recordId, topicNameForTraffic);
// Async request to Kafka cluster
producer.send(kafkaRecord, handleProducerRecordSent(cf, recordId));
metricsLogger.atSuccess()
.addKeyValue("channelId", connectionId)
.addKeyValue("topicName", topicNameForTraffic)
.addKeyValue("sizeInBytes", kafkaRecord.value().length)
.addKeyValue("diagnosticId", recordId)
.setMessage("Sent message to Kafka").log();
metricsLogger.atSuccess(MetricsEvent.RECORD_SENT_TO_KAFKA)
.setAttribute(MetricsAttributeKey.CHANNEL_ID, connectionId)
.setAttribute(MetricsAttributeKey.TOPIC_NAME, topicNameForTraffic)
.setAttribute(MetricsAttributeKey.SIZE_IN_BYTES, kafkaRecord.value().length)
.setAttribute(MetricsAttributeKey.REQUEST_ID, recordId).emit();
return cf;
} catch (Exception e) {
metricsLogger.atError(e)
.addKeyValue("channelId", connectionId)
.addKeyValue("topicName", topicNameForTraffic)
.setMessage("Sending message to Kafka failed.").log();
metricsLogger.atError(MetricsEvent.RECORD_FAILED_TO_KAFKA, e)
.setAttribute(MetricsAttributeKey.CHANNEL_ID, connectionId)
.setAttribute(MetricsAttributeKey.TOPIC_NAME, topicNameForTraffic).emit();

Check warning on line 107 in TrafficCapture/captureKafkaOffloader/src/main/java/org/opensearch/migrations/trafficcapture/kafkaoffloader/KafkaCaptureFactory.java (Codecov / codecov/patch): Added lines #L105 - L107 were not covered by tests.
throw e;
}
}
16 changes: 16 additions & 0 deletions TrafficCapture/coreUtilities/build.gradle
@@ -22,6 +22,8 @@ plugins {
// id 'checkstyle'
id 'org.owasp.dependencycheck' version '8.2.1'
id "io.freefair.lombok" version "8.0.1"

id 'java'
}

//spotbugs {
@@ -48,6 +50,20 @@ dependencies {
testImplementation group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.20.0'
testImplementation group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.20.0'
testImplementation group: 'org.apache.logging.log4j', name: 'log4j-slf4j2-impl', version: '2.20.0'

// Log4j
implementation(platform("org.apache.logging.log4j:log4j-bom:2.21.1"))
implementation("org.apache.logging.log4j:log4j-api")
implementation("org.apache.logging.log4j:log4j-core")
Comment on lines +56 to +57 (Collaborator):
ummm - this is weird. Were we even using log4j before?
Thanks for adding these. I think that we probably need to remove the Slf4J binding from line 52 too.

implementation("org.apache.logging.log4j:log4j-slf4j2-impl:2.20.0")

// OpenTelemetry core
implementation("io.opentelemetry:opentelemetry-sdk:1.30.0")
implementation("io.opentelemetry:opentelemetry-exporter-otlp:1.30.0")
implementation("io.opentelemetry:opentelemetry-semconv:1.30.1-alpha")

// OpenTelemetry log4j appender
implementation("io.opentelemetry.instrumentation:opentelemetry-log4j-appender-2.17:1.30.0-alpha")
}

tasks.named('test') {
@@ -0,0 +1,36 @@
package org.opensearch.migrations.coreutils;

// These enum values create a discrete set of potential attributes that
// are attached to metrics events, which allows comparison across different events (when appropriate)
// and building reliable visualizations and dashboards off of this data. If an attribute key is changed
// or removed, one should expect that downstream dependencies (events that are being logged & dashboards/visualizations)
// will need to be updated as well, and may no longer be compatible with previously collected data. Adding values
// that correspond to attributes for new event types is backwards compatible.
public enum MetricsAttributeKey {
Comment (Collaborator):
Should you put a caveat in the documentation that if these are changed, something else downstream will also need to be updated? Unit tests will also be super helpful and make maintenance easier.

EVENT("event"),
CONNECTION_ID("connection_id"),
REQUEST_ID("request_id"),
CHANNEL_ID("channel_id"),
SIZE_IN_BYTES("size_in_bytes"),
TOPIC_NAME("topic_name"),
SCHEDULED_SEND_TIME("scheduled_send_time"),
DELAY_FROM_ORIGINAL_TO_SCHEDULED_START("delayFromOriginalToScheduledStartInMs"),

HTTP_METHOD("http.method"),
HTTP_ENDPOINT("http.endpoint"),
HTTP_STATUS("http.status"),

EXCEPTION_MESSAGE("exception.message"),
EXCEPTION_STACKTRACE("exception.stacktrace"),
EXCEPTION_TYPE( "exception.type");

private String keyName;

MetricsAttributeKey(String keyName) {
this.keyName = keyName;
}

public String getKeyName() {
return keyName;
}
}
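To make the compatibility caveat above concrete, here is a dependency-free sketch (standalone, hypothetical names; not part of the PR) of why downstream consumers depend on the stable string values rather than the enum constants:

```java
// Standalone sketch mirroring a slice of MetricsAttributeKey: dashboards
// and saved queries match on the string value, so renaming "channel_id"
// silently orphans previously collected data even if the enum constant
// keeps its Java name.
enum AttributeKeySketch {
    CHANNEL_ID("channel_id"),
    TOPIC_NAME("topic_name");

    private final String keyName;

    AttributeKeySketch(String keyName) {
        this.keyName = keyName;
    }

    public String getKeyName() {
        return keyName;
    }
}

public class AttributeKeyDemo {
    public static void main(String[] args) {
        // A saved dashboard filter such as `channel_id = "c-42"` only
        // keeps working while this string value stays stable.
        System.out.println(AttributeKeySketch.CHANNEL_ID.getKeyName());
    }
}
```

Adding new constants, as the doc comment notes, is backwards compatible; only renames and removals break consumers.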
@@ -0,0 +1,30 @@
package org.opensearch.migrations.coreutils;

public enum MetricsEvent {
Comment (Collaborator):
You might want to consider putting these within each of the projects or applications so that we don't need every update change this file. Not a big deal for a monorepo, but it will be more painful if/when we pull projects apart.
For an example on how to do this, take a look at this StackOverflow post showing how to use interfaces and inheritance.

Reply (Collaborator, author):
I was thinking about this, because grouping by application makes a lot of sense. I won't have this in the about-to-be-pushed update, but I think it makes sense, and I'm looking into the link you included--thanks.

CONNECTION_OPENED,
CONNECTION_CLOSED,
BACKSIDE_HANDLER_CHANNEL_ACTIVE,
BACKSIDE_HANDLER_CHANNEL_CLOSED,
BACKSIDE_HANDLER_EXCEPTION,
RECORD_SENT_TO_KAFKA,
RECORD_FAILED_TO_KAFKA,
RECORD_RECEIVED_FROM_KAFKA,
RECORD_FAILED_FROM_KAFKA,
PARSED_TRAFFIC_STREAM_FROM_KAFKA,
PARSING_TRAFFIC_STREAM_FROM_KAFKA_FAILED,
ACCUMULATED_FULL_CAPTURED_SOURCE_REQUEST,
ACCUMULATED_FULL_CAPTURED_SOURCE_RESPONSE,
CAPTURED_REQUEST_PARSED_TO_HTTP,
WRITING_REQUEST_COMPONENT_FAILED,
WROTE_REQUEST_COMPONENT,
RECEIVED_REQUEST_COMPONENT,
RECEIVED_FULL_HTTP_REQUEST,
RECEIVED_RESPONSE_COMPONENT,
RECEIVING_RESPONSE_COMPONENT_FAILED,
RECEIVED_FULL_HTTP_RESPONSE,
RECEIVING_FULL_HTTP_RESPONSE_FAILED,
SCHEDULED_REQUEST_TO_BE_SENT,
REQUEST_WAS_TRANSFORMED,
TRANSFORMING_REQUEST_FAILED,
REQUEST_REDRIVEN_WITHOUT_TRANSFORMATION
}
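For reference, the per-project split the reviewer suggests might look roughly like this (hypothetical names; a sketch of the interface-and-inheritance pattern, not code from the PR):

```java
// Each module defines its own event enum implementing a shared interface,
// so adding an event no longer requires editing one central file.
interface MetricsEventLike {
    String eventName();
}

enum KafkaOffloaderEvent implements MetricsEventLike {
    RECORD_SENT_TO_KAFKA,
    RECORD_FAILED_TO_KAFKA;

    @Override
    public String eventName() {
        return name();
    }
}

public class PerProjectEventsDemo {
    public static void main(String[] args) {
        // Call sites accept the interface, not a concrete enum type.
        MetricsEventLike event = KafkaOffloaderEvent.RECORD_SENT_TO_KAFKA;
        System.out.println(event.eventName());
    }
}
```

The trade-off is that a single exhaustive switch over all events is no longer possible, which matters less for metrics emission than for, say, serialization.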
@@ -0,0 +1,45 @@
package org.opensearch.migrations.coreutils;

import lombok.extern.slf4j.Slf4j;
import org.slf4j.event.Level;
import org.slf4j.Logger;
import org.slf4j.spi.LoggingEventBuilder;


@Slf4j
public class MetricsLogBuilder {
Comment (Collaborator):
I'm not sure how to do this, but it would be great to get a semantic model (like SonarQube or IntelliJ) to know that there should be a warning if emit has not been called on these built-up values. It might already do that by virtue of builders being returned and then discarding the last value. So - maybe it's a future action item, maybe not

Reply (Collaborator, author):
Completely agreed--it would be super nice to have.

private Logger logger;
private LoggingEventBuilder loggingEventBuilder;

public MetricsLogBuilder(Logger logger) {
this.logger = logger;
}

public MetricsLogBuilder setAttribute(MetricsAttributeKey key, Object value) {
loggingEventBuilder.addKeyValue(key.getKeyName(), value);
return this;
}

public MetricsLogBuilder atSuccess(MetricsEvent event) {
loggingEventBuilder = logger.makeLoggingEventBuilder(Level.INFO);
setAttribute(MetricsAttributeKey.EVENT, event.toString());
return this;
}

public MetricsLogBuilder atError(MetricsEvent event) {
loggingEventBuilder = logger.makeLoggingEventBuilder(Level.ERROR);
setAttribute(MetricsAttributeKey.EVENT, event.toString());
return this;
}

public MetricsLogBuilder atTrace(MetricsEvent event) {
loggingEventBuilder = logger.makeLoggingEventBuilder(Level.TRACE);
setAttribute(MetricsAttributeKey.EVENT, event.toString());
return this;
}

public void emit() {
loggingEventBuilder.log();
}
}
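Note that the builder above is order-dependent: `loggingEventBuilder` stays null until one of `atSuccess`/`atError`/`atTrace` runs, so calling `setAttribute` first throws. This dependency-free stand-in (hypothetical, for illustration only; it swaps slf4j for a plain map) mimics that contract:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Stand-in for MetricsLogBuilder: one of the at*() methods must run
// before setAttribute(), because that call is what initializes the
// underlying event state.
class MetricsLogBuilderSketch {
    private Map<String, Object> attributes; // null until an at*() call

    public MetricsLogBuilderSketch atSuccess(String event) {
        attributes = new LinkedHashMap<>();
        attributes.put("event", event);
        return this;
    }

    public MetricsLogBuilderSketch setAttribute(String key, Object value) {
        // Like the real class, this NPEs if called before atSuccess().
        attributes.put(key, value);
        return this;
    }

    public String emit() {
        return attributes.toString();
    }
}

public class BuilderOrderDemo {
    public static void main(String[] args) {
        String emitted = new MetricsLogBuilderSketch()
                .atSuccess("RECORD_SENT_TO_KAFKA")
                .setAttribute("channel_id", "c-1")
                .emit();
        System.out.println(emitted);
    }
}
```

Since `MetricsLogger` is the only place that constructs these builders and always calls an `at*()` method first, the ordering hazard is contained, but it supports the reviewer's point about wanting static-analysis help for misuse (including a forgotten `emit()`).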
@@ -1,18 +1,24 @@
package org.opensearch.migrations.coreutils;

import lombok.extern.slf4j.Slf4j;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.exporter.otlp.logs.OtlpGrpcLogRecordExporter;
import io.opentelemetry.instrumentation.log4j.appender.v2_17.OpenTelemetryAppender;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.logs.SdkLoggerProvider;
import io.opentelemetry.sdk.logs.export.BatchLogRecordProcessor;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.semconv.resource.attributes.ResourceAttributes;
Comment on lines +3 to +10 (Collaborator):
For a fast follow, this would be a good thing to divide into implementation and interface via ServiceLoader for those that are interested in doing metrics without OTEL.

import org.slf4j.Logger;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
import org.slf4j.spi.LoggingEventBuilder;

@Slf4j
public class MetricsLogger {
private Logger logger;

/**
* Creates a MetricLogger instance. Under the hood this creates a slf4j logger instance.
* Creates a MetricLogger instance.
*
* @param source Generally the name of the class where the logger is being instantiated, this
* will be combined with `MetricsLogger.` to form the logger name.
Expand All @@ -22,15 +28,40 @@ public MetricsLogger(String source) {
logger = LoggerFactory.getLogger(String.format("MetricsLogger.%s", source));
}

public static void initializeOpenTelemetry(String serviceName, String collectorEndpoint) {
OpenTelemetrySdk sdk =
OpenTelemetrySdk.builder()
Comment (Collaborator):
Lots of questions around corner cases and how does this break?
What's the retry policy?
What happens when the collector remains down?
How will the connection recover?

Those need to be documented somewhere, especially for the capture proxy.

.setLoggerProvider(
SdkLoggerProvider.builder()
.setResource(
Resource.getDefault().toBuilder()
.put(ResourceAttributes.SERVICE_NAME, serviceName)
.build())
.addLogRecordProcessor(
BatchLogRecordProcessor.builder(
OtlpGrpcLogRecordExporter.builder()
.setEndpoint(collectorEndpoint)
.build())
.build())
.build())
.build();
GlobalOpenTelemetry.set(sdk);

// Add hook to close SDK, which flushes logs
Runtime.getRuntime().addShutdownHook(new Thread(sdk::close));
OpenTelemetryAppender.install(GlobalOpenTelemetry.get());
Comment (Collaborator):
Should this block, will this block, until the appender is fully or partially operational (please document)? Now would be the time to fail, at startup, rather to fail silently and leave the user in a lurch later.

Reply (Collaborator, author):
I think there may be a missing word(s) in your comment -- are you asking if/when this block will return? Agreed that this needs more investigation & documentation.

}
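The shutdown hook registered above is what guarantees records buffered by the `BatchLogRecordProcessor` get flushed before the JVM exits. A dependency-free sketch of that pattern (hypothetical names; a stand-in replaces the OpenTelemetry SDK so this runs without it on the classpath):

```java
// Mirrors the structure of the real code: sdk::close is registered as a
// shutdown hook so pending log records are flushed at JVM exit.
public class ShutdownFlushDemo {
    static final StringBuilder pending = new StringBuilder();

    // Stand-in for sdk.close(): flush whatever is buffered, then clear.
    static String close() {
        String flushed = "flushed: " + pending;
        pending.setLength(0);
        return flushed;
    }

    public static void main(String[] args) {
        // Analogous to: Runtime.getRuntime()
        //                      .addShutdownHook(new Thread(sdk::close));
        Runtime.getRuntime().addShutdownHook(
                new Thread(ShutdownFlushDemo::close));
        pending.append("metric-event");
        // Normal exit triggers the hook, flushing the buffered record.
    }
}
```

This does not answer the reviewer's questions about collector outages and retry behavior; those depend on the exporter's configuration and still need documenting.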


/**
* To indicate a successful event (e.g. data received or data sent) that may be a helpful
* metric, this method can be used to return a MetricsLogBuilder. The builder
* can be supplemented with attributes (used for diagnostic information and
* dimensions for the metric) and then emitted. As an example,
* metricsLogger.atSuccess(event).setAttribute(MetricsAttributeKey.CHANNEL_ID, id).emit();
*/
public LoggingEventBuilder atSuccess() {
return logger.makeLoggingEventBuilder(Level.INFO);
public MetricsLogBuilder atSuccess(MetricsEvent event) {
return new MetricsLogBuilder(logger).atSuccess(event);
}

/**
Expand All @@ -39,19 +70,25 @@ public LoggingEventBuilder atSuccess() {
* key-value pairs) and a log message.
* @param cause The exception thrown in the failure, this will be set as the cause for the log message.
*/
public LoggingEventBuilder atError(Throwable cause) {
return logger.makeLoggingEventBuilder(Level.ERROR).setCause(cause);
public MetricsLogBuilder atError(MetricsEvent event, Throwable cause) {
if (cause == null) {
return atError(event);
}
return new MetricsLogBuilder(logger).atError(event)
.setAttribute(MetricsAttributeKey.EXCEPTION_MESSAGE, cause.getMessage())
.setAttribute(MetricsAttributeKey.EXCEPTION_STACKTRACE, java.util.Arrays.toString(cause.getStackTrace()))
.setAttribute(MetricsAttributeKey.EXCEPTION_TYPE, cause.getClass().getName());
}

/**
* This also indicates a failed event that may be a helpful metric. It can be used in cases where
* there is a failure that isn't indicated by an Exception being thrown.
*/
public LoggingEventBuilder atError() {
return logger.makeLoggingEventBuilder(Level.ERROR);
public MetricsLogBuilder atError(MetricsEvent event) {
return new MetricsLogBuilder(logger).atError(event);
}

public LoggingEventBuilder atTrace() {
return logger.makeLoggingEventBuilder(Level.TRACE);
public MetricsLogBuilder atTrace(MetricsEvent event) {
return new MetricsLogBuilder(logger).atTrace(event);
}
}
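As a quick check of the exception fan-out in `atError(event, cause)` above, this standalone sketch (hypothetical helper, not the PR's code) maps a throwable onto the three `exception.*` attributes:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ExceptionAttributesDemo {
    // Mirrors what atError(event, cause) attaches to the builder.
    static Map<String, String> exceptionAttributes(Throwable cause) {
        Map<String, String> attrs = new LinkedHashMap<>();
        attrs.put("exception.message", String.valueOf(cause.getMessage()));
        // Arrays.toString gives a readable frame list; calling toString()
        // directly on the StackTraceElement[] array would not.
        attrs.put("exception.stacktrace",
                java.util.Arrays.toString(cause.getStackTrace()));
        attrs.put("exception.type", cause.getClass().getName());
        return attrs;
    }

    public static void main(String[] args) {
        Map<String, String> attrs =
                exceptionAttributes(new IllegalStateException("boom"));
        System.out.println(attrs.get("exception.type"));
    }
}
```

These attribute names line up with the `exception.message` / `exception.stacktrace` / `exception.type` keys defined in `MetricsAttributeKey`.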
6 changes: 4 additions & 2 deletions TrafficCapture/dockerSolution/build.gradle
@@ -20,7 +20,8 @@ dependencies {

def dockerFilesForExternalServices = [
"elasticsearchWithSearchGuard": "elasticsearch_searchguard",
"migrationConsole": "migration_console"
"migrationConsole": "migration_console",
"opensearchDashboards": "opensearch_dashboards",
]
// Create the static docker files that aren't hosting migrations java code from this repo
dockerFilesForExternalServices.each { projectName, dockerImageName ->
@@ -66,7 +67,8 @@
task buildDockerImages {
dependsOn buildDockerImage_elasticsearchWithSearchGuard
dependsOn buildDockerImage_migrationConsole

dependsOn buildDockerImage_opensearchDashboards

dependsOn buildDockerImage_trafficCaptureProxyServer
dependsOn buildDockerImage_trafficReplayer
}