Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replayer instrumentation #475

Merged
merged 100 commits into from
Feb 9, 2024
Merged
Changes from 1 commit
Commits
Show all changes
100 commits
Select commit Hold shift + click to select a range
a4caca7
Move addMetricsIfPresent into the metrics builder as a first class me…
gregschohn Nov 16, 2023
c026588
WIP to play with OpenTelemetry metric instruments and tracer spans.
gregschohn Nov 17, 2023
f3c0077
Get gradle files and docker-compose in order to support otlp exports …
gregschohn Nov 27, 2023
7fb8e2e
WIP
gregschohn Nov 27, 2023
a8ae3d1
Restore the docker-compose single-node/multi-node split docker-compos…
gregschohn Nov 28, 2023
da9d36b
Add labels to each metric instrument so that multiple values can be p…
gregschohn Nov 28, 2023
06618ca
Move the MetricsClosure into its own class and stop stuffing the metr…
gregschohn Nov 28, 2023
aba1aab
WIP - Cleanup + get Jaeger to work by switching the endpoint. Also i…
gregschohn Nov 28, 2023
900bc6d
Start moving away from ThreadLocal and 'current contexts' and toward …
gregschohn Nov 29, 2023
3746a8e
Get span parenting to work.
gregschohn Nov 30, 2023
e0e7bf1
Merge branch 'main' into DoNotMerge_MoreMetrics
gregschohn Nov 30, 2023
4b43262
Attempt to fix a failing unit test.
gregschohn Nov 30, 2023
322e12f
Refactor. Couple name changes, class package changes, and moved IRep…
gregschohn Nov 30, 2023
723bf77
Bundle all of the offloader spans with the netty handler spans.
gregschohn Dec 1, 2023
15a1705
Improve the tracing story for the capture proxy.
gregschohn Dec 2, 2023
8a6f52a
Tracing change: Flatten the flush span and just record it as 'blocked'.
gregschohn Dec 2, 2023
c50e01d
Minor cleanup - stop setting the namespace or trying to change in a p…
gregschohn Dec 4, 2023
17c517d
Start instrumenting the replayer with more contexts so that traces an…
gregschohn Dec 4, 2023
6288844
Double down on using Context objects in lieu of String labels and fix…
gregschohn Dec 11, 2023
09e849c
Merge branch 'FixKafkaResume' into OtelMetricsAndTraces
gregschohn Dec 11, 2023
9cf2540
Merge branch 'main' into OtelMetricsAndTraces
gregschohn Dec 12, 2023
c14da6a
Update the Http Logging Handler to suppress response packet captures …
gregschohn Dec 12, 2023
7de4009
File rename since the LoggingHttpRequest handler now handles both req…
gregschohn Dec 12, 2023
a5bfc7d
Shuffling lots of details of Contexts and the relationships between d…
gregschohn Dec 15, 2023
3d60106
Lots of refactoring to get a couple more test cases to pass.
gregschohn Dec 15, 2023
ad6bd13
Test fixes
gregschohn Dec 15, 2023
07ae016
Begin to cleanup endspans for some of the contexts. Lots of bugs rem…
gregschohn Dec 16, 2023
ccb517c
More work to get context chains to work better together.
gregschohn Dec 17, 2023
8bda2e3
Two critical bugfixes around handling close observations that were di…
gregschohn Dec 17, 2023
d9df3fa
Fix some test code where the nodeId and connectionId got reversed, ca…
gregschohn Dec 18, 2023
ab3dfb4
Extra guards to try to make tests more reliable, but one of the FullT…
gregschohn Dec 19, 2023
273c5aa
More test fixes, including fixing a regression that I had caused in a…
gregschohn Dec 19, 2023
e0167f5
Fix a race condition with commitKafkaKey.
gregschohn Dec 19, 2023
3d81ad8
Two changes to kafka interactions. Add trace spans for traffic sourc…
gregschohn Dec 20, 2023
ae45a6a
Extract an IInstrumentationAttributes interface from IScopedInstrumen…
gregschohn Dec 20, 2023
195d0ba
Checkpoint/WIP - More spans across the board, specifically through th…
gregschohn Dec 21, 2023
bef9944
Test bugfix. toString() wasn't threadsafe. Now it is.
gregschohn Dec 22, 2023
8b50c89
Refactoring and code consolidation around context management.
gregschohn Dec 22, 2023
0e5fe09
Refactoring. Which classes emit metrics, all scopes have names, and …
gregschohn Dec 22, 2023
613a504
Refactor TestContext so that it doesn't use statics and allows caller…
gregschohn Dec 23, 2023
7df0dcc
Remove a hardcoded path to my local directory
gregschohn Dec 23, 2023
de0d482
Fixed bugs in trace management and forced a lot more test code to tak…
gregschohn Dec 23, 2023
72b1ca8
Add another scheduled span before the request is sent.
gregschohn Dec 23, 2023
6043eed
Move all span names into virtual interface functions so that they can…
gregschohn Dec 26, 2023
37b99eb
Pass more contexts, make contexts able to express more metrics, and e…
gregschohn Jan 2, 2024
6684486
Minor bugfixes that make a huge difference. Fix a broken unit test a…
gregschohn Jan 3, 2024
7bf4388
Some refactoring to increase the typesafety and to support greater co…
gregschohn Jan 3, 2024
8ef0376
Fix mend security issue for json-path CVE by updating opensearch-secu…
gregschohn Jan 4, 2024
37ae548
Remove zipkin as a tracing sink.
gregschohn Jan 4, 2024
1172912
Make attribute name filtering more generic and fix a bug in negation …
gregschohn Jan 5, 2024
22296b7
Minor tweaks to the otel collector (including renaming from 'demo') a…
gregschohn Jan 8, 2024
d1b237a
Set the aggregation temporality to delta rather than cumulative.
gregschohn Jan 8, 2024
fdd8141
Wrap all metric emissions within the context's span so that the metri…
gregschohn Jan 8, 2024
490521d
In progress changes. I'm trying to track down a regression and want …
gregschohn Jan 9, 2024
f199e98
In-progress checkpoint (code won't compile). Setting up separate met…
gregschohn Jan 10, 2024
e110540
Another in-progress checkpoint (still won't compile) where I'm moving…
gregschohn Jan 10, 2024
156ae72
Another checkpoint that still doesn't compile, but less files (I thin…
gregschohn Jan 11, 2024
5dc32d9
Stop passing the Root telemetry scope as a generic parameter to all o…
gregschohn Jan 11, 2024
320e9d8
Working on updating proxy code to get everything to compile.
gregschohn Jan 12, 2024
9601a68
More refactoring, still doesn't all compile, but most of it does (top…
gregschohn Jan 12, 2024
5f6bb3f
Fix the last of the compilation errors though tests are failing still.
gregschohn Jan 13, 2024
ccc0e2a
Bugfixes and test fixes to get all of the unit tests to pass.
gregschohn Jan 14, 2024
0744424
Bugfixes. Stop metering double events in a couple spots and fix a co…
gregschohn Jan 15, 2024
82f8ebb
Merge branch 'main' into ReplayerInstrumentation
gregschohn Jan 15, 2024
35a9185
Upgrade otel libraries to 1.34.1 from 1.32 and add the enable_open_me…
gregschohn Jan 16, 2024
0e8379d
Fix a bug where the current scope's attributes weren't being added in…
gregschohn Jan 16, 2024
a076bc3
Add a TestContext for every replayer test via inheritance on the Test…
gregschohn Jan 18, 2024
d3ee4f1
Change how MetricInstruments classes are instantiated.
gregschohn Jan 19, 2024
54c5e27
Build fix - When refactoring to use TestContexts more globally, a tes…
gregschohn Jan 19, 2024
0ce6694
Spin up a grafana container in the docker solution with simple creden…
gregschohn Jan 19, 2024
c8674eb
Start to get Source/Target comparison metrics in place and more refac…
gregschohn Jan 21, 2024
ffcc09b
Minor cleanup on exception tracking
gregschohn Jan 22, 2024
dd89336
Bugfix - a class was inheriting from the Connection context's MetricI…
gregschohn Jan 22, 2024
a15d08a
Cleanup build.gradle files' open-telemetry dependencies. Embrace ote…
gregschohn Jan 22, 2024
5c454ca
Partial checkin to delete dead code and clean up imports and style is…
gregschohn Jan 22, 2024
bf8ea86
Addressing PR Feedback with some localized cleanups
gregschohn Jan 23, 2024
b641c5a
aws cli wasn't functional within my arm64 container because the docke…
gregschohn Jan 24, 2024
cd34b32
Rework otel-collector container packaging.
gregschohn Jan 26, 2024
a4173c0
PR feedback including:
gregschohn Jan 27, 2024
2072f69
Change path from otelcol to otelCollector and enable the collector an…
gregschohn Jan 27, 2024
be689b5
Split the implementations of fillAttributes into two.
gregschohn Jan 28, 2024
607ff05
Fix the dependencies for logging leaves and add 'processors' and 'rec…
gregschohn Jan 28, 2024
d480ed8
Setting the docker command for the otel-collector service to use the …
gregschohn Jan 28, 2024
59db42f
Set the permissions for the otel container to write to cloudwatch and…
gregschohn Jan 28, 2024
76c8c31
Minor cleanup
gregschohn Jan 31, 2024
2cc67fd
Fix the runTestBenchmarks script to work when the endpoint uses http …
gregschohn Feb 1, 2024
6e70d1a
Merge branch 'main' into ReplayerInstrumentation
gregschohn Feb 1, 2024
9425ab6
Aesthetic formatting changes
gregschohn Feb 3, 2024
deb19a4
IInstrumentationAttributes no longer has scope related functionality.…
gregschohn Feb 4, 2024
a5f947e
Merge branch 'main' into ReplayerInstrumentation
gregschohn Feb 5, 2024
9a31210
README documentation for the Instrumentation + some cleanup.
gregschohn Feb 6, 2024
805e13b
When the first bucket size is <=0 for the CommonScopedMetricInstrumen…
gregschohn Feb 6, 2024
9aa3432
Add tracing and metrics for replayer sockets as they're created and c…
gregschohn Feb 7, 2024
48a45ab
Bugfix, test fix, lint fix.
gregschohn Feb 7, 2024
16a9010
Fix an edge case where a socketChannel might not have been created ev…
gregschohn Feb 7, 2024
4b102f7
Handle SocketContexts as first class contexts rather than trying to i…
gregschohn Feb 8, 2024
1cb8927
Fix an issue with when to close the SocketContext and some memory lea…
gregschohn Feb 8, 2024
441ca40
Tie off more loose ends for memory leaks during test runs.
gregschohn Feb 8, 2024
dc76239
Test fixes + make scheduled contexts use System.nanotime instead of I…
gregschohn Feb 9, 2024
8a875b3
Set the x-ray exporter attribute index_all_attributes=true so that at…
gregschohn Feb 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
WIP
Signed-off-by: Greg Schohn <[email protected]>
gregschohn committed Nov 30, 2023
commit 7fb8e2e9a2a2e5062fbf0c67ac48cef95b02f302
2 changes: 1 addition & 1 deletion TrafficCapture/captureKafkaOffloader/build.gradle
Original file line number Diff line number Diff line change
@@ -9,12 +9,12 @@ repositories {
}

dependencies {
api 'io.netty:netty-buffer:4.1.100.Final'
implementation platform("io.opentelemetry:opentelemetry-bom:1.32.0")

implementation project(':captureOffloader')
implementation project(':coreUtilities')
implementation group: 'com.google.protobuf', name:'protobuf-java', version:'3.22.2'
api group:'io.netty', name:'netty-buffer', version: '4.1.100.Final'
implementation group: 'io.opentelemetry', name:'opentelemetry-api', version: '1.30.0'
implementation group: 'org.projectlombok', name:'lombok', version:'1.18.26'
implementation group: 'org.apache.kafka', name:'kafka-clients', version:'3.6.0'
Original file line number Diff line number Diff line change
@@ -2,11 +2,9 @@

import com.google.protobuf.CodedOutputStream;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.context.ContextKey;
import lombok.AllArgsConstructor;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
@@ -28,19 +26,26 @@
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;


@Slf4j
public class KafkaCaptureFactory implements IConnectionCaptureFactory<RecordMetadata> {

private static final ContextKey<String> RECORD_ID_KEY = ContextKey.named("recordId");
private static final ContextKey<String> TOPIC_KEY = ContextKey.named("topic");
private static final ContextKey<Integer> RECORD_SIZE_KEY = ContextKey.named("recordSize");
public static final String TELEMETRY_SCOPE_NAME = "KafkaCapture";
public static final Optional<MetricsLogger.SimpleMeteringClosure> METERING_CLOSURE_OP =
Optional.of(new MetricsLogger.SimpleMeteringClosure(TELEMETRY_SCOPE_NAME));

private static final MetricsLogger metricsLogger = new MetricsLogger("BacksideHandler");

private static final String DEFAULT_TOPIC_NAME_FOR_TRAFFIC = "logging-traffic-topic";
// This value encapsulates overhead we should reserve for a given Producer record to account for record key bytes and
// general Kafka message overhead
public static final int KAFKA_MESSAGE_OVERHEAD_BYTES = 500;
public static final String TELEMETRY_SCOPE_NAME = "KafkaCaptureFactory";

private final String nodeId;
// Potential future optimization here to use a direct buffer (e.g. nio) instead of byte array
@@ -62,55 +67,62 @@ public KafkaCaptureFactory(String nodeId, Producer<String, byte[]> producer, int

@Override
public IChannelConnectionCaptureSerializer<RecordMetadata> createOffloader(String connectionId) {
var tracer = GlobalOpenTelemetry.get().getTracer(TELEMETRY_SCOPE_NAME);
Span connectionSpan = tracer.spanBuilder("connection").startSpan();

try (var namedOnlyForAutoClose = Context.current().with(connectionSpan).makeCurrent()) {
var meter = GlobalOpenTelemetry.get().getMeter(TELEMETRY_SCOPE_NAME);
meter.counterBuilder("connection_created").build().add(1);
}

var context = METERING_CLOSURE_OP.map(m->{
Span offloaderSpan = GlobalOpenTelemetry.get().getTracer(TELEMETRY_SCOPE_NAME)
.spanBuilder("offloader").startSpan();
offloaderSpan.setAttribute("offloaderConnectionId", connectionId);
var c = Context.current().with(offloaderSpan);
m.meterIncrementEvent(c, "offloader_created");
m.meterDeltaEvent(c, "offloaders_active", 1);
return c;
}).orElse(null);
return new StreamChannelConnectionCaptureSerializer<>(nodeId, connectionId,
new StreamManager(connectionSpan, connectionId));
new StreamManager(context, connectionId));
}

@AllArgsConstructor
static class CodedOutputStreamWrapper implements CodedOutputStreamHolder {
private final CodedOutputStream codedOutputStream;
private final ByteBuffer byteBuffer;
final Context streamContext;
@Override
public @NonNull CodedOutputStream getOutputStream() {
return codedOutputStream;
}
}

class StreamManager extends OrderedStreamLifecyleManager<RecordMetadata> {
Span telemetrySpan;
Context telemetryContext;
String connectionId;
Instant startTime;

public StreamManager(Span telemetrySpan, String connectionId) {
this.telemetrySpan = telemetrySpan;
public StreamManager(Context incomingTelemetryContext, String connectionId) {
this.telemetryContext = incomingTelemetryContext;
this.connectionId = connectionId;
this.startTime = Instant.now();
}

@Override
public void close() throws IOException {
try (var namedOnlyForAutoClose = Context.current().with(telemetrySpan).makeCurrent()) {
var histogram = GlobalOpenTelemetry.get().getMeter(TELEMETRY_SCOPE_NAME)
.histogramBuilder("connection_lifetime").build();
telemetrySpan.setAttribute("connectionId", connectionId);
histogram.record((double) Duration.between(startTime, Instant.now()).toMillis(), Attributes.empty(),
Context.current().with(telemetrySpan));
telemetrySpan.end();
}
METERING_CLOSURE_OP.ifPresent(m->{
m.meterHistogramMillis(telemetryContext, "connection_lifetime",
Duration.between(startTime, Instant.now()));
m.meterDeltaEvent(telemetryContext, "offloaders_active", -1);
m.meterIncrementEvent(telemetryContext, "offloader_closed");
});
Span.fromContext(telemetryContext).end();
}

@Override
public CodedOutputStreamWrapper createStream() {
var newStreamCtx = METERING_CLOSURE_OP.map(m-> {
m.meterIncrementEvent(telemetryContext, "stream_created");
try (var scope = telemetryContext.makeCurrent()) {
return Context.current().with(m.tracer.spanBuilder("recordStream").startSpan());
}
}).orElse(null);
ByteBuffer bb = ByteBuffer.allocate(bufferSize);
return new CodedOutputStreamWrapper(CodedOutputStream.newInstance(bb), bb);
return new CodedOutputStreamWrapper(CodedOutputStream.newInstance(bb), bb, newStreamCtx);
}

@Override
@@ -131,8 +143,20 @@ public CodedOutputStreamWrapper createStream() {
// Used to essentially wrap Future returned by Producer to CompletableFuture
var cf = new CompletableFuture<RecordMetadata>();
log.debug("Sending Kafka producer record: {} for topic: {}", recordId, topicNameForTraffic);

var flushContext = METERING_CLOSURE_OP.map(m-> {
Span.fromContext(osh.streamContext).end();
try (var scope = telemetryContext
.with(RECORD_ID_KEY, recordId)
.with(TOPIC_KEY, topicNameForTraffic)
.with(RECORD_SIZE_KEY, kafkaRecord.value().length).makeCurrent()) {
m.meterIncrementEvent(telemetryContext, "stream_flush_called");
return Context.current().with(m.tracer.spanBuilder("flushRecord").startSpan());
}
}).orElse(null);

// Async request to Kafka cluster
producer.send(kafkaRecord, handleProducerRecordSent(cf, recordId));
producer.send(kafkaRecord, handleProducerRecordSent(cf, recordId, flushContext));
metricsLogger.atSuccess(MetricsEvent.RECORD_SENT_TO_KAFKA)
.setAttribute(MetricsAttributeKey.CHANNEL_ID, connectionId)
.setAttribute(MetricsAttributeKey.TOPIC_NAME, topicNameForTraffic)
@@ -157,8 +181,18 @@ public CodedOutputStreamWrapper createStream() {
* retried or not retried at all: https://kafka.apache.org/35/javadoc/org/apache/kafka/common/errors/RetriableException.html
* as well as basic retry backoff
*/
private Callback handleProducerRecordSent(CompletableFuture<RecordMetadata> cf, String recordId) {
private Callback handleProducerRecordSent(CompletableFuture<RecordMetadata> cf, String recordId,
Context flushContext) {
return (metadata, exception) -> {
METERING_CLOSURE_OP.ifPresent(m-> {
m.meterIncrementEvent(telemetryContext,
exception==null ? "stream_flush_success" : "stream_flush_failure");
m.meterIncrementEvent(telemetryContext,
exception==null ? "stream_flush_success_bytes" : "stream_flush_failure_bytes",
flushContext.get(RECORD_SIZE_KEY));
Span.fromContext(flushContext).end();
});

if (exception != null) {
log.error("Error sending producer record: {}", recordId, exception);
cf.completeExceptionally(exception);
@@ -170,5 +204,4 @@ private Callback handleProducerRecordSent(CompletableFuture<RecordMetadata> cf,
};
}
}

}
Original file line number Diff line number Diff line change
@@ -129,7 +129,7 @@ public void meterHistogram(Context ctx, String eventName, double value) {
* metricsLogger.atSuccess().addKeyValue("key", "value").setMessage("Task succeeded").log();
*/
public MetricsLogBuilder atSuccess(MetricsEvent event) {
return new MetricsLogBuilder().atSuccess(event);
return new MetricsLogBuilder(logger).atSuccess(event);
}

/**
@@ -142,7 +142,7 @@ public MetricsLogBuilder atError(MetricsEvent event, Throwable cause) {
if (cause == null) {
return atError(event);
}
return new MetricsLogBuilder().atError(event)
return new MetricsLogBuilder(logger).atError(event)
.setAttribute(MetricsAttributeKey.EXCEPTION_MESSAGE, cause.getMessage())
.setAttribute(MetricsAttributeKey.EXCEPTION_TYPE, cause.getClass().getName());
}
@@ -153,10 +153,10 @@ public MetricsLogBuilder atError(MetricsEvent event, Throwable cause) {
*/
public MetricsLogBuilder atError(MetricsEvent event) {

return new MetricsLogBuilder().atError(event);
return new MetricsLogBuilder(logger).atError(event);
}

public MetricsLogBuilder atTrace(MetricsEvent event) {
return new MetricsLogBuilder().atTrace(event);
return new MetricsLogBuilder(logger).atTrace(event);
}
}
52 changes: 26 additions & 26 deletions TrafficCapture/dockerSolution/src/main/docker/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -54,40 +54,40 @@ services:
- zipkin-all-in-one

# Run combined instance of Capture Proxy and Elasticsearch
capture-proxy-es:
# capture-proxy-es:
# image: 'migrations/capture_proxy:latest'
# networks:
# - migrations
# ports:
# - "9200:9200"
# - "19200:19200"
# volumes:
# - /Users/schohn/dev/opensearch-migrations/TrafficCapture/containerLogs:/logs
# environment:
# - http.port=19200
# # Run processes for elasticsearch and capture proxy, and exit if either one ends
# command: /bin/sh -c '/usr/local/bin/docker-entrypoint.sh eswrapper & /runJavaWithClasspath.sh org.opensearch.migrations.trafficcapture.proxyserver.CaptureProxy --kafkaConnection kafka:9092 --destinationUri https://localhost:19200 --insecureDestination --listenPort 9200 --sslConfigFile /usr/share/elasticsearch/config/proxy_tls.yml --otelCollectorEndpoint http://otel-collector:4317 & wait -n 1'
# depends_on:
# - kafka

# Run separate instances of Capture Proxy and Elasticsearch
capture-proxy:
image: 'migrations/capture_proxy:latest'
networks:
- migrations
ports:
- "9200:9200"
- "19200:19200"
volumes:
- /Users/schohn/dev/opensearch-migrations/TrafficCapture/containerLogs:/logs
environment:
- http.port=19200
# Run processes for elasticsearch and capture proxy, and exit if either one ends
command: /bin/sh -c '/usr/local/bin/docker-entrypoint.sh eswrapper & /runJavaWithClasspath.sh org.opensearch.migrations.trafficcapture.proxyserver.CaptureProxy --kafkaConnection kafka:9092 --destinationUri https://localhost:19200 --insecureDestination --listenPort 9200 --sslConfigFile /usr/share/elasticsearch/config/proxy_tls.yml --otelCollectorEndpoint http://otel-collector:4317 & wait -n 1'
command: /bin/sh -c "/runJavaWithClasspath.sh org.opensearch.migrations.trafficcapture.proxyserver.CaptureProxy --kafkaConnection kafka:9092 --destinationUri https://elasticsearch:9200 --insecureDestination --listenPort 9200 --sslConfigFile /usr/share/elasticsearch/config/proxy_tls.yml --otelCollectorEndpoint http://otel-collector:4317"
depends_on:
- kafka
- elasticsearch

# Run separate instances of Capture Proxy and Elasticsearch
# capture-proxy:
# image: 'migrations/capture_proxy:latest'
# networks:
# - migrations
# ports:
# - "9200:9200"
# command: /runJavaWithClasspath.sh org.opensearch.migrations.trafficcapture.proxyserver.CaptureProxy --kafkaConnection kafka:9092 --destinationUri https://elasticsearch:9200 --insecureDestination --listenPort 9200 --sslConfigFile /usr/share/elasticsearch/config/proxy_tls.yml
# depends_on:
# - kafka
# - elasticsearch
#
# elasticsearch:
# image: 'migrations/elasticsearch_searchguard:latest'
# networks:
# - migrations
# ports:
# - '19200:9200'
elasticsearch:
image: 'migrations/elasticsearch_searchguard:latest'
networks:
- migrations
ports:
- '19200:9200'

zookeeper:
image: docker.io/bitnami/zookeeper:3.8
4 changes: 3 additions & 1 deletion TrafficCapture/nettyWireLogging/build.gradle
Original file line number Diff line number Diff line change
@@ -13,7 +13,9 @@ dependencies {

implementation project(':captureOffloader')
implementation project(':coreUtilities')
api group: 'io.netty', name: 'netty-all', version: '4.1.100.Final'
api group: 'io.netty', name: 'netty-buffer'
api group: 'io.netty', name: 'netty-codec-http'
api group: 'io.netty', name: 'netty-handler'

implementation group: 'io.opentelemetry', name:'opentelemetry-api'
implementation group: 'org.slf4j', name: 'slf4j-api', version: '2.0.7'
Original file line number Diff line number Diff line change
@@ -3,6 +3,8 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.util.ReferenceCountUtil;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.context.Context;
import lombok.Lombok;
import lombok.extern.slf4j.Slf4j;
import org.opensearch.migrations.trafficcapture.IChannelConnectionCaptureSerializer;
@@ -13,22 +15,33 @@
public class ConditionallyReliableLoggingHttpRequestHandler<T> extends LoggingHttpRequestHandler<T> {
private final Predicate<HttpRequest> shouldBlockPredicate;

public ConditionallyReliableLoggingHttpRequestHandler(IChannelConnectionCaptureSerializer<T> trafficOffloader,
public ConditionallyReliableLoggingHttpRequestHandler(Context incomingContext,
IChannelConnectionCaptureSerializer<T> trafficOffloader,
Predicate<HttpRequest> headerPredicateForWhenToBlock) {
super(trafficOffloader);
super(incomingContext, trafficOffloader);
this.shouldBlockPredicate = headerPredicateForWhenToBlock;
}

@Override
protected void channelFinishedReadingAnHttpMessage(ChannelHandlerContext ctx, Object msg, HttpRequest httpRequest)
throws Exception {
if (shouldBlockPredicate.test(httpRequest)) {
var blockingSpan = METERING_CLOSURE_OP.map(m->{
m.meterIncrementEvent(telemetryContext, "blockingRequestUntilFlush");
try (var namedOnlyForAutoClose = telemetryContext.makeCurrent()) {
return GlobalOpenTelemetry.get().getTracer(TELEMETRY_SCOPE_NAME)
.spanBuilder("blockedOnFlush").startSpan();
}}).orElse(null);
trafficOffloader.flushCommitAndResetStream(false).whenComplete((result, t) -> {
METERING_CLOSURE_OP.ifPresent(m->{
blockingSpan.end();
m.meterIncrementEvent(telemetryContext, t != null ? "blockedFlushFailure" : "blockedFlushSuccess");
});
if (t != null) {
// This is a spot where we would benefit from having a behavioral policy that different users
// could set as needed. Some users may be fine with just logging a failed offloading of a request
// where other users may want to stop entirely. JIRA here: https://opensearch.atlassian.net/browse/MIGRATIONS-1276
log.warn("Got error: " + t.getMessage());
log.warn("Dropping request - Got error: " + t.getMessage());
ReferenceCountUtil.release(msg);
} else {
try {
@@ -39,6 +52,9 @@ protected void channelFinishedReadingAnHttpMessage(ChannelHandlerContext ctx, Ob
}
});
} else {
METERING_CLOSURE_OP.ifPresent(m->{
m.meterIncrementEvent(telemetryContext, "nonBlockingRequest");
});
super.channelFinishedReadingAnHttpMessage(ctx, msg, httpRequest);
}
}
Loading