From 2fa83c1a4618cf5b1504962bfd1ad74983f98a6e Mon Sep 17 00:00:00 2001 From: Greg Schohn Date: Thu, 11 Apr 2024 12:32:24 -0400 Subject: [PATCH] Assorted bugfixes Fix a memory leak for malformed HTTP messages going through HttpByteBufFormatter, and a log4j2 config bug that stopped old replayer logs from being compressed. I've also further improved the logging on shutdown. Signed-off-by: Greg Schohn --- .../replay/HttpByteBufFormatter.java | 6 +++- .../migrations/replay/TrafficReplayer.java | 19 ++++++++--- .../traffic/source/TrafficStreamLimiter.java | 33 +++++++++++++------ .../src/main/resources/log4j2.properties | 2 +- 4 files changed, 43 insertions(+), 17 deletions(-) diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/HttpByteBufFormatter.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/HttpByteBufFormatter.java index a7d8065c2..a258d033b 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/HttpByteBufFormatter.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/HttpByteBufFormatter.java @@ -171,7 +171,11 @@ public static HttpMessage parseHttpMessageFromBufs(HttpMessageType msgType, Stre } }); - return channel.readInbound(); + try { + return channel.readInbound(); + } finally { + channel.finishAndReleaseAll(); + } } public static FullHttpRequest parseHttpRequestFromBufs(Stream byteBufStream, boolean releaseByteBufs) { diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java index 3b6b94031..7a628f2db 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java @@ -288,6 +288,7 @@ public static void main(String[] 
args) throws Exception { RootOtelContext.initializeOpenTelemetryWithCollectorOrAsNoop(params.otelCollectorEndpoint, "replay"), contextTrackers); + ActiveContextMonitor activeContextMonitor = null; try (var blockingTrafficSource = TrafficCaptureSourceFactory.createTrafficCaptureSource(topContext, params, Duration.ofSeconds(params.lookaheadTimeSeconds)); var authTransformer = buildAuthTransformerFactory(params)) @@ -301,13 +302,14 @@ public static void main(String[] args) throws Exception { new TransformationLoader().getTransformerFactoryLoader(uri.getHost(), params.userAgent, transformerConfig), params.allowInsecureConnections, params.numClientThreads, params.maxConcurrentRequests, orderedRequestTracker); - var activeContextMonitor = new ActiveContextMonitor( + activeContextMonitor = new ActiveContextMonitor( globalContextTracker, perContextTracker, orderedRequestTracker, 64, cf->cf.formatAsString(TrafficReplayerTopLevel::formatWorkItem), activeContextLogger); - scheduledExecutorService.scheduleAtFixedRate(()->{ - activeContextLogger.atInfo().setMessage(()->"Total requests outstanding: " + tr.requestWorkTracker.size()).log(); - activeContextMonitor.run(); - }, + ActiveContextMonitor finalActiveContextMonitor = activeContextMonitor; + scheduledExecutorService.scheduleAtFixedRate(() -> { + activeContextLogger.atInfo().setMessage(() -> "Total requests outstanding: " + tr.requestWorkTracker.size()).log(); + finalActiveContextMonitor.run(); + }, ACTIVE_WORK_MONITOR_CADENCE_MS, ACTIVE_WORK_MONITOR_CADENCE_MS, TimeUnit.MILLISECONDS); setupShutdownHookForReplayer(tr); @@ -318,6 +320,13 @@ public static void main(String[] args) throws Exception { log.info("Done processing TrafficStreams"); } finally { scheduledExecutorService.shutdown(); + if (activeContextMonitor != null) { + var acmLevel = globalContextTracker.getActiveScopesByAge().findAny().isPresent() ? 
+ Level.ERROR : Level.INFO; + activeContextLogger.atLevel(acmLevel).setMessage(()->"Outstanding work after shutdown...").log(); + activeContextMonitor.run(); + activeContextLogger.atLevel(acmLevel).setMessage(()->"[end of run]]").log(); + } } } diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/traffic/source/TrafficStreamLimiter.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/traffic/source/TrafficStreamLimiter.java index 008eca459..e64ef99a3 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/traffic/source/TrafficStreamLimiter.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/traffic/source/TrafficStreamLimiter.java @@ -42,16 +42,29 @@ public boolean isStopped() { @SneakyThrows private void consumeFromQueue() { - while (!stopped.get()) { - var workItem = workQueue.take(); - log.atDebug().setMessage(()->"liveTrafficStreamCostGate.permits: {} acquiring: {}") - .addArgument(liveTrafficStreamCostGate.availablePermits()) - .addArgument(workItem.cost) - .log(); - liveTrafficStreamCostGate.acquire(workItem.cost); - log.atDebug().setMessage(()->"Acquired liveTrafficStreamCostGate (available=" + - liveTrafficStreamCostGate.availablePermits()+") to process " + workItem.context).log(); - workItem.task.accept(workItem); + WorkItem workItem = null; + try { + while (!stopped.get()) { + workItem = workQueue.take(); + log.atDebug().setMessage(() -> "liveTrafficStreamCostGate.permits: {} acquiring: {}") + .addArgument(liveTrafficStreamCostGate.availablePermits()) + .addArgument(workItem.cost) + .log(); + liveTrafficStreamCostGate.acquire(workItem.cost); + WorkItem finalWorkItem = workItem; + log.atDebug().setMessage(() -> "Acquired liveTrafficStreamCostGate (available=" + + liveTrafficStreamCostGate.availablePermits() + ") to process " + finalWorkItem.context).log(); + workItem.task.accept(workItem); + workItem = null; + } + } 
catch (InterruptedException e) { + if (!stopped.get()) { + WorkItem finalWorkItem = workItem; + log.atError().setMessage(()->"consumeFromQueue() was interrupted with " + + (finalWorkItem != null ? "an active task and " : "") + + workQueue.size() + " enqueued items").log(); + } + throw e; } } diff --git a/TrafficCapture/trafficReplayer/src/main/resources/log4j2.properties b/TrafficCapture/trafficReplayer/src/main/resources/log4j2.properties index ffccfb189..d4e48d242 100644 --- a/TrafficCapture/trafficReplayer/src/main/resources/log4j2.properties +++ b/TrafficCapture/trafficReplayer/src/main/resources/log4j2.properties @@ -13,7 +13,7 @@ appender.console.layout.pattern = [%-5level] %d{yyyy-MM-dd HH:mm:ss,SSS}{UTC} [% appender.ReplayerLogFile.type = RollingFile appender.ReplayerLogFile.name = ReplayerLogFile appender.ReplayerLogFile.fileName = logs/replayer.log -appender.ReplayerLogFile.filePattern = logs/%d{yyyy-MM}{UTC}/replayer-%d{yyyy-MM-dd-HH-mm}{UTC}-%i.log +appender.ReplayerLogFile.filePattern = logs/%d{yyyy-MM}{UTC}/replayer-%d{yyyy-MM-dd-HH-mm}{UTC}-%i.log.gz appender.ReplayerLogFile.layout.type = PatternLayout appender.ReplayerLogFile.layout.pattern = [%-5level] %d{yyyy-MM-dd HH:mm:ss,SSS}{UTC} [%t] %c{1} - %msg%equals{ ctx=%mdc}{ ctx={}}{}%n appender.ReplayerLogFile.policies.type = Policies