Skip to content

Commit

Permalink
Merge branch 'main' into LintingApril2024
Browse files Browse the repository at this point in the history
  • Loading branch information
gregschohn authored Apr 15, 2024
2 parents 04aa65e + 650093d commit bd1a454
Show file tree
Hide file tree
Showing 10 changed files with 100 additions and 20 deletions.
27 changes: 27 additions & 0 deletions TrafficCapture/dockerSolution/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,33 @@ While in the TrafficCapture directory, run the following command:

`./gradlew :dockerSolution:composeUp`

### Interacting with the Docker Solution

A sample source cluster (running Elasticsearch 7.10) is set up with the Capture Proxy running in front of it. To route
calls through the Capture Proxy, send requests to `localhost:9200` with the default credentials (admin, admin), e.g.:
```sh
curl --insecure https://localhost:9200/_cat/indices -u 'admin:admin'
```

You can send the same calls to the source cluster while bypassing the Capture Proxy (calls will not be relayed to the
target cluster) via `localhost:19200`, and to the target cluster directly at `localhost:29200`.

For sample data that exercises various endpoints with a range of datatypes, you can execute a shell in the Migration
Console (`docker exec -i -t $(docker ps -aqf "ancestor=migrations/migration_console:latest") bash` or via the Docker
console) and run `./runTestBenchmarks.sh`. By default, this runs four short test workloads from
[OpenSearch Benchmark](https://github.com/opensearch-project/opensearch-benchmark) against the Capture Proxy endpoint
(9200). The Migration Console contains other utility functions (`./catIndices.sh`, `kafka-tools`, etc.) to interact
with the various containers of the solution.

With the default docker-compose configuration launched with `:dockerSolution:composeUp`, instrumentation containers
will be started (see below for other options). You can access the metrics generated by the solution in Grafana.
While the solution is running, go to [http://localhost:3000](http://localhost:3000/) and enter the default credentials
(admin, admin). Connections to Jaeger and Prometheus are automatically provisioned (see them under
`Connections->Data sources`), so you can go directly to `Explore` and define a query using the supplied data
from either data source. Traces for the capture proxy and replayer are available via Jaeger at
[http://localhost:16686](http://localhost:16686).


### Running with different telemetry flavors

By default, composeUp will run an otel-collector that exports instrumentation to other local containers within the
Expand Down
4 changes: 3 additions & 1 deletion TrafficCapture/dockerSolution/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ dependencies {
def dockerFilesForExternalServices = [
"elasticsearchWithSearchGuard": "elasticsearch_searchguard",
"migrationConsole": "migration_console",
"otelCollector": "otel_collector"
"otelCollector": "otel_collector",
"grafana": "grafana"
]
// Create the static docker files that aren't hosting migrations java code from this repo
dockerFilesForExternalServices.each { projectName, dockerImageName ->
Expand Down Expand Up @@ -74,6 +75,7 @@ task buildDockerImages {
dependsOn buildDockerImage_elasticsearchWithSearchGuard
dependsOn buildDockerImage_migrationConsole
dependsOn buildDockerImage_otelCollector
dependsOn buildDockerImage_grafana

dependsOn buildDockerImage_trafficCaptureProxyServer
dependsOn buildDockerImage_trafficReplayer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ services:
- COLLECTOR_OTLP_ENABLED=true

grafana:
image: grafana/grafana:latest
image: 'migrations/grafana:latest'
networks:
- migrations
ports:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Custom Grafana image for the migrations docker solution: extends the stock
# image with pre-provisioned data sources so no manual setup is needed.
# NOTE(review): `latest` is unpinned — consider pinning a Grafana version for
# reproducible builds.
FROM grafana/grafana:latest

# Provision the Jaeger and Prometheus data sources at container startup by
# dropping the config into Grafana's provisioning directory.
COPY datasources.yaml /usr/share/grafana/conf/provisioning/datasources/
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Grafana data source provisioning file (loaded automatically at startup;
# see the COPY in the accompanying Dockerfile).
apiVersion: 1
datasources:
  # Traces for the capture proxy and replayer, served by the jaeger container.
  - name: Jaeger
    type: jaeger
    url: http://jaeger:16686
    basicAuth: false
    isDefault: false
    readOnly: false
    jsonData: {}
  # Metrics from the prometheus container; default data source for Explore.
  - name: Prometheus
    type: prometheus
    url: http://prometheus:9090
    access: proxy
    basicAuth: false
    isDefault: true
    jsonData:
      httpMethod: POST
      prometheusType: Prometheus
      # Link exemplar trace IDs through to the Jaeger data source above.
      exemplarTraceIdDestinations:
        - name: trace_id
          urlDisplayLabel: Jaeger
    readOnly: false
2 changes: 1 addition & 1 deletion TrafficCapture/nettyWireLogging/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ dependencies {
implementation project(':captureOffloader')
implementation project(':coreUtilities')
api group: 'io.netty', name: 'netty-buffer', version: '4.1.100.Final'
api group: 'io.netty', name: 'netty-codec-http', version: '4.1.100.Final'
api group: 'io.netty', name: 'netty-codec-http', version: '4.1.108.Final'
api group: 'io.netty', name: 'netty-handler', version: '4.1.100.Final'

implementation group: 'org.slf4j', name: 'slf4j-api', version: '2.0.7'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,11 @@ public static HttpMessage parseHttpMessageFromBufs(HttpMessageType msgType, Stre
}
});

return channel.readInbound();
try {
return channel.readInbound();
} finally {
channel.finishAndReleaseAll();
}
}

public static FullHttpRequest parseHttpRequestFromBufs(Stream<ByteBuf> byteBufStream, boolean releaseByteBufs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ public static void main(String[] args) throws Exception {
RootOtelContext.initializeOpenTelemetryWithCollectorOrAsNoop(params.otelCollectorEndpoint, "replay"),
contextTrackers);

ActiveContextMonitor activeContextMonitor = null;
try (var blockingTrafficSource = TrafficCaptureSourceFactory.createTrafficCaptureSource(topContext, params,
Duration.ofSeconds(params.lookaheadTimeSeconds));
var authTransformer = buildAuthTransformerFactory(params))
Expand All @@ -301,13 +302,14 @@ public static void main(String[] args) throws Exception {
new TransformationLoader().getTransformerFactoryLoader(uri.getHost(), params.userAgent, transformerConfig),
params.allowInsecureConnections, params.numClientThreads, params.maxConcurrentRequests,
orderedRequestTracker);
var activeContextMonitor = new ActiveContextMonitor(
activeContextMonitor = new ActiveContextMonitor(
globalContextTracker, perContextTracker, orderedRequestTracker, 64,
cf->cf.formatAsString(TrafficReplayerTopLevel::formatWorkItem), activeContextLogger);
scheduledExecutorService.scheduleAtFixedRate(()->{
activeContextLogger.atInfo().setMessage(()->"Total requests outstanding: " + tr.requestWorkTracker.size()).log();
activeContextMonitor.run();
},
ActiveContextMonitor finalActiveContextMonitor = activeContextMonitor;
scheduledExecutorService.scheduleAtFixedRate(() -> {
activeContextLogger.atInfo().setMessage(() -> "Total requests outstanding: " + tr.requestWorkTracker.size()).log();
finalActiveContextMonitor.run();
},
ACTIVE_WORK_MONITOR_CADENCE_MS, ACTIVE_WORK_MONITOR_CADENCE_MS, TimeUnit.MILLISECONDS);

setupShutdownHookForReplayer(tr);
Expand All @@ -318,6 +320,13 @@ public static void main(String[] args) throws Exception {
log.info("Done processing TrafficStreams");
} finally {
scheduledExecutorService.shutdown();
if (activeContextMonitor != null) {
var acmLevel = globalContextTracker.getActiveScopesByAge().findAny().isPresent() ?
Level.ERROR : Level.INFO;
activeContextLogger.atLevel(acmLevel).setMessage(()->"Outstanding work after shutdown...").log();
activeContextMonitor.run();
activeContextLogger.atLevel(acmLevel).setMessage(()->"[end of run]]").log();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,29 @@ public boolean isStopped() {

@SneakyThrows
private void consumeFromQueue() {
while (!stopped.get()) {
var workItem = workQueue.take();
log.atDebug().setMessage(()->"liveTrafficStreamCostGate.permits: {} acquiring: {}")
.addArgument(liveTrafficStreamCostGate.availablePermits())
.addArgument(workItem.cost)
.log();
liveTrafficStreamCostGate.acquire(workItem.cost);
log.atDebug().setMessage(()->"Acquired liveTrafficStreamCostGate (available=" +
liveTrafficStreamCostGate.availablePermits()+") to process " + workItem.context).log();
workItem.task.accept(workItem);
WorkItem workItem = null;
try {
while (!stopped.get()) {
workItem = workQueue.take();
log.atDebug().setMessage(() -> "liveTrafficStreamCostGate.permits: {} acquiring: {}")
.addArgument(liveTrafficStreamCostGate.availablePermits())
.addArgument(workItem.cost)
.log();
liveTrafficStreamCostGate.acquire(workItem.cost);
WorkItem finalWorkItem = workItem;
log.atDebug().setMessage(() -> "Acquired liveTrafficStreamCostGate (available=" +
liveTrafficStreamCostGate.availablePermits() + ") to process " + finalWorkItem.context).log();
workItem.task.accept(workItem);
workItem = null;
}
} catch (InterruptedException e) {
if (!stopped.get()) {
WorkItem finalWorkItem = workItem;
log.atError().setMessage(()->"consumeFromQueue() was interrupted with " +
(finalWorkItem != null ? "an active task and " : "") +
workQueue.size() + " enqueued items").log();
}
throw e;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ appender.console.layout.pattern = [%-5level] %d{yyyy-MM-dd HH:mm:ss,SSS}{UTC} [%
appender.ReplayerLogFile.type = RollingFile
appender.ReplayerLogFile.name = ReplayerLogFile
appender.ReplayerLogFile.fileName = logs/replayer.log
appender.ReplayerLogFile.filePattern = logs/%d{yyyy-MM}{UTC}/replayer-%d{yyyy-MM-dd-HH-mm}{UTC}-%i.log
appender.ReplayerLogFile.filePattern = logs/%d{yyyy-MM}{UTC}/replayer-%d{yyyy-MM-dd-HH-mm}{UTC}-%i.log.gz
appender.ReplayerLogFile.layout.type = PatternLayout
appender.ReplayerLogFile.layout.pattern = [%-5level] %d{yyyy-MM-dd HH:mm:ss,SSS}{UTC} [%t] %c{1} - %msg%equals{ ctx=%mdc}{ ctx={}}{}%n
appender.ReplayerLogFile.policies.type = Policies
Expand Down

0 comments on commit bd1a454

Please sign in to comment.