Otel metrics and traces #460

Merged Feb 9, 2024 (21 commits)

Commits
a4caca7 Move addMetricsIfPresent into the metrics builder as a first class me… (gregschohn, Nov 16, 2023)
c026588 WIP to play with OpenTelemetry metric instruments and tracer spans. (gregschohn, Nov 17, 2023)
f3c0077 Get gradle files and docker-compose in order to support otlp exports … (gregschohn, Nov 27, 2023)
7fb8e2e WIP (gregschohn, Nov 27, 2023)
a8ae3d1 Restore the docker-compose single-node/multi-node split docker-compos… (gregschohn, Nov 28, 2023)
da9d36b Add labels to each metric instrument so that multiple values can be p… (gregschohn, Nov 28, 2023)
06618ca Move the MetricsClosure into its own class and stop stuffing the metr… (gregschohn, Nov 28, 2023)
aba1aab WIP - Cleanup + get Jaeger to work by switching the endpoint. Also i… (gregschohn, Nov 28, 2023)
900bc6d Start moving away from ThreadLocal and 'current contexts' and toward … (gregschohn, Nov 29, 2023)
3746a8e Get span parenting to work. (gregschohn, Nov 30, 2023)
e0e7bf1 Merge branch 'main' into DoNotMerge_MoreMetrics (gregschohn, Nov 30, 2023)
4b43262 Attempt to fix a failing unit test. (gregschohn, Nov 30, 2023)
322e12f Refactor. Couple name changes, class package changes, and moved IRep… (gregschohn, Nov 30, 2023)
723bf77 Bundle all of the offloader spans with the netty handler spans. (gregschohn, Dec 1, 2023)
15a1705 Improve the tracing story for the capture proxy. (gregschohn, Dec 2, 2023)
8a6f52a Tracing change: Flatten the flush span and just record it as 'blocked'. (gregschohn, Dec 2, 2023)
c50e01d Minor cleanup - stop setting the namespace or trying to change in a p… (gregschohn, Dec 4, 2023)
17c517d Start instrumenting the replayer with more contexts so that traces an… (gregschohn, Dec 4, 2023)
6288844 Double down on using Context objects in lieu of String labels and fix… (gregschohn, Dec 11, 2023)
09e849c Merge branch 'FixKafkaResume' into OtelMetricsAndTraces (gregschohn, Dec 11, 2023)
9cf2540 Merge branch 'main' into OtelMetricsAndTraces (gregschohn, Dec 12, 2023)
20 changes: 12 additions & 8 deletions TrafficCapture/captureKafkaOffloader/build.gradle
@@ -9,20 +9,24 @@ repositories {
}

dependencies {
api 'io.netty:netty-buffer:4.1.100.Final'
implementation platform("io.opentelemetry:opentelemetry-bom:1.32.0")

implementation project(':captureOffloader')
implementation project(':coreUtilities')
implementation 'org.projectlombok:lombok:1.18.26'
implementation 'com.google.protobuf:protobuf-java:3.22.2'
implementation 'org.apache.kafka:kafka-clients:3.6.0'
implementation 'software.amazon.msk:aws-msk-iam-auth:1.1.9'
implementation 'org.slf4j:slf4j-api:2.0.7'
implementation group: 'com.google.protobuf', name:'protobuf-java', version:'3.22.2'
api group:'io.netty', name:'netty-buffer', version: '4.1.100.Final'
implementation group: 'io.opentelemetry', name:'opentelemetry-api'
[Collaborator review comment]: Same question as previously, do we need this when coreUtilities already has this dependency?

implementation group: 'org.projectlombok', name:'lombok', version:'1.18.26'
implementation group: 'org.apache.kafka', name:'kafka-clients', version:'3.6.0'
implementation group: 'org.slf4j', name:'slf4j-api', version:'2.0.7'
implementation group: 'software.amazon.msk', name:'aws-msk-iam-auth', version:'1.1.9'

testImplementation project(':captureProtobufs')
testImplementation 'org.mockito:mockito-core:4.6.1'
testImplementation 'org.mockito:mockito-junit-jupiter:4.6.1'
testImplementation group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.20.0'
testImplementation group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.20.0'
testImplementation group: 'org.apache.logging.log4j', name: 'log4j-slf4j2-impl', version: '2.20.0'
testImplementation group: 'org.mockito', name: 'mockito-core', version: '4.6.1'
testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '4.6.1'
testImplementation group: 'org.slf4j', name: 'slf4j-api', version: '2.0.7'
}

TrafficCapture/captureKafkaOffloader/src/main/java/org/opensearch/migrations/trafficcapture/kafkaoffloader/KafkaCaptureFactory.java
@@ -10,21 +10,31 @@
import org.opensearch.migrations.coreutils.MetricsAttributeKey;
import org.opensearch.migrations.coreutils.MetricsEvent;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.opensearch.migrations.tracing.SimpleMeteringClosure;
import org.opensearch.migrations.tracing.commoncontexts.IConnectionContext;
import org.opensearch.migrations.trafficcapture.CodedOutputStreamHolder;
import org.opensearch.migrations.trafficcapture.IChannelConnectionCaptureSerializer;
import org.opensearch.migrations.trafficcapture.IConnectionCaptureFactory;
import org.opensearch.migrations.trafficcapture.OrderedStreamLifecyleManager;
import org.opensearch.migrations.trafficcapture.StreamChannelConnectionCaptureSerializer;
import org.opensearch.migrations.coreutils.MetricsLogger;
import org.opensearch.migrations.trafficcapture.kafkaoffloader.tracing.KafkaRecordContext;
import org.opensearch.migrations.trafficcapture.tracing.ConnectionContext;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;


@Slf4j
public class KafkaCaptureFactory implements IConnectionCaptureFactory<RecordMetadata> {

private static final SimpleMeteringClosure METERING_CLOSURE = new SimpleMeteringClosure("KafkaCapture");


private static final MetricsLogger metricsLogger = new MetricsLogger("BacksideHandler");

private static final String DEFAULT_TOPIC_NAME_FOR_TRAFFIC = "logging-traffic-topic";
@@ -51,8 +61,10 @@
}

@Override
public IChannelConnectionCaptureSerializer<RecordMetadata> createOffloader(String connectionId) {
return new StreamChannelConnectionCaptureSerializer<>(nodeId, connectionId, new StreamManager(connectionId));
public IChannelConnectionCaptureSerializer<RecordMetadata> createOffloader(IConnectionContext ctx,
String connectionId) {
return new StreamChannelConnectionCaptureSerializer<>(nodeId, connectionId,
new StreamManager(ctx, connectionId));
}

@AllArgsConstructor
@@ -65,12 +77,33 @@
}
}

@AllArgsConstructor
class StreamManager extends OrderedStreamLifecyleManager<RecordMetadata> {
IConnectionContext telemetryContext;
String connectionId;
Instant startTime;

public StreamManager(IConnectionContext ctx, String connectionId) {
this.telemetryContext = ctx;
METERING_CLOSURE.meterIncrementEvent(telemetryContext, "offloader_created");
METERING_CLOSURE.meterDeltaEvent(telemetryContext, "offloaders_active", 1);

this.connectionId = connectionId;
this.startTime = Instant.now();
}

@Override
public void close() throws IOException {
log.atInfo().setMessage(() -> "factory.close()").log();
METERING_CLOSURE.meterHistogramMillis(telemetryContext, "offloader_stream_lifetime",
Duration.between(startTime, Instant.now()));
METERING_CLOSURE.meterDeltaEvent(telemetryContext, "offloaders_active", -1);
METERING_CLOSURE.meterIncrementEvent(telemetryContext, "offloader_closed");
}

@Override
public CodedOutputStreamWrapper createStream() {
METERING_CLOSURE.meterIncrementEvent(telemetryContext, "stream_created");

ByteBuffer bb = ByteBuffer.allocate(bufferSize);
return new CodedOutputStreamWrapper(CodedOutputStream.newInstance(bb), bb);
}
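Reviewer note: the METERING_CLOSURE calls above (meterIncrementEvent, meterDeltaEvent, meterHistogramMillis) come from SimpleMeteringClosure, whose implementation is not part of this file's diff. Below is a minimal sketch of how such a helper might map onto the stock OpenTelemetry metrics API; the class name, method shapes, and instrument choices are assumptions:

```java
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.LongUpDownCounter;
import io.opentelemetry.api.metrics.Meter;

import java.time.Duration;

// Hypothetical stand-in for SimpleMeteringClosure; not taken from this PR.
class MeteringClosureSketch {
    private final Meter meter;

    MeteringClosureSketch(String scopeName) {
        this.meter = GlobalOpenTelemetry.getMeter(scopeName); // e.g. "KafkaCapture"
    }

    // One-shot events like "offloader_created" map to a monotonic counter.
    void meterIncrementEvent(String eventName) {
        LongCounter counter = meter.counterBuilder(eventName).build();
        counter.add(1);
    }

    // Live totals like "offloaders_active" (+1 on create, -1 on close)
    // map to an up-down counter.
    void meterDeltaEvent(String eventName, long delta) {
        LongUpDownCounter counter = meter.upDownCounterBuilder(eventName).build();
        counter.add(delta);
    }

    // Durations like "offloader_stream_lifetime" map to a histogram.
    void meterHistogramMillis(String eventName, Duration duration) {
        DoubleHistogram histogram = meter.histogramBuilder(eventName).setUnit("ms").build();
        histogram.record(duration.toMillis());
    }
}
```

A production version would cache the instruments rather than rebuild them per call, and would attach each context's attributes to the recorded measurements.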
@@ -93,8 +126,14 @@
// Wraps the Future returned by the Producer in a CompletableFuture
var cf = new CompletableFuture<RecordMetadata>();
log.debug("Sending Kafka producer record: {} for topic: {}", recordId, topicNameForTraffic);

var flushContext = new KafkaRecordContext(telemetryContext,
METERING_CLOSURE.makeSpanContinuation("flushRecord"),
topicNameForTraffic, recordId, kafkaRecord.value().length);
METERING_CLOSURE.meterIncrementEvent(telemetryContext, "stream_flush_called");

// Async request to Kafka cluster
producer.send(kafkaRecord, handleProducerRecordSent(cf, recordId));
producer.send(kafkaRecord, handleProducerRecordSent(cf, recordId, flushContext));
metricsLogger.atSuccess(MetricsEvent.RECORD_SENT_TO_KAFKA)
.setAttribute(MetricsAttributeKey.CHANNEL_ID, connectionId)
.setAttribute(MetricsAttributeKey.TOPIC_NAME, topicNameForTraffic)
@@ -108,29 +147,42 @@
throw e;
}
}

/**
* The default KafkaProducer comes with built-in retry and error-handling logic that suits many cases. From the
* documentation here for retry: https://kafka.apache.org/35/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
* "If the request fails, the producer can automatically retry. The retries setting defaults to Integer.MAX_VALUE,
* and it's recommended to use delivery.timeout.ms to control retry behavior, instead of retries."
* <p>
* Apart from this the KafkaProducer has logic for deciding whether an error is transient and should be
* retried or not retried at all: https://kafka.apache.org/35/javadoc/org/apache/kafka/common/errors/RetriableException.html
* as well as basic retry backoff
*/
private Callback handleProducerRecordSent(CompletableFuture<RecordMetadata> cf, String recordId) {
return (metadata, exception) -> {
if (exception != null) {
log.error("Error sending producer record: {}", recordId, exception);
cf.completeExceptionally(exception);
} else {
log.debug("Kafka producer record: {} has finished sending for topic: {} and partition {}",
recordId, metadata.topic(), metadata.partition());
cf.complete(metadata);
}
};
}
}

/**
* The default KafkaProducer comes with built-in retry and error-handling logic that suits many cases. From the
* documentation here for retry: https://kafka.apache.org/35/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
* "If the request fails, the producer can automatically retry. The retries setting defaults to Integer.MAX_VALUE,
* and it's recommended to use delivery.timeout.ms to control retry behavior, instead of retries."
* <p>
* Apart from this the KafkaProducer has logic for deciding whether an error is transient and should be
* retried or not retried at all: https://kafka.apache.org/35/javadoc/org/apache/kafka/common/errors/RetriableException.html
* as well as basic retry backoff
*/
private Callback handleProducerRecordSent(CompletableFuture<RecordMetadata> cf, String recordId,
KafkaRecordContext flushContext) {
// Keep this callback factory outside of the inner class: the inner class carries
// context fields that shouldn't be used here, and defining the callback at this
// level keeps those fields out of scope.
return (metadata, exception) -> {
log.atInfo().setMessage(()->"kafka completed sending a record").log();
METERING_CLOSURE.meterHistogramMicros(flushContext,
exception==null ? "stream_flush_success_ms" : "stream_flush_failure_ms");
METERING_CLOSURE.meterIncrementEvent(flushContext,
exception==null ? "stream_flush_success" : "stream_flush_failure");
METERING_CLOSURE.meterIncrementEvent(flushContext,
exception==null ? "stream_flush_success_bytes" : "stream_flush_failure_bytes",
flushContext.getRecordSize());
flushContext.currentSpan.end();

if (exception != null) {
log.error("Error sending producer record: {}", recordId, exception);
cf.completeExceptionally(exception);

[codecov/patch: added lines #L179-L180 in KafkaCaptureFactory.java were not covered by tests]
} else {
log.debug("Kafka producer record: {} has finished sending for topic: {} and partition {}",
recordId, metadata.topic(), metadata.partition());
cf.complete(metadata);
}
};
}
}
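To make the javadoc's recommendation concrete, here is a hedged configuration sketch; the broker address, serializers, and timeout values are placeholders rather than anything this PR sets:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

class ProducerConfigSketch {
    static KafkaProducer<String, byte[]> buildProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        // Bound the total time a send (including the producer's automatic retries)
        // may take, rather than tuning `retries` (which defaults to Integer.MAX_VALUE).
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120_000);
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 500);
        return new KafkaProducer<>(props);
    }
}
```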
TrafficCapture/captureKafkaOffloader/src/main/java/org/opensearch/migrations/trafficcapture/kafkaoffloader/tracing/KafkaRecordContext.java (new file)
@@ -0,0 +1,50 @@
package org.opensearch.migrations.trafficcapture.kafkaoffloader.tracing;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.trace.Span;
import lombok.AllArgsConstructor;
import lombok.Getter;
import org.opensearch.migrations.tracing.ISpanWithParentGenerator;
import org.opensearch.migrations.tracing.commoncontexts.IConnectionContext;
import org.opensearch.migrations.tracing.IWithAttributes;
import org.opensearch.migrations.tracing.IWithStartTime;

import java.time.Instant;

@AllArgsConstructor

[codecov/patch: added line #L15 in KafkaRecordContext.java was not covered by tests]
public class KafkaRecordContext implements IWithAttributes<IConnectionContext>, IWithStartTime {
static final AttributeKey<String> TOPIC_ATTR = AttributeKey.stringKey("topic");
static final AttributeKey<String> RECORD_ID_ATTR = AttributeKey.stringKey("recordId");
static final AttributeKey<Long> RECORD_SIZE_ATTR = AttributeKey.longKey("recordSize");

@Getter
public final IConnectionContext enclosingScope;
@Getter

[codecov/patch: added line #L23 in KafkaRecordContext.java was not covered by tests]
public final Span currentSpan;
@Getter
public final Instant startTime;
@Getter
public final String topic;
@Getter
public final String recordId;
@Getter
public final int recordSize;

public KafkaRecordContext(IConnectionContext enclosingScope, ISpanWithParentGenerator incomingSpan,
String topic, String recordId, int recordSize) {
this.enclosingScope = enclosingScope;
this.topic = topic;
this.recordId = recordId;
this.recordSize = recordSize;
this.startTime = Instant.now();
currentSpan = incomingSpan.apply(this.getPopulatedAttributes(), enclosingScope.getCurrentSpan());
}

@Override
public AttributesBuilder fillAttributes(AttributesBuilder builder) {
return builder.put(TOPIC_ATTR, getTopic())
.put(RECORD_ID_ATTR, getRecordId())
.put(RECORD_SIZE_ATTR, getRecordSize());
}
}
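KafkaRecordContext calls getPopulatedAttributes() in its constructor, but that method is not defined anywhere in this diff. A speculative sketch of the contract it presumably relies on, where each context contributes its keys through fillAttributes() and a default method assembles the final Attributes (the real IWithAttributes may also fold in the enclosing scope's attributes):

```java
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;

// Assumed shape of the IWithAttributes contract; not taken from this PR's diff.
interface IWithAttributesSketch {
    AttributesBuilder fillAttributes(AttributesBuilder builder);

    default Attributes getPopulatedAttributes() {
        // Each implementation adds its own keys (topic, recordId, recordSize, ...)
        // on top of an empty builder.
        return fillAttributes(Attributes.builder()).build();
    }
}
```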
KafkaCaptureFactory unit test
@@ -1,6 +1,7 @@
package org.opensearch.migrations.trafficcapture.kafkaoffloader;

import io.netty.buffer.Unpooled;
import io.opentelemetry.api.GlobalOpenTelemetry;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.producer.Callback;
@@ -18,7 +19,9 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.migrations.tracing.EmptyContext;
import org.opensearch.migrations.trafficcapture.IChannelConnectionCaptureSerializer;
import org.opensearch.migrations.trafficcapture.tracing.ConnectionContext;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
@@ -51,7 +54,7 @@ public void testLargeRequestIsWithinKafkaMessageSizeLimit() throws IOException,
MockProducer<String, byte[]> producer = new MockProducer<>(true, new StringSerializer(), new ByteArraySerializer());
KafkaCaptureFactory kafkaCaptureFactory =
new KafkaCaptureFactory(TEST_NODE_ID_STRING, producer, maxAllowableMessageSize);
IChannelConnectionCaptureSerializer serializer = kafkaCaptureFactory.createOffloader(connectionId);
IChannelConnectionCaptureSerializer serializer = kafkaCaptureFactory.createOffloader(createCtx(), connectionId);

StringBuilder sb = new StringBuilder();
for (int i = 0; i < 15000; i++) {
@@ -73,6 +76,11 @@
producer.close();
}

private static ConnectionContext createCtx() {
return new ConnectionContext("test", "test",
x->GlobalOpenTelemetry.getTracer("test").spanBuilder("test").startSpan());
}

/**
* This size calculation is based off the KafkaProducer client request size validation check done when Producer
* records are sent. This validation appears to be consistent for several versions now, here is a reference to
@@ -97,7 +105,7 @@ private int calculateRecordSize(ProducerRecord<String, byte[]> record, String re
public void testLinearOffloadingIsSuccessful() throws IOException {
KafkaCaptureFactory kafkaCaptureFactory =
new KafkaCaptureFactory(TEST_NODE_ID_STRING, mockProducer, 1024*1024);
IChannelConnectionCaptureSerializer offloader = kafkaCaptureFactory.createOffloader(connectionId);
IChannelConnectionCaptureSerializer offloader = kafkaCaptureFactory.createOffloader(createCtx(), connectionId);

List<Callback> recordSentCallbacks = new ArrayList<>(3);
when(mockProducer.send(any(), any())).thenAnswer(invocation -> {
11 changes: 7 additions & 4 deletions TrafficCapture/captureOffloader/build.gradle
@@ -20,19 +20,22 @@ sourceSets {
}
}
dependencies {
implementation platform("io.opentelemetry:opentelemetry-bom:1.32.0")
api group: 'io.netty', name: 'netty-buffer', version: '4.1.100.Final'

implementation project(':captureProtobufs')
implementation "com.google.protobuf:protobuf-java:3.22.2"
implementation 'org.projectlombok:lombok:1.18.26'
implementation project(':coreUtilities')
implementation group: 'io.opentelemetry', name:'opentelemetry-api'
implementation group: 'com.google.protobuf', name: 'protobuf-java', version: '3.22.2'
implementation group: 'org.projectlombok', name: 'lombok', version: '1.18.26'
implementation group: 'org.slf4j', name: 'slf4j-api', version: '2.0.7'

testImplementation group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.20.0'
testImplementation group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.20.0'
testImplementation group: 'org.apache.logging.log4j', name: 'log4j-slf4j2-impl', version: '2.20.0'
testImplementation group: 'org.slf4j', name: 'slf4j-api', version: '2.0.7'

testFixturesImplementation "com.google.protobuf:protobuf-java:3.22.2"
testFixturesImplementation project(':captureProtobufs')

testFixturesImplementation project(':coreUtilities')
testFixturesImplementation group: 'com.google.protobuf', name: 'protobuf-java', version: '3.22.2'
}
TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/FileConnectionCaptureFactory.java
@@ -3,6 +3,8 @@
import lombok.AllArgsConstructor;
import lombok.Lombok;
import lombok.extern.slf4j.Slf4j;
import org.opensearch.migrations.tracing.commoncontexts.IConnectionContext;
import org.opensearch.migrations.trafficcapture.tracing.ConnectionContext;

import java.io.FileNotFoundException;
import java.io.FileOutputStream;
@@ -51,6 +53,9 @@
@AllArgsConstructor
class StreamManager extends OrderedStreamLifecyleManager<Void> {
String connectionId;
@Override
public void close() {}

[codecov/patch: added line #L57 in FileConnectionCaptureFactory.java was not covered by tests]

@Override
public CodedOutputStreamAndByteBufferWrapper createStream() {
return new CodedOutputStreamAndByteBufferWrapper(bufferSize);
@@ -80,7 +85,7 @@
}

@Override
public IChannelConnectionCaptureSerializer createOffloader(String connectionId) {
return new StreamChannelConnectionCaptureSerializer<Void>(nodeId, connectionId, new StreamManager(connectionId));
public IChannelConnectionCaptureSerializer createOffloader(IConnectionContext ctx, String connectionId) {
return new StreamChannelConnectionCaptureSerializer<>(nodeId, connectionId, new StreamManager(connectionId));

[codecov/patch: added line #L89 in FileConnectionCaptureFactory.java was not covered by tests]
}
}
IConnectionCaptureFactory.java
@@ -1,7 +1,10 @@
package org.opensearch.migrations.trafficcapture;

import org.opensearch.migrations.tracing.commoncontexts.IConnectionContext;
import org.opensearch.migrations.trafficcapture.tracing.ConnectionContext;

import java.io.IOException;

public interface IConnectionCaptureFactory<T> {
IChannelConnectionCaptureSerializer<T> createOffloader(String connectionId) throws IOException;
IChannelConnectionCaptureSerializer<T> createOffloader(IConnectionContext ctx, String connectionId) throws IOException;
}
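Taken together with the test changes above, the new call pattern looks roughly like the sketch below. The ConnectionContext arguments mirror the test's ("test", "test", spanGenerator) usage; the id strings, tracer name, and span name here are placeholders:

```java
import io.opentelemetry.api.GlobalOpenTelemetry;

class OffloaderCallSketch {
    static IChannelConnectionCaptureSerializer<?> openOffloader(KafkaCaptureFactory factory,
                                                                String connectionId) {
        // Build the connection-scoped telemetry context first, then hand it to the
        // factory along with the connection id.
        var ctx = new ConnectionContext(connectionId, "node-1",
                attrs -> GlobalOpenTelemetry.getTracer("capture")
                        .spanBuilder("connection").startSpan());
        return factory.createOffloader(ctx, connectionId);
    }
}
```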