Skip to content

Commit

Permalink
Assorted bugfixes
Browse files Browse the repository at this point in the history
A memory leak for errantly formed http messages going through HttpByteBufFormatter; log4j2 config bug that stopped old replayer logs from compressing.
I've also further improved the logging on shutdown.

Signed-off-by: Greg Schohn <[email protected]>
  • Loading branch information
gregschohn committed Apr 15, 2024
1 parent 19048a5 commit 2fa83c1
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,11 @@ public static HttpMessage parseHttpMessageFromBufs(HttpMessageType msgType, Stre
}
});

return channel.readInbound();
try {
return channel.readInbound();
} finally {
channel.finishAndReleaseAll();
}
}

public static FullHttpRequest parseHttpRequestFromBufs(Stream<ByteBuf> byteBufStream, boolean releaseByteBufs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ public static void main(String[] args) throws Exception {
RootOtelContext.initializeOpenTelemetryWithCollectorOrAsNoop(params.otelCollectorEndpoint, "replay"),
contextTrackers);

ActiveContextMonitor activeContextMonitor = null;
try (var blockingTrafficSource = TrafficCaptureSourceFactory.createTrafficCaptureSource(topContext, params,
Duration.ofSeconds(params.lookaheadTimeSeconds));
var authTransformer = buildAuthTransformerFactory(params))
Expand All @@ -301,13 +302,14 @@ public static void main(String[] args) throws Exception {
new TransformationLoader().getTransformerFactoryLoader(uri.getHost(), params.userAgent, transformerConfig),
params.allowInsecureConnections, params.numClientThreads, params.maxConcurrentRequests,
orderedRequestTracker);
var activeContextMonitor = new ActiveContextMonitor(
activeContextMonitor = new ActiveContextMonitor(
globalContextTracker, perContextTracker, orderedRequestTracker, 64,
cf->cf.formatAsString(TrafficReplayerTopLevel::formatWorkItem), activeContextLogger);
scheduledExecutorService.scheduleAtFixedRate(()->{
activeContextLogger.atInfo().setMessage(()->"Total requests outstanding: " + tr.requestWorkTracker.size()).log();
activeContextMonitor.run();
},
ActiveContextMonitor finalActiveContextMonitor = activeContextMonitor;
scheduledExecutorService.scheduleAtFixedRate(() -> {
activeContextLogger.atInfo().setMessage(() -> "Total requests outstanding: " + tr.requestWorkTracker.size()).log();
finalActiveContextMonitor.run();
},
ACTIVE_WORK_MONITOR_CADENCE_MS, ACTIVE_WORK_MONITOR_CADENCE_MS, TimeUnit.MILLISECONDS);

setupShutdownHookForReplayer(tr);
Expand All @@ -318,6 +320,13 @@ public static void main(String[] args) throws Exception {
log.info("Done processing TrafficStreams");
} finally {
scheduledExecutorService.shutdown();
if (activeContextMonitor != null) {
var acmLevel = globalContextTracker.getActiveScopesByAge().findAny().isPresent() ?
Level.ERROR : Level.INFO;
activeContextLogger.atLevel(acmLevel).setMessage(()->"Outstanding work after shutdown...").log();
activeContextMonitor.run();
activeContextLogger.atLevel(acmLevel).setMessage(()->"[end of run]]").log();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,29 @@ public boolean isStopped() {

@SneakyThrows
private void consumeFromQueue() {
while (!stopped.get()) {
var workItem = workQueue.take();
log.atDebug().setMessage(()->"liveTrafficStreamCostGate.permits: {} acquiring: {}")
.addArgument(liveTrafficStreamCostGate.availablePermits())
.addArgument(workItem.cost)
.log();
liveTrafficStreamCostGate.acquire(workItem.cost);
log.atDebug().setMessage(()->"Acquired liveTrafficStreamCostGate (available=" +
liveTrafficStreamCostGate.availablePermits()+") to process " + workItem.context).log();
workItem.task.accept(workItem);
WorkItem workItem = null;
try {
while (!stopped.get()) {
workItem = workQueue.take();
log.atDebug().setMessage(() -> "liveTrafficStreamCostGate.permits: {} acquiring: {}")
.addArgument(liveTrafficStreamCostGate.availablePermits())
.addArgument(workItem.cost)
.log();
liveTrafficStreamCostGate.acquire(workItem.cost);
WorkItem finalWorkItem = workItem;
log.atDebug().setMessage(() -> "Acquired liveTrafficStreamCostGate (available=" +
liveTrafficStreamCostGate.availablePermits() + ") to process " + finalWorkItem.context).log();
workItem.task.accept(workItem);
workItem = null;
}
} catch (InterruptedException e) {
if (!stopped.get()) {
WorkItem finalWorkItem = workItem;
log.atError().setMessage(()->"consumeFromQueue() was interrupted with " +
(finalWorkItem != null ? "an active task and " : "") +
workQueue.size() + " enqueued items").log();
}
throw e;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ appender.console.layout.pattern = [%-5level] %d{yyyy-MM-dd HH:mm:ss,SSS}{UTC} [%
appender.ReplayerLogFile.type = RollingFile
appender.ReplayerLogFile.name = ReplayerLogFile
appender.ReplayerLogFile.fileName = logs/replayer.log
appender.ReplayerLogFile.filePattern = logs/%d{yyyy-MM}{UTC}/replayer-%d{yyyy-MM-dd-HH-mm}{UTC}-%i.log
appender.ReplayerLogFile.filePattern = logs/%d{yyyy-MM}{UTC}/replayer-%d{yyyy-MM-dd-HH-mm}{UTC}-%i.log.gz
appender.ReplayerLogFile.layout.type = PatternLayout
appender.ReplayerLogFile.layout.pattern = [%-5level] %d{yyyy-MM-dd HH:mm:ss,SSS}{UTC} [%t] %c{1} - %msg%equals{ ctx=%mdc}{ ctx={}}{}%n
appender.ReplayerLogFile.policies.type = Policies
Expand Down

0 comments on commit 2fa83c1

Please sign in to comment.