From 2ab2d3a84a33722c7f692501de4b929a0d7ccec2 Mon Sep 17 00:00:00 2001 From: Mikayla Thompson Date: Thu, 4 Apr 2024 16:51:26 -0600 Subject: [PATCH 1/5] Add info about interacting with the docker solution Signed-off-by: Mikayla Thompson --- TrafficCapture/dockerSolution/README.md | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/TrafficCapture/dockerSolution/README.md b/TrafficCapture/dockerSolution/README.md index dba70daa2..c50e14a5d 100644 --- a/TrafficCapture/dockerSolution/README.md +++ b/TrafficCapture/dockerSolution/README.md @@ -14,6 +14,31 @@ While in the TrafficCapture directory, run the following command: `./gradlew :dockerSolution:composeUp` +### Interacting with the Docker Solution + +A sample source cluster (running Elasticsearch 7.10) is set up with the Capture Proxy running in front of it. To route +calls through the Capture Proxy, send requests to `localhost:9200` with the default credentials (admin, admin), e.g.: +```sh +curl --insecure https://localhost:9200/_cat/indices -u 'admin:admin' +``` + +You can send the same calls to the source cluster while bypassing the Capture Proxy (calls will not be relayed to the +target cluster) via `localhost:19200`, and to the target cluster directly at `localhost:29200`. + +For sample data that exercises various endpoints with a range of datatypes, you can ssh into the Migration Console +(`docker exec -i -t $(docker ps -aqf "ancestor=migrations/migration_console:latest") bash` or via the Docker console) +and run `./runTestBenchmarks.sh`. By default, this runs four workloads from +[Opensearch-Benchmark](https://github.com/opensearch-project/opensearch-benchmark) against the Capture Proxy endpoint +(9200). The Migration Console contains other utility functions (`./catIndices.sh`, `kafka-tools`, etc.) to interact +with the various containers of the solution. + +You can also access the metrics generated by the solution in Grafana. While the solution is running, go to +[http://localhost:3000](http://localhost:3000/) and enter the default credentials (admin, admin) and do ????? to see +metrics for the migration tools. (TODO: are we actually sending metrics in? I don't see anything in here) + +Traces for the capture proxy and replayer are available via Jaeger at [http://localhost:16686](http://localhost:16686). + + ### Running with different telemetry flavors. By default, composeUp will run an otel-collector that exports instrumentation to other local containers within the From 7552945fba2728fc5c2fbfe5b51a1f181494150f Mon Sep 17 00:00:00 2001 From: Mikayla Thompson Date: Thu, 11 Apr 2024 04:04:20 -0600 Subject: [PATCH 2/5] Add grafana datasource config Signed-off-by: Mikayla Thompson --- TrafficCapture/dockerSolution/README.md | 7 +++--- .../src/main/docker/docker-compose.yml | 1 + .../src/main/docker/grafana_datasources.yaml | 22 +++++++++++++++++++ 3 files changed, 27 insertions(+), 3 deletions(-) create mode 100644 TrafficCapture/dockerSolution/src/main/docker/grafana_datasources.yaml diff --git a/TrafficCapture/dockerSolution/README.md b/TrafficCapture/dockerSolution/README.md index c50e14a5d..b35d7047b 100644 --- a/TrafficCapture/dockerSolution/README.md +++ b/TrafficCapture/dockerSolution/README.md @@ -25,7 +25,7 @@ curl --insecure https://localhost:9200/_cat/indices -u 'admin:admin' You can send the same calls to the source cluster while bypassing the Capture Proxy (calls will not be relayed to the target cluster) via `localhost:19200`, and to the target cluster directly at `localhost:29200`. -For sample data that exercises various endpoints with a range of datatypes, you can ssh into the Migration Console +For sample data that exercises various endpoints with a range of datatypes, you can `ssh` into the Migration Console (`docker exec -i -t $(docker ps -aqf "ancestor=migrations/migration_console:latest") bash` or via the Docker console) and run `./runTestBenchmarks.sh`. By default, this runs four workloads from [Opensearch-Benchmark](https://github.com/opensearch-project/opensearch-benchmark) against the Capture Proxy endpoint @@ -33,8 +33,9 @@ and run `./runTestBenchmarks.sh`. By default, this runs four workloads from with the various containers of the solution. You can also access the metrics generated by the solution in Grafana. While the solution is running, go to -[http://localhost:3000](http://localhost:3000/) and enter the default credentials (admin, admin) and do ????? to see -metrics for the migration tools. (TODO: are we actually sending metrics in? I don't see anything in here) +[http://localhost:3000](http://localhost:3000/) and enter the default credentials (admin, admin). Connections to +Jaeger and Prometheus are automatically provisioned (see them under `Connections->Data sources`), so you can go +directly to `Explore` and define a query using the supplied data from either data source. Traces for the capture proxy and replayer are available via Jaeger at [http://localhost:16686](http://localhost:16686). diff --git a/TrafficCapture/dockerSolution/src/main/docker/docker-compose.yml b/TrafficCapture/dockerSolution/src/main/docker/docker-compose.yml index 90c180717..7f22e56c5 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/docker-compose.yml +++ b/TrafficCapture/dockerSolution/src/main/docker/docker-compose.yml @@ -35,6 +35,7 @@ services: - "3000:3000" volumes: - ./grafana_data:/var/lib/grafana + - ./grafana_datasources.yaml:/usr/share/grafana/conf/provisioning/datasources/datasources.yaml environment: - GF_SECURITY_ADMIN_PASSWORD=admin depends_on: diff --git a/TrafficCapture/dockerSolution/src/main/docker/grafana_datasources.yaml b/TrafficCapture/dockerSolution/src/main/docker/grafana_datasources.yaml new file mode 100644 index 000000000..f466234e3 --- /dev/null +++ b/TrafficCapture/dockerSolution/src/main/docker/grafana_datasources.yaml @@ -0,0 +1,22 @@ +apiVersion: 1 +datasources: + - name: Jaeger + type: jaeger + url: http://jaeger:16686 + basicAuth: false + isDefault: false + readOnly: false + jsonData: {} + - name: Prometheus + type: prometheus + url: http://prometheus:9090 + access: proxy + basicAuth: false + isDefault: true + jsonData: + httpMethod: POST + prometheusType: Prometheus + exemplarTraceIdDestinations: + - name: trace_id + urlDisplayLabel: Jaeger + readOnly: false From 8f783fe1d4119305ec0769255c6595783b968881 Mon Sep 17 00:00:00 2001 From: "mend-for-github.aaakk.us.kg[bot]" <50673670+mend-for-github.aaakk.us.kg[bot]@users.noreply.github.com> Date: Thu, 11 Apr 2024 12:45:46 +0000 Subject: [PATCH 3/5] Update dependency io.netty:netty-codec-http to v4.1.108.Final --- TrafficCapture/nettyWireLogging/build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/TrafficCapture/nettyWireLogging/build.gradle b/TrafficCapture/nettyWireLogging/build.gradle index 850a6c56d..862359df8 100644 --- a/TrafficCapture/nettyWireLogging/build.gradle +++ b/TrafficCapture/nettyWireLogging/build.gradle @@ -13,7 +13,7 @@ dependencies { implementation project(':captureOffloader') implementation project(':coreUtilities') api group: 'io.netty', name: 'netty-buffer', version: '4.1.100.Final' - api group: 'io.netty', name: 'netty-codec-http', version: '4.1.100.Final' + api group: 'io.netty', name: 'netty-codec-http', version: '4.1.108.Final' api group: 'io.netty', name: 'netty-handler', version: '4.1.100.Final' implementation group: 'org.slf4j', name: 'slf4j-api', version: '2.0.7' From ffd50665e4045c4ceb43a22719ea0d0f2634c2d8 Mon Sep 17 00:00:00 2001 From: Mikayla Thompson Date: Thu, 11 Apr 2024 14:13:37 -0600 Subject: [PATCH 4/5] Update docs + move to custom grafana image Signed-off-by: Mikayla Thompson --- TrafficCapture/dockerSolution/README.md | 19 ++++++++++--------- TrafficCapture/dockerSolution/build.gradle | 4 +++- .../src/main/docker/docker-compose.yml | 3 +-- .../src/main/docker/grafana/Dockerfile | 3 +++ .../datasources.yaml} | 0 5 files changed, 17 insertions(+), 12 deletions(-) create mode 100644 TrafficCapture/dockerSolution/src/main/docker/grafana/Dockerfile rename TrafficCapture/dockerSolution/src/main/docker/{grafana_datasources.yaml => grafana/datasources.yaml} (100%) diff --git a/TrafficCapture/dockerSolution/README.md b/TrafficCapture/dockerSolution/README.md index b35d7047b..a9fb1bf9f 100644 --- a/TrafficCapture/dockerSolution/README.md +++ b/TrafficCapture/dockerSolution/README.md @@ -25,19 +25,20 @@ curl --insecure https://localhost:9200/_cat/indices -u 'admin:admin' You can send the same calls to the source cluster while bypassing the Capture Proxy (calls will not be relayed to the target cluster) via `localhost:19200`, and to the target cluster directly at `localhost:29200`. -For sample data that exercises various endpoints with a range of datatypes, you can `ssh` into the Migration Console -(`docker exec -i -t $(docker ps -aqf "ancestor=migrations/migration_console:latest") bash` or via the Docker console) -and run `./runTestBenchmarks.sh`. By default, this runs four workloads from +For sample data that exercises various endpoints with a range of datatypes, you can execute a shell in the Migration +Console (`docker exec -i -t $(docker ps -aqf "ancestor=migrations/migration_console:latest") bash` or via the Docker +console) and run `./runTestBenchmarks.sh`. By default, this runs four short test workloads from [Opensearch-Benchmark](https://github.com/opensearch-project/opensearch-benchmark) against the Capture Proxy endpoint (9200). The Migration Console contains other utility functions (`./catIndices.sh`, `kafka-tools`, etc.) to interact with the various containers of the solution. -You can also access the metrics generated by the solution in Grafana. While the solution is running, go to -[http://localhost:3000](http://localhost:3000/) and enter the default credentials (admin, admin). Connections to -Jaeger and Prometheus are automatically provisioned (see them under `Connections->Data sources`), so you can go -directly to `Explore` and define a query using the supplied data from either data source. - -Traces for the capture proxy and replayer are available via Jaeger at [http://localhost:16686](http://localhost:16686). +With the default docker-compose configuration launched with :dockerSolution:composeUp, instrumentation containers +will be started (see below for other options). You can access the metrics generated by the solution in Grafana. +While the solution is running, go to [http://localhost:3000](http://localhost:3000/) and enter the default credentials +(admin, admin). Connections to Jaeger and Prometheus are automatically provisioned (see them under +`Connections->Data sources`), so you can go directly to `Explore` and define a query using the supplied data +from either data source. Traces for the capture proxy and replayer are available via Jaeger at +[http://localhost:16686](http://localhost:16686). ### Running with different telemetry flavors. diff --git a/TrafficCapture/dockerSolution/build.gradle b/TrafficCapture/dockerSolution/build.gradle index dbd5a07c5..a056deada 100644 --- a/TrafficCapture/dockerSolution/build.gradle +++ b/TrafficCapture/dockerSolution/build.gradle @@ -19,7 +19,8 @@ dependencies { def dockerFilesForExternalServices = [ "elasticsearchWithSearchGuard": "elasticsearch_searchguard", "migrationConsole": "migration_console", - "otelCollector": "otel_collector" + "otelCollector": "otel_collector", + "grafana": "grafana" ] // Create the static docker files that aren't hosting migrations java code from this repo dockerFilesForExternalServices.each { projectName, dockerImageName -> @@ -74,6 +75,7 @@ task buildDockerImages { dependsOn buildDockerImage_elasticsearchWithSearchGuard dependsOn buildDockerImage_migrationConsole dependsOn buildDockerImage_otelCollector + dependsOn buildDockerImage_grafana dependsOn buildDockerImage_trafficCaptureProxyServer dependsOn buildDockerImage_trafficReplayer diff --git a/TrafficCapture/dockerSolution/src/main/docker/docker-compose.yml b/TrafficCapture/dockerSolution/src/main/docker/docker-compose.yml index 7f22e56c5..69ed9072d 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/docker-compose.yml +++ b/TrafficCapture/dockerSolution/src/main/docker/docker-compose.yml @@ -28,14 +28,13 @@ services: - COLLECTOR_OTLP_ENABLED=true grafana: - image: grafana/grafana:latest + image: 'migrations/grafana:latest' networks: - migrations ports: - "3000:3000" volumes: - ./grafana_data:/var/lib/grafana - - ./grafana_datasources.yaml:/usr/share/grafana/conf/provisioning/datasources/datasources.yaml environment: - GF_SECURITY_ADMIN_PASSWORD=admin depends_on: diff --git a/TrafficCapture/dockerSolution/src/main/docker/grafana/Dockerfile b/TrafficCapture/dockerSolution/src/main/docker/grafana/Dockerfile new file mode 100644 index 000000000..57b951496 --- /dev/null +++ b/TrafficCapture/dockerSolution/src/main/docker/grafana/Dockerfile @@ -0,0 +1,3 @@ +FROM grafana/grafana:latest + +COPY datasources.yaml /usr/share/grafana/conf/provisioning/datasources/ diff --git a/TrafficCapture/dockerSolution/src/main/docker/grafana_datasources.yaml b/TrafficCapture/dockerSolution/src/main/docker/grafana/datasources.yaml similarity index 100% rename from TrafficCapture/dockerSolution/src/main/docker/grafana_datasources.yaml rename to TrafficCapture/dockerSolution/src/main/docker/grafana/datasources.yaml From 2fa83c1a4618cf5b1504962bfd1ad74983f98a6e Mon Sep 17 00:00:00 2001 From: Greg Schohn Date: Thu, 11 Apr 2024 12:32:24 -0400 Subject: [PATCH 5/5] Assorted bugfixes A memory leak for errantly formed http messages going through HttpByteBufFormatter; log4j2 config bug that stopped old replayer logs from compressing. I've also further improved the logging on shutdown. Signed-off-by: Greg Schohn --- .../replay/HttpByteBufFormatter.java | 6 +++- .../migrations/replay/TrafficReplayer.java | 19 ++++++++--- .../traffic/source/TrafficStreamLimiter.java | 33 +++++++++++++------ .../src/main/resources/log4j2.properties | 2 +- 4 files changed, 43 insertions(+), 17 deletions(-) diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/HttpByteBufFormatter.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/HttpByteBufFormatter.java index a7d8065c2..a258d033b 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/HttpByteBufFormatter.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/HttpByteBufFormatter.java @@ -171,7 +171,11 @@ public static HttpMessage parseHttpMessageFromBufs(HttpMessageType msgType, Stre } }); - return channel.readInbound(); + try { + return channel.readInbound(); + } finally { + channel.finishAndReleaseAll(); + } } public static FullHttpRequest parseHttpRequestFromBufs(Stream byteBufStream, boolean releaseByteBufs) { diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java index 3b6b94031..7a628f2db 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java @@ -288,6 +288,7 @@ public static void main(String[] args) throws Exception { RootOtelContext.initializeOpenTelemetryWithCollectorOrAsNoop(params.otelCollectorEndpoint, "replay"), contextTrackers); + ActiveContextMonitor activeContextMonitor = null; try (var blockingTrafficSource = TrafficCaptureSourceFactory.createTrafficCaptureSource(topContext, params, Duration.ofSeconds(params.lookaheadTimeSeconds)); var authTransformer = buildAuthTransformerFactory(params)) @@ -301,13 +302,14 @@ public static void main(String[] args) throws Exception { new TransformationLoader().getTransformerFactoryLoader(uri.getHost(), params.userAgent, transformerConfig), params.allowInsecureConnections, params.numClientThreads, params.maxConcurrentRequests, orderedRequestTracker); - var activeContextMonitor = new ActiveContextMonitor( + activeContextMonitor = new ActiveContextMonitor( globalContextTracker, perContextTracker, orderedRequestTracker, 64, cf->cf.formatAsString(TrafficReplayerTopLevel::formatWorkItem), activeContextLogger); - scheduledExecutorService.scheduleAtFixedRate(()->{ - activeContextLogger.atInfo().setMessage(()->"Total requests outstanding: " + tr.requestWorkTracker.size()).log(); - activeContextMonitor.run(); - }, + ActiveContextMonitor finalActiveContextMonitor = activeContextMonitor; + scheduledExecutorService.scheduleAtFixedRate(() -> { + activeContextLogger.atInfo().setMessage(() -> "Total requests outstanding: " + tr.requestWorkTracker.size()).log(); + finalActiveContextMonitor.run(); + }, ACTIVE_WORK_MONITOR_CADENCE_MS, ACTIVE_WORK_MONITOR_CADENCE_MS, TimeUnit.MILLISECONDS); setupShutdownHookForReplayer(tr); @@ -318,6 +320,13 @@ public static void main(String[] args) throws Exception { log.info("Done processing TrafficStreams"); } finally { scheduledExecutorService.shutdown(); + if (activeContextMonitor != null) { + var acmLevel = globalContextTracker.getActiveScopesByAge().findAny().isPresent() ? + Level.ERROR : Level.INFO; + activeContextLogger.atLevel(acmLevel).setMessage(()->"Outstanding work after shutdown...").log(); + activeContextMonitor.run(); + activeContextLogger.atLevel(acmLevel).setMessage(()->"[end of run]]").log(); + } } } diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/traffic/source/TrafficStreamLimiter.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/traffic/source/TrafficStreamLimiter.java index 008eca459..e64ef99a3 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/traffic/source/TrafficStreamLimiter.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/traffic/source/TrafficStreamLimiter.java @@ -42,16 +42,29 @@ public boolean isStopped() { @SneakyThrows private void consumeFromQueue() { - while (!stopped.get()) { - var workItem = workQueue.take(); - log.atDebug().setMessage(()->"liveTrafficStreamCostGate.permits: {} acquiring: {}") - .addArgument(liveTrafficStreamCostGate.availablePermits()) - .addArgument(workItem.cost) - .log(); - liveTrafficStreamCostGate.acquire(workItem.cost); - log.atDebug().setMessage(()->"Acquired liveTrafficStreamCostGate (available=" + - liveTrafficStreamCostGate.availablePermits()+") to process " + workItem.context).log(); - workItem.task.accept(workItem); + WorkItem workItem = null; + try { + while (!stopped.get()) { + workItem = workQueue.take(); + log.atDebug().setMessage(() -> "liveTrafficStreamCostGate.permits: {} acquiring: {}") + .addArgument(liveTrafficStreamCostGate.availablePermits()) + .addArgument(workItem.cost) + .log(); + liveTrafficStreamCostGate.acquire(workItem.cost); + WorkItem finalWorkItem = workItem; + log.atDebug().setMessage(() -> "Acquired liveTrafficStreamCostGate (available=" + + liveTrafficStreamCostGate.availablePermits() + ") to process " + finalWorkItem.context).log(); + workItem.task.accept(workItem); + workItem = null; + } + } catch (InterruptedException e) { + if (!stopped.get()) { + WorkItem finalWorkItem = workItem; + log.atError().setMessage(()->"consumeFromQueue() was interrupted with " + + (finalWorkItem != null ? "an active task and " : "") + + workQueue.size() + " enqueued items").log(); + } + throw e; } } diff --git a/TrafficCapture/trafficReplayer/src/main/resources/log4j2.properties b/TrafficCapture/trafficReplayer/src/main/resources/log4j2.properties index ffccfb189..d4e48d242 100644 --- a/TrafficCapture/trafficReplayer/src/main/resources/log4j2.properties +++ b/TrafficCapture/trafficReplayer/src/main/resources/log4j2.properties @@ -13,7 +13,7 @@ appender.console.layout.pattern = [%-5level] %d{yyyy-MM-dd HH:mm:ss,SSS}{UTC} [% appender.ReplayerLogFile.type = RollingFile appender.ReplayerLogFile.name = ReplayerLogFile appender.ReplayerLogFile.fileName = logs/replayer.log -appender.ReplayerLogFile.filePattern = logs/%d{yyyy-MM}{UTC}/replayer-%d{yyyy-MM-dd-HH-mm}{UTC}-%i.log +appender.ReplayerLogFile.filePattern = logs/%d{yyyy-MM}{UTC}/replayer-%d{yyyy-MM-dd-HH-mm}{UTC}-%i.log.gz appender.ReplayerLogFile.layout.type = PatternLayout appender.ReplayerLogFile.layout.pattern = [%-5level] %d{yyyy-MM-dd HH:mm:ss,SSS}{UTC} [%t] %c{1} - %msg%equals{ ctx=%mdc}{ ctx={}}{}%n appender.ReplayerLogFile.policies.type = Policies