Otel metrics and traces #460

Status: Merged (21 commits), Feb 9, 2024
Changes from 1 commit
a4caca7
Move addMetricsIfPresent into the metrics builder as a first class me…
gregschohn Nov 16, 2023
c026588
WIP to play with OpenTelemetry metric instruments and tracer spans.
gregschohn Nov 17, 2023
f3c0077
Get gradle files and docker-compose in order to support otlp exports …
gregschohn Nov 27, 2023
7fb8e2e
WIP
gregschohn Nov 27, 2023
a8ae3d1
Restore the docker-compose single-node/multi-node split docker-compos…
gregschohn Nov 28, 2023
da9d36b
Add labels to each metric instrument so that multiple values can be p…
gregschohn Nov 28, 2023
06618ca
Move the MetricsClosure into its own class and stop stuffing the metr…
gregschohn Nov 28, 2023
aba1aab
WIP - Cleanup + get Jaeger to work by switching the endpoint. Also i…
gregschohn Nov 28, 2023
900bc6d
Start moving away from ThreadLocal and 'current contexts' and toward …
gregschohn Nov 29, 2023
3746a8e
Get span parenting to work.
gregschohn Nov 30, 2023
e0e7bf1
Merge branch 'main' into DoNotMerge_MoreMetrics
gregschohn Nov 30, 2023
4b43262
Attempt to fix a failing unit test.
gregschohn Nov 30, 2023
322e12f
Refactor. Couple name changes, class package changes, and moved IRep…
gregschohn Nov 30, 2023
723bf77
Bundle all of the offloader spans with the netty handler spans.
gregschohn Dec 1, 2023
15a1705
Improve the tracing story for the capture proxy.
gregschohn Dec 2, 2023
8a6f52a
Tracing change: Flatten the flush span and just record it as 'blocked'.
gregschohn Dec 2, 2023
c50e01d
Minor cleanup - stop setting the namespace or trying to change in a p…
gregschohn Dec 4, 2023
17c517d
Start instrumenting the replayer with more contexts so that traces an…
gregschohn Dec 4, 2023
6288844
Double down on using Context objects in lieu of String labels and fix…
gregschohn Dec 11, 2023
09e849c
Merge branch 'FixKafkaResume' into OtelMetricsAndTraces
gregschohn Dec 11, 2023
9cf2540
Merge branch 'main' into OtelMetricsAndTraces
gregschohn Dec 12, 2023
WIP to play with OpenTelemetry metric instruments and tracer spans.
Most of this is just playing, but making the StreamManager implement AutoCloseable gives a place to end spans to show how long a serializer/connection factory was relevant for.

Signed-off-by: Greg Schohn <[email protected]>
gregschohn committed Nov 27, 2023
commit c026588ef5a9cb1bca3aaf302481bd41444a7041
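The commit description above says that making the StreamManager closeable gives a place to end the connection span. A minimal, dependency-free sketch of that idea follows. `SketchSpan`, `SketchStreamManager`, and `LifecycleSketch` are hypothetical stand-ins invented for illustration; they are not the OpenTelemetry API or the classes touched by this commit.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a tracing span; NOT the OpenTelemetry Span API.
final class SketchSpan {
    final String name;
    final Instant start = Instant.now();
    Duration lifetime; // stays null until end() is called

    SketchSpan(String name) {
        this.name = name;
    }

    void end() {
        // Only the first call records a lifetime; later calls are ignored.
        if (lifetime == null) {
            lifetime = Duration.between(start, Instant.now());
        }
    }
}

// Hypothetical lifecycle manager: the owner calls close() once the connection is finished.
final class SketchStreamManager implements AutoCloseable {
    private final SketchSpan span;
    private final List<String> log;

    SketchStreamManager(String connectionId, List<String> log) {
        this.span = new SketchSpan("connection " + connectionId);
        this.log = log;
    }

    SketchSpan span() {
        return span;
    }

    @Override
    public void close() {
        // Ending the span here ties its duration to the manager's lifetime.
        span.end();
        log.add(span.name + " closed");
    }
}

class LifecycleSketch {
    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        try (SketchStreamManager manager = new SketchStreamManager("c1", log)) {
            log.add("writing to " + manager.span().name);
        }
        System.out.println(log);
    }
}
```

The point of the sketch is only that `close()` is a single, well-defined place to stop the clock; how the real span is exported is outside its scope.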
14 changes: 9 additions & 5 deletions TrafficCapture/captureKafkaOffloader/build.gradle
@@ -12,11 +12,15 @@ dependencies {
  api 'io.netty:netty-buffer:4.1.100.Final'
  implementation project(':captureOffloader')
  implementation project(':coreUtilities')
- implementation 'org.projectlombok:lombok:1.18.26'
- implementation 'com.google.protobuf:protobuf-java:3.22.2'
- implementation 'org.apache.kafka:kafka-clients:3.6.0'
- implementation 'software.amazon.msk:aws-msk-iam-auth:1.1.9'
- implementation 'org.slf4j:slf4j-api:2.0.7'
+
+ implementation group: 'com.google.protobuf', name:'protobuf-java', version:'3.22.2'
+ implementation group: 'io.opentelemetry', name:'opentelemetry-api', version: '1.30.0'
+ implementation group: 'org.projectlombok', name:'lombok', version:'1.18.26'
+ implementation group: 'org.apache.kafka', name:'kafka-clients', version:'3.6.0'
+ implementation group: 'software.amazon.msk', name:'aws-msk-iam-auth', version:'1.1.9'
+
+ implementation group: 'org.slf4j', name:'slf4j-api', version:'2.0.7'
+
  testImplementation project(':captureProtobufs')
  testImplementation 'org.mockito:mockito-core:4.6.1'
  testImplementation 'org.mockito:mockito-junit-jupiter:4.6.1'
@@ -1,6 +1,12 @@
  package org.opensearch.migrations.trafficcapture.kafkaoffloader;

  import com.google.protobuf.CodedOutputStream;
+ import io.opentelemetry.api.GlobalOpenTelemetry;
+ import io.opentelemetry.api.common.Attributes;
+ import io.opentelemetry.api.metrics.LongCounter;
+ import io.opentelemetry.api.trace.Span;
+ import io.opentelemetry.context.Context;
+ import io.opentelemetry.context.Scope;
  import lombok.AllArgsConstructor;
  import lombok.NonNull;
  import lombok.extern.slf4j.Slf4j;
@@ -17,7 +23,10 @@
  import org.opensearch.migrations.trafficcapture.StreamChannelConnectionCaptureSerializer;
  import org.opensearch.migrations.coreutils.MetricsLogger;

+ import java.io.IOException;
  import java.nio.ByteBuffer;
+ import java.time.Duration;
+ import java.time.Instant;
  import java.util.Arrays;
  import java.util.concurrent.CompletableFuture;

@@ -31,6 +40,7 @@ public class KafkaCaptureFactory implements IConnectionCaptureFactory<RecordMeta
  // This value encapsulates overhead we should reserve for a given Producer record to account for record key bytes and
  // general Kafka message overhead
  public static final int KAFKA_MESSAGE_OVERHEAD_BYTES = 500;
+ public static final String TELEMETRY_SCOPE_NAME = "KafkaCaptureFactory";

  private final String nodeId;
  // Potential future optimization here to use a direct buffer (e.g. nio) instead of byte array
@@ -52,7 +62,16 @@ public KafkaCaptureFactory(String nodeId, Producer<String, byte[]> producer, int

  @Override
  public IChannelConnectionCaptureSerializer<RecordMetadata> createOffloader(String connectionId) {
- return new StreamChannelConnectionCaptureSerializer<>(nodeId, connectionId, new StreamManager(connectionId));
+ var tracer = GlobalOpenTelemetry.get().getTracer(TELEMETRY_SCOPE_NAME);
+ Span connectionSpan = tracer.spanBuilder("connection").startSpan();
+
+ try (var namedOnlyForAutoClose = Context.current().with(connectionSpan).makeCurrent()) {
+ var meter = GlobalOpenTelemetry.get().getMeter(TELEMETRY_SCOPE_NAME);
+ meter.counterBuilder("connection_created").build().add(1);
+ }
+
+ return new StreamChannelConnectionCaptureSerializer<>(nodeId, connectionId,
+ new StreamManager(connectionSpan, connectionId));
  }

  @AllArgsConstructor
@@ -65,9 +84,28 @@ static class CodedOutputStreamWrapper implements CodedOutputStreamHolder {
  }
  }

- @AllArgsConstructor
  class StreamManager extends OrderedStreamLifecyleManager<RecordMetadata> {
+ Span telemetrySpan;
  String connectionId;
+ Instant startTime;
+
+ public StreamManager(Span telemetrySpan, String connectionId) {
+ this.telemetrySpan = telemetrySpan;
+ this.connectionId = connectionId;
+ this.startTime = Instant.now();
+ }
+
+ @Override
+ public void close() throws IOException {
+ try (var namedOnlyForAutoClose = Context.current().with(telemetrySpan).makeCurrent()) {
+ var histogram = GlobalOpenTelemetry.get().getMeter(TELEMETRY_SCOPE_NAME)
+ .histogramBuilder("connection_lifetime").build();
+ telemetrySpan.setAttribute("connectionId", connectionId);
+ histogram.record((double) Duration.between(startTime, Instant.now()).toMillis(), Attributes.empty(),
+ Context.current().with(telemetrySpan));
+ telemetrySpan.end();
+ }
+ }

  @Override
  public CodedOutputStreamWrapper createStream() {
@@ -51,6 +51,9 @@ public FileConnectionCaptureFactory(String nodeId, String path, int bufferSize)
  @AllArgsConstructor
  class StreamManager extends OrderedStreamLifecyleManager<Void> {
  String connectionId;
+ @Override
+ public void close() {}
+
  @Override
  public CodedOutputStreamAndByteBufferWrapper createStream() {
  return new CodedOutputStreamAndByteBufferWrapper(bufferSize);
@@ -188,20 +188,23 @@ public CompletableFuture<T> flushCommitAndResetStream(boolean isFinal) throws IO
  if (streamHasBeenClosed || (currentCodedOutputStreamHolderOrNull == null && !isFinal)) {
  return CompletableFuture.completedFuture(null);
  }
- CodedOutputStream currentStream = getOrCreateCodedOutputStream();
- var fieldNum = isFinal ? TrafficStream.NUMBEROFTHISLASTCHUNK_FIELD_NUMBER : TrafficStream.NUMBER_FIELD_NUMBER;
- // e.g. 3: 1
- currentStream.writeInt32(fieldNum, ++numFlushesSoFar);
- log.trace("Flushing the current CodedOutputStream for {}.{}", connectionIdString, numFlushesSoFar);
- currentStream.flush();
- assert currentStream == currentCodedOutputStreamHolderOrNull.getOutputStream() : "Expected the stream that " +
- "is being finalized to be the same stream contained by currentCodedOutputStreamHolderOrNull";
- var future = streamManager.closeStream(currentCodedOutputStreamHolderOrNull, numFlushesSoFar);
- currentCodedOutputStreamHolderOrNull = null;
- if (isFinal) {
- streamHasBeenClosed = true;
+ try {
+ CodedOutputStream currentStream = getOrCreateCodedOutputStream();
+ var fieldNum = isFinal ? TrafficStream.NUMBEROFTHISLASTCHUNK_FIELD_NUMBER : TrafficStream.NUMBER_FIELD_NUMBER;
+ // e.g. 3: 1
+ currentStream.writeInt32(fieldNum, ++numFlushesSoFar);
+ log.trace("Flushing the current CodedOutputStream for {}.{}", connectionIdString, numFlushesSoFar);
+ currentStream.flush();
+ assert currentStream == currentCodedOutputStreamHolderOrNull.getOutputStream() : "Expected the stream that " +
+ "is being finalized to be the same stream contained by currentCodedOutputStreamHolderOrNull";
+ return streamManager.closeStream(currentCodedOutputStreamHolderOrNull, numFlushesSoFar);
+ } finally {
+ currentCodedOutputStreamHolderOrNull = null;
+ if (isFinal) {
+ streamHasBeenClosed = true;
+ streamManager.close();
+ }
  }
- return future;
  }

  @Override
@@ -222,7 +225,8 @@ public void addDisconnectEvent(Instant timestamp) throws IOException {
  @Override
  public void addCloseEvent(Instant timestamp) throws IOException {
  beginSubstreamObservation(timestamp, TrafficObservation.CLOSE_FIELD_NUMBER, 1);
- getOrCreateCodedOutputStream().writeMessage(TrafficObservation.CLOSE_FIELD_NUMBER, CloseObservation.getDefaultInstance());
+ getOrCreateCodedOutputStream().writeMessage(TrafficObservation.CLOSE_FIELD_NUMBER,
+ CloseObservation.getDefaultInstance());
  }

  @Override
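The reworked flushCommitAndResetStream above returns from inside a try block and does its state reset in finally. As a reminder of the Java semantics this relies on, here is a small stand-alone sketch: the return expression is evaluated first, and the finally block runs afterwards (and would also run if the try block threw). The class `FinallyOrderSketch` and its string-based "holder" and "future" values are hypothetical simplifications, not the serializer's real types.

```java
import java.util.ArrayList;
import java.util.List;

class FinallyOrderSketch {
    private final List<String> events = new ArrayList<>();
    // Hypothetical stand-in for currentCodedOutputStreamHolderOrNull.
    private String holder = "stream-1";

    // Hypothetical stand-in for streamManager.closeStream(...).
    private String closeStream(String h) {
        events.add("closeStream(" + h + ")");
        return "future-for-" + h;
    }

    String flush(boolean isFinal) {
        try {
            // Evaluated before the finally block, so holder is still non-null here.
            return closeStream(holder);
        } finally {
            // Runs after the return value has been computed, and also on exceptions.
            holder = null;
            if (isFinal) {
                events.add("manager.close()");
            }
        }
    }

    List<String> events() {
        return events;
    }

    String holder() {
        return holder;
    }

    public static void main(String[] args) {
        FinallyOrderSketch sketch = new FinallyOrderSketch();
        System.out.println(sketch.flush(true));  // prints "future-for-stream-1"
        System.out.println(sketch.events());     // prints "[closeStream(stream-1), manager.close()]"
    }
}
```

One consequence worth noting: if the cleanup in finally itself throws, that exception replaces the value being returned, so cleanup code placed there is best kept simple.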
@@ -1,9 +1,11 @@
  package org.opensearch.migrations.trafficcapture;

+ import java.io.IOException;
  import java.util.concurrent.CompletableFuture;

- public interface StreamLifecycleManager<T> {
+ public interface StreamLifecycleManager<T> extends AutoCloseable {
  CodedOutputStreamHolder createStream();

  CompletableFuture<T> closeStream(CodedOutputStreamHolder outputStreamHolder, int index);
+ void close() throws IOException;
  }
@@ -320,6 +320,9 @@ class StreamManager extends OrderedStreamLifecyleManager {
  int bufferSize;
  ConcurrentLinkedQueue<ByteBuffer> outputBuffers;

+ @Override
+ public void close() {}
+
  @Override
  public CodedOutputStreamHolder createStream() {
  return new CodedOutputStreamAndByteBufferWrapper(bufferSize);
@@ -36,6 +36,9 @@ public InMemoryConnectionCaptureFactory(String nodeId, int bufferSize, Runnable

  @AllArgsConstructor
  class StreamManager extends OrderedStreamLifecyleManager<Void> {
+ @Override
+ public void close() {}
+
  @Override
  public CodedOutputStreamHolder createStream() {
  return new CodedOutputStreamAndByteBufferWrapper(bufferSize);
8 changes: 5 additions & 3 deletions TrafficCapture/coreUtilities/build.gradle
@@ -56,9 +56,11 @@ dependencies {
  implementation("org.apache.logging.log4j:log4j-slf4j2-impl:2.20.0")

  // OpenTelemetry core
- implementation("io.opentelemetry:opentelemetry-sdk:1.30.0")
- implementation("io.opentelemetry:opentelemetry-exporter-otlp:1.30.0")
- implementation("io.opentelemetry:opentelemetry-semconv:1.30.1-alpha")
+ implementation group: 'io.opentelemetry', name:'opentelemetry-api', version: '1.30.0'
+ implementation group: 'io.opentelemetry', name:'opentelemetry-exporter-otlp', version: '1.30.0'
+ implementation group: 'io.opentelemetry', name:'opentelemetry-sdk', version: '1.30.0'
+ implementation group: 'io.opentelemetry.instrumentation', name:'opentelemetry-log4j-appender-2.17', version: '1.30.0-alpha'
+ implementation group: 'io.opentelemetry', name:'opentelemetry-semconv', version: '1.30.0-alpha'

  // OpenTelemetry log4j appender
  implementation("io.opentelemetry.instrumentation:opentelemetry-log4j-appender-2.17:1.30.0-alpha")
@@ -1,17 +1,30 @@
package org.opensearch.migrations.coreutils;


import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.exporter.otlp.logs.OtlpGrpcLogRecordExporter;
import io.opentelemetry.instrumentation.log4j.appender.v2_17.OpenTelemetryAppender;
import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter;
import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.logs.SdkLoggerProvider;
import io.opentelemetry.sdk.logs.export.BatchLogRecordProcessor;
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
import io.opentelemetry.semconv.resource.attributes.ResourceAttributes;
import org.slf4j.Logger;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

@Slf4j
public
class MetricsLogger {
@@ -29,29 +42,84 @@ public MetricsLogger(String source) {
  }

  public static void initializeOpenTelemetry(String serviceName, String collectorEndpoint) {
- OpenTelemetrySdk sdk =
+ var serviceResource = Resource.getDefault().toBuilder()
+ .put(ResourceAttributes.SERVICE_NAME, serviceName)
+ .build();
+
+ OpenTelemetrySdk openTelemetrySdk =
  OpenTelemetrySdk.builder()
  .setLoggerProvider(
  SdkLoggerProvider.builder()
- .setResource(
- Resource.getDefault().toBuilder()
- .put(ResourceAttributes.SERVICE_NAME, serviceName)
- .build())
+ .setResource(serviceResource)
  .addLogRecordProcessor(
  BatchLogRecordProcessor.builder(
  OtlpGrpcLogRecordExporter.builder()
  .setEndpoint(collectorEndpoint)
  .build())
  .build())
  .build())
- .build();
- GlobalOpenTelemetry.set(sdk);
+ .setTracerProvider(
+ SdkTracerProvider.builder()
+ .setResource(serviceResource)
+ .addSpanProcessor(
+ BatchSpanProcessor.builder(
+ OtlpGrpcSpanExporter.builder()
+ .setEndpoint(collectorEndpoint)
+ .setTimeout(2, TimeUnit.SECONDS)
+ .build())
+ .setScheduleDelay(100, TimeUnit.MILLISECONDS)
+ .build())
+ .build())
+ .setMeterProvider(
+ SdkMeterProvider.builder()
+ .setResource(serviceResource)
+ .registerMetricReader(
+ PeriodicMetricReader.builder(
+ OtlpGrpcMetricExporter.builder()
+ .setEndpoint(collectorEndpoint)
+ .build())
+ .setInterval(Duration.ofMillis(1000))
+ .build())
+ .build())
+ .buildAndRegisterGlobal();

  // Add hook to close SDK, which flushes logs
- Runtime.getRuntime().addShutdownHook(new Thread(sdk::close));
- OpenTelemetryAppender.install(GlobalOpenTelemetry.get());
+ Runtime.getRuntime().addShutdownHook(new Thread(openTelemetrySdk::close));
+ //OpenTelemetryAppender.install(GlobalOpenTelemetry.get());
  }

+ public static class SimpleMeteringClosure {
+ public final Meter meter;
+ public final Tracer tracer;
+ public SimpleMeteringClosure(String scopeName) {
+ meter = GlobalOpenTelemetry.getMeter(scopeName);
+ tracer = GlobalOpenTelemetry.getTracer(scopeName);
+ }
+ public void meterIncrementEvent(Context ctx, String eventName) {
+ meterIncrementEvent(ctx, eventName, 1);
+ }
+ public void meterIncrementEvent(Context ctx, String eventName, long increment) {
+ if (ctx == null) { return; }
+ try (var namedOnlyForAutoClose = ctx.makeCurrent()) {
+ meter.counterBuilder(eventName).build().add(increment);
+ }
+ }
+ public void meterDeltaEvent(Context ctx, String eventName, long delta) {
+ if (ctx == null) { return; }
+ try (var namedOnlyForAutoClose = ctx.makeCurrent()) {
+ meter.upDownCounterBuilder(eventName).build().add(delta);
+ }
+ }
+ public void meterHistogramMillis(Context ctx, String eventName, Duration between) {
+ meterHistogram(ctx, eventName, (double) between.toMillis());
+ }
+ public void meterHistogram(Context ctx, String eventName, double value) {
+ if (ctx == null) { return; }
+ try (var namedOnlyForAutoClose = ctx.makeCurrent()) {
+ meter.histogramBuilder(eventName).build().record(value);
+ }
+ }
+ }

  /**
  * To indicate a successful event (e.g. data received or data sent) that may be a helpful
@@ -61,7 +129,7 @@ public static void initializeOpenTelemetry(String serviceName, String collectorE
  * metricsLogger.atSuccess().addKeyValue("key", "value").setMessage("Task succeeded").log();
  */
  public MetricsLogBuilder atSuccess(MetricsEvent event) {
- return new MetricsLogBuilder(logger).atSuccess(event);
+ return new MetricsLogBuilder().atSuccess(event);
  }

  /**
@@ -74,7 +142,7 @@ public MetricsLogBuilder atError(MetricsEvent event, Throwable cause) {
  if (cause == null) {
  return atError(event);
  }
- return new MetricsLogBuilder(logger).atError(event)
+ return new MetricsLogBuilder().atError(event)
  .setAttribute(MetricsAttributeKey.EXCEPTION_MESSAGE, cause.getMessage())
  .setAttribute(MetricsAttributeKey.EXCEPTION_TYPE, cause.getClass().getName());
  }
@@ -84,10 +152,11 @@ public MetricsLogBuilder atError(MetricsEvent event, Throwable cause) {
  * there is a failure that isn't indicated by an Exception being thrown.
  */
  public MetricsLogBuilder atError(MetricsEvent event) {
- return new MetricsLogBuilder(logger).atError(event);
+
+ return new MetricsLogBuilder().atError(event);
  }

  public MetricsLogBuilder atTrace(MetricsEvent event) {
- return new MetricsLogBuilder(logger).atTrace(event);
+ return new MetricsLogBuilder().atTrace(event);
  }
  }
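The SimpleMeteringClosure methods in the diff above share one shape: return early when the context is null, otherwise make the context current inside a try-with-resources block so that it is restored afterwards. The following dependency-free sketch shows that shape only. `SketchContext`, `SketchScope`, and `ScopeSketch` are hypothetical stand-ins built on a ThreadLocal; they are assumptions for illustration and do not reproduce how OpenTelemetry implements its Context and Scope.

```java
// Narrowed AutoCloseable so that close() declares no checked exception.
interface SketchScope extends AutoCloseable {
    @Override
    void close();
}

// Hypothetical stand-in for a context that can be made "current" on a thread.
final class SketchContext {
    private static final ThreadLocal<SketchContext> CURRENT = new ThreadLocal<>();
    final String label;

    SketchContext(String label) {
        this.label = label;
    }

    static SketchContext current() {
        return CURRENT.get();
    }

    // Installs this context and returns a handle that restores the previous one on close().
    SketchScope makeCurrent() {
        SketchContext previous = CURRENT.get();
        CURRENT.set(this);
        return () -> CURRENT.set(previous);
    }
}

class ScopeSketch {
    // Same guard as in the diff: a null context means "record nothing".
    static String labelWhileRecording(SketchContext ctx) {
        if (ctx == null) {
            return null;
        }
        try (SketchScope namedOnlyForAutoClose = ctx.makeCurrent()) {
            // Inside the block, the supplied context is the current one.
            return SketchContext.current().label;
        }
    }

    public static void main(String[] args) {
        System.out.println(labelWhileRecording(new SketchContext("connection-1")));
        // After the block, the previous (here: absent) context is restored.
        System.out.println(SketchContext.current());
    }
}
```

The try-with-resources form matters because it restores the previous context even when the body throws, which a manual set/unset pair can easily miss.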
@@ -9,6 +9,8 @@ services:
  ports:
  - "9200:9200"
  - "19200:19200"
+ volumes:
+ - /Users/schohn/dev/opensearch-migrations/TrafficCapture/containerLogs:/logs
  environment:
  - http.port=19200
  # Run processes for elasticsearch and capture proxy, and exit if either one ends
@@ -70,6 +72,7 @@ services:
  - migrations
  volumes:
  - sharedReplayerOutput:/shared-replayer-output
+ - /Users/schohn/dev/opensearch-migrations/TrafficCapture/containerLogs:/logs

Review comment (Collaborator): This reference to a local path needs to be removed.

  environment:
  - TUPLE_DIR_PATH=/shared-replayer-output/traffic-replayer-default
  depends_on:
@@ -119,10 +122,12 @@ services:
  - "13133:13133"
  volumes:
  - ./otelcol/otel-config.yml:/etc/otel-config.yml
+ - /Users/schohn/dev/opensearch-migrations/TrafficCapture/containerLogs:/logs
  networks:
  - migrations
  depends_on:
  - opensearchanalytics
+ command: tail -f /dev/null

  migration-console:
  image: 'migrations/migration_console:latest'