From 2605255b628c32a30f6aa86ec874e691284587e8 Mon Sep 17 00:00:00 2001 From: Greg Schohn Date: Wed, 10 Apr 2024 23:54:11 -0400 Subject: [PATCH] ActiveContextMonitor now dedups ancestor scopes and replaces them with a reference (the System.identityHashCode of the context) rather than reprinting them. Items that are NOT printed do NOT count toward the output limit, so more contexts will be eligible to log. The old mode is still supported and can be run via logTopOpenActivities(false). Signed-off-by: Greg Schohn --- .../replay/util/ActiveContextMonitor.java | 156 ++++++++++++------ .../replay/util/ActiveContextMonitorTest.java | 90 ++++++++-- 2 files changed, 182 insertions(+), 64 deletions(-) diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/ActiveContextMonitor.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/ActiveContextMonitor.java index b80655e30..86b353790 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/ActiveContextMonitor.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/ActiveContextMonitor.java @@ -3,6 +3,7 @@ import lombok.AllArgsConstructor; import lombok.Getter; import lombok.Lombok; +import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import org.opensearch.migrations.Utils; import org.opensearch.migrations.tracing.ActiveContextTracker; @@ -12,12 +13,14 @@ import org.slf4j.event.Level; import java.time.Duration; -import java.util.AbstractMap; import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; +import java.util.HashSet; import java.util.Map; import java.util.NoSuchElementException; import java.util.Optional; +import java.util.Set; import java.util.TreeMap; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; @@ -25,6 +28,7 @@ import java.util.function.Predicate; import java.util.function.Supplier; import 
java.util.stream.Collectors; +import java.util.stream.IntStream; import java.util.stream.Stream; @Slf4j @@ -105,30 +109,50 @@ Duration getAge(long recordedNanoTime) { * Try to print out the most valuable details at the end, assuming that a user is tailing a file that's * constantly being appended, and therefore could be harder to home in on the start of a block. */ - public void logTopOpenActivities() { + public void logTopOpenActivities(boolean dedupCommonTraces) { logRequests().ifPresent(ll->logger.accept(ll, ()->"\n")); - logTopActiveScopes().ifPresent(ll->logger.accept(ll, ()->"\n")); - logTopActiveScopesByType().ifPresent(ll->logger.accept(ll, ()->"\n")); + logScopes(dedupCommonTraces); } - public Optional logTopActiveScopesByType() { - return perActivityContextTracker.getActiveScopeTypes().map(c-> - gatherActivities(perActivityContextTracker.getOldestActiveScopes(c), - perActivityContextTracker.numScopesFor(c), - this::getLogLevelForActiveContext)) - .sorted(Comparator.comparingDouble(ActivitiesAndDepthsForLogging::getAverageContextDepth)) - .map(cad-> { + public void logScopes(boolean dedupCommonTraces) { + var scopesSeen = dedupCommonTraces ? 
new HashSet() : null; + var activitiesDeferral = getTopActivities(scopesSeen); + logTopActiveScopes(scopesSeen).ifPresent(ll->logger.accept(ll, ()->"\n")); + logTopActiveScopesByType(activitiesDeferral).ifPresent(ll->logger.accept(ll, ()->"\n")); + } + + private Optional logTopActiveScopesByType(Stream stream) { + return stream.map(cad-> { if (cad.items.isEmpty()) { return Optional.empty(); } final var sample = cad.items.get(0); - logger.accept(getHigherLevel(Optional.of(sample.getValue()), Optional.of(Level.INFO)).get(), () -> - OLDEST_ITEMS_FROM_GROUP_LABEL_PREAMBLE + cad.totalScopes + " scopes that are past " + - "thresholds for '" + sample.getKey().getActivityName() + "'"); - cad.items.forEach(kvp->logger.accept(kvp.getValue(), ()->activityToString(kvp.getKey()))); - return (Optional) Optional.of(cad.items.get(0).getValue()); + logger.accept(getHigherLevel(Optional.of(sample.getLevel()), Optional.of(Level.INFO)).get(), () -> + OLDEST_ITEMS_FROM_GROUP_LABEL_PREAMBLE + cad.totalScopes + + " scopes for '" + sample.getScope().getActivityName() + "'" + + " that are past thresholds that are not otherwise reported below "); + final var numItems = cad.items.size(); + IntStream.range(0, numItems).mapToObj(i->cad.items.get(numItems-i-1)) + .forEach(kvp-> logger.accept(kvp.getLevel(), + ()->activityToString(kvp.getScope(), kvp.ancestorDepthBeforeRedundancy))); + return (Optional) Optional.of(cad.items.get(0).getLevel()); }) .collect(Utils.foldLeft(Optional.empty(), ActiveContextMonitor::getHigherLevel)); } + private Stream + getTopActivities(Set scopesSeenSoFar) { + var reverseOrderedList = perActivityContextTracker.getActiveScopeTypes() + .map(c->Map.,Supplier>> + entry(c,()->perActivityContextTracker.getOldestActiveScopes(c))) + .sorted(Comparator.comparingInt(kvp -> -1 * kvp.getValue().get().findAny() + .map(ActiveContextMonitor::contextDepth).orElse(0))) + .map(kvp->gatherActivities(scopesSeenSoFar, kvp.getValue().get(), + 
perActivityContextTracker.numScopesFor(kvp.getKey()), + this::getLogLevelForActiveContext)) + .collect(Collectors.toCollection(ArrayList::new)); + Collections.reverse(reverseOrderedList); + return reverseOrderedList.stream(); + } + private static Optional getHigherLevel(Optional aOuter, Optional bOuter) { return aOuter.map(a -> bOuter.filter(b -> a.toInt() <= b.toInt()).orElse(a)) .or(() -> bOuter); @@ -136,32 +160,42 @@ private static Optional getHigherLevel(Optional aOuter, Optional logRequests() { var orderedItems = orderedRequestTracker.orderedSet; - return logActiveItems(orderedItems.stream(), orderedItems.size(), + return logActiveItems(null, orderedItems.stream(), orderedItems.size(), " outstanding requests that are past thresholds", tkaf -> getLogLevelForActiveContext(tkaf.nanoTimeKey), - tkaf -> activityToString(tkaf, INDENT)); + this::activityToString); } - public Optional logTopActiveScopes() { - return logActiveItems(globalContextTracker.getActiveScopesByAge(), globalContextTracker.size(), - " GLOBAL scopes that are past thresholds", + private Optional logTopActiveScopes(Set scopesSeen) { + return logActiveItems(scopesSeen, + globalContextTracker.getActiveScopesByAge(), globalContextTracker.size(), + " GLOBAL scopes that are past thresholds that are not otherwise reported below", this::getLogLevelForActiveContext, - this::activityToString); + ctx -> activityToString(ctx, scanUntilAncestorSeen(scopesSeen, ctx, 0))); + } + + @AllArgsConstructor + @Getter + private static class ScopePath { + private final IScopedInstrumentationAttributes scope; + private final int ancestorDepthBeforeRedundancy; + private final Level level; } @AllArgsConstructor @Getter private static class ActivitiesAndDepthsForLogging { - ArrayList> items; + ArrayList items; double averageContextDepth; long totalScopes; } private ActivitiesAndDepthsForLogging - gatherActivities(Stream oldestActiveScopes, long numScopes, - Function> getLevel) { + gatherActivities(Set scopesSeenSoFar, + 
Stream oldestActiveScopes, long numScopes, + Function> getLevel) { int depthSum = 0; - var outList = new ArrayList>(); + var outList = new ArrayList(); try { var activeScopeIterator = oldestActiveScopes.iterator(); while ((outList.size() < totalItemsToOutputLimit) && activeScopeIterator.hasNext()) { @@ -170,8 +204,11 @@ private static class ActivitiesAndDepthsForLogging { if (levelForElementOp.isEmpty()) { break; } - outList.add(new AbstractMap.SimpleImmutableEntry<>(activeScope, levelForElementOp.get())); - depthSum += contextDepth(activeScope); + var ancestorDepth = scanUntilAncestorSeen(scopesSeenSoFar, activeScope, 0); + if (ancestorDepth != 0) { + outList.add(new ScopePath(activeScope, ancestorDepth, levelForElementOp.get())); + depthSum += contextDepth(activeScope); + } } } catch (NoSuchElementException e) { if (outList.isEmpty()) { @@ -182,34 +219,52 @@ private static class ActivitiesAndDepthsForLogging { return new ActivitiesAndDepthsForLogging(outList, depthSum/(double)outList.size(), numScopes); } - private int contextDepth(IScopedInstrumentationAttributes activeScope) { + private static int scanUntilAncestorSeen(Set ctxSeenSoFar, + IScopedInstrumentationAttributes ctx, int depth) { + // if we added an item, then recurse if the parent was non-null; otherwise return depth + if (ctxSeenSoFar == null) { + return -1; + } else if (!ctxSeenSoFar.add(ctx)) { + return depth; + } + ++depth; + var p = ctx.getEnclosingScope(); + return p == null ? depth : scanUntilAncestorSeen(ctxSeenSoFar, p, depth); + } + + private static int contextDepth(IScopedInstrumentationAttributes activeScope) { return contextDepth(activeScope, 0); } - private int contextDepth(IScopedInstrumentationAttributes activeScope, int count) { + private static int contextDepth(IScopedInstrumentationAttributes activeScope, int count) { return activeScope == null ? 
count : contextDepth(activeScope.getEnclosingScope(), count+1); } - private String activityToString(IScopedInstrumentationAttributes context) { - return activityToString(context, INDENT); + private String activityToString(OrderedWorkerTracker.TimeKeyAndFuture tkaf) { + var timeStr = "age=" + getAge(tkaf.nanoTimeKey); + return INDENT + timeStr + " " + formatWorkItem.apply(tkaf.future); } - private String activityToString(OrderedWorkerTracker.TimeKeyAndFuture tkaf, String indent) { - var timeStr = "age=" + getAge(tkaf.nanoTimeKey); - return indent + timeStr + " " + formatWorkItem.apply(tkaf.future); + private String activityToString(IScopedInstrumentationAttributes context, int depthToInclude) { + return activityToString(context, depthToInclude, INDENT); } - private String activityToString(IScopedInstrumentationAttributes context, String indent) { - if (context == null) { - return null; + private String activityToString(IScopedInstrumentationAttributes ctx, int depthToInclude, String indent) { + if (ctx == null) { + return ""; + } + var idStr = depthToInclude < 0 ? 
null : "<<" + System.identityHashCode(ctx) + ">>"; + if (depthToInclude == 0) { + return " parentRef=" + idStr + "..."; } - var timeStr = "age=" + getAge(context.getStartTimeNano()) + ", start=" + context.getStartTimeInstant(); - var attributesStr = context.getPopulatedSpanAttributes().asMap().entrySet().stream() + var timeStr = "age=" + getAge(ctx.getStartTimeNano()) + ", start=" + ctx.getStartTimeInstant(); + var attributesStr = ctx.getPopulatedSpanAttributes().asMap().entrySet().stream() .map(kvp->kvp.getKey() + ": " + kvp.getValue()) .collect(Collectors.joining(", ")); - return indent + timeStr + " " + context.getActivityName() + ": attribs={" + attributesStr + "}" + - Optional.ofNullable(activityToString(context.getEnclosingScope(), indent + INDENT)) - .map(s->"\n"+s).orElse(""); + var parentStr = activityToString(ctx.getEnclosingScope(), depthToInclude-1, indent + INDENT); + return indent + timeStr + Optional.ofNullable(idStr).map(s->" id="+s).orElse("") + + " " + ctx.getActivityName() + ": attribs={" + attributesStr + "}" + + (!parentStr.isEmpty() && depthToInclude != 1 ? 
"\n" : "") + parentStr; } private Optional getLogLevelForActiveContext(IScopedInstrumentationAttributes activeContext) { @@ -224,9 +279,10 @@ private Optional getLogLevelForActiveContext(long nanoTime) { } private Optional - logActiveItems(Stream activeItemStream, long totalItems, String trailingGroupLabel, - Function> getLevel, - Function getActiveLoggingMessage) { + logActiveItems(Set itemsSeenSoFar, + Stream activeItemStream, long totalItems, String trailingGroupLabel, + Function> getLevel, + Function getActiveLoggingMessage) { int numOutput = 0; Optional firstLevel = Optional.empty(); try { @@ -237,15 +293,17 @@ private Optional getLogLevelForActiveContext(long nanoTime) { if (levelForElementOp.isEmpty()) { break; } + if (Optional.ofNullable(itemsSeenSoFar).map(s->s.contains(activeItem)).orElse(false)) { + continue; + } if (firstLevel.isEmpty()) { firstLevel = levelForElementOp; } if (numOutput++ == 0) { logger.accept(getHigherLevel(levelForElementOp, Optional.of(Level.INFO)).get(), () -> OLDEST_ITEMS_FROM_GROUP_LABEL_PREAMBLE + totalItems + trailingGroupLabel); - } - logger.accept(levelForElementOp.get(),()->getActiveLoggingMessage.apply(activeItem)); + logger.accept(levelForElementOp.get(), () -> getActiveLoggingMessage.apply(activeItem)); } } catch (NoSuchElementException e) { if (numOutput == 0) { @@ -258,6 +316,6 @@ private Optional getLogLevelForActiveContext(long nanoTime) { @Override public void run() { - logTopOpenActivities(); + logTopOpenActivities(true); } } diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/util/ActiveContextMonitorTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/util/ActiveContextMonitorTest.java index e65865c5d..4ea541f42 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/util/ActiveContextMonitorTest.java +++ 
b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/util/ActiveContextMonitorTest.java @@ -23,6 +23,66 @@ @Slf4j class ActiveContextMonitorTest { + private Pattern makeSuppressDedupedPattern(int visibleRequestCount, int requestScopeCount, + int visibleScopeCount, int totalScopeCount) { + var sb = new StringBuilder(); + sb.append("^"); + sb.append("Oldest of " + requestScopeCount + " outstanding requests that are past thresholds.*\\n"); + for (int i=0; i> httpTransaction: attribs=\\{.*\\}.*\\n"); + + sb.append("Oldest of " + requestScopeCount + " scopes.*'trafficStreamLifetime'.*\\n"); + sb.append(indent(1) + "age=P.*S, start=.*Z id=<<.*>> trafficStreamLifetime: attribs=\\{.*\\}.*\\n"); + + sb.append("Oldest of " + requestScopeCount + " scopes .* 'httpTransaction'.*\\n"); + for (int i=0; i> httpTransaction: attribs=\\{.*\\}.*\\n"); + sb.append(indent(2) + "age=P.*S, start=.*Z id=<<.*>> trafficStreamLifetime: attribs=\\{.*\\}.*\\n"); + } + sb.append(indent(3) + "age=P.*S, start=.*Z id=<<.*>> channel: attribs=\\{.*\\}.*\\n"); + + sb.append("$"); + + return Pattern.compile(sb.toString(), Pattern.MULTILINE); + } + + @Test + void testThatCommonAncestorsAreShownJustEnough() throws Exception { + var loggedEntries = new ArrayList>(); + var globalContextTracker = new ActiveContextTracker(); + var perActivityContextTracker = new ActiveContextTrackerByActivityType(); + var orderedWorkerTracker = new OrderedWorkerTracker(); + var compositeTracker = new CompositeContextTracker(globalContextTracker, perActivityContextTracker); + var durationLevelMap = new HashMap<>(Map.of( + Level.ERROR, Duration.ofMillis(5), + Level.WARN, Duration.ofMillis(4), + Level.INFO, Duration.ofMillis(3), + Level.DEBUG, Duration.ofMillis(2), + Level.TRACE, Duration.ofMillis(1))); + var acm = new ActiveContextMonitor( + globalContextTracker, perActivityContextTracker, orderedWorkerTracker, 2, + dtfc -> "", + (Level level, Supplier msgSupplier) -> 
loggedEntries.add(Map.entry(level, msgSupplier.get())), + level -> level == Level.ERROR, durationLevelMap); + try (var testContext = TestContext.noOtelTracking()) { + for (int i = 0; i < 3; ++i) { + var rc = testContext.getTestConnectionRequestContext("connection-0", i); + addContexts(compositeTracker, rc); + final var idx = i; + orderedWorkerTracker.put(rc.getReplayerRequestKey(), + new DiagnosticTrackableCompletableFuture<>(new CompletableFuture<>(), () -> "dummy #" + idx)); + } + Thread.sleep(10); + acm.run(); + checkAllEntriesAreErrorLevel(loggedEntries); + checkAndClearLines(loggedEntries, makeSuppressDedupedPattern(2, 3, 1, 7)); + } + } + @Test void testThatNewerItemsArentInspected() throws Exception { final var TRANCHE_SIZE = 10; @@ -51,14 +111,14 @@ void testThatNewerItemsArentInspected() throws Exception { new DiagnosticTrackableCompletableFuture<>(new CompletableFuture<>(), () -> "dummy #" + idx)); } var startTime = System.nanoTime(); - acm.run(); + acm.logTopOpenActivities(false); checkAllEntriesAreErrorLevel(loggedEntries); checkAndClearLines(loggedEntries, Pattern.compile("\\n")); Thread.sleep(10); durationLevelMap.put(Level.ERROR, Duration.ofNanos(System.nanoTime()-startTime)); acm.setAgeToLevelMap(durationLevelMap); - acm.run(); + acm.logTopOpenActivities(false); checkAllEntriesAreErrorLevel(loggedEntries); checkAndClearLines(loggedEntries, makePattern(2, 10, 2,21)); @@ -69,7 +129,7 @@ void testThatNewerItemsArentInspected() throws Exception { new DiagnosticTrackableCompletableFuture<>(new CompletableFuture<>(), () -> "dummy obj")); } - acm.run(); + acm.logTopOpenActivities(false); checkAllEntriesAreErrorLevel(loggedEntries); checkAndClearLines(loggedEntries, makePattern(2,20, 2,41)); } @@ -114,7 +174,7 @@ void test() throws Exception { addContexts(compositeTracker, requestContext1); Thread.sleep(20); - acm.run(); + acm.logTopOpenActivities(false); checkAndClearLines(loggedEntries, patternWith1); var requestContext2 = 
testContext.getTestConnectionRequestContext(0); @@ -123,7 +183,7 @@ void test() throws Exception { addContexts(compositeTracker, requestContext2); Thread.sleep(20); - acm.run(); + acm.logTopOpenActivities(false); checkAndClearLines(loggedEntries, patternWith2); var requestContext3 = testContext.getTestConnectionRequestContext(0); @@ -132,28 +192,28 @@ void test() throws Exception { addContexts(compositeTracker, requestContext3); Thread.sleep(20); - acm.run(); + acm.logTopOpenActivities(false); checkAndClearLines(loggedEntries, patternWith3); Thread.sleep(50); - acm.run(); + acm.logTopOpenActivities(false); checkAndClearLines(loggedEntries, patternWith3); compositeTracker.onContextClosed(requestContext1); compositeTracker.onContextClosed(requestContext1.getEnclosingScope()); orderedWorkerTracker.remove(orderedWorkerTracker.getRemainingItems().findFirst().get().getKey()); - acm.run(); + acm.logTopOpenActivities(false); checkAndClearLines(loggedEntries, patternWith2); compositeTracker.onContextClosed(requestContext2); compositeTracker.onContextClosed(requestContext2.getEnclosingScope()); orderedWorkerTracker.remove(orderedWorkerTracker.getRemainingItems().findFirst().get().getKey()); - acm.run(); + acm.logTopOpenActivities(false); checkAndClearLines(loggedEntries, patternWith1); removeContexts(compositeTracker, requestContext3); orderedWorkerTracker.remove(orderedWorkerTracker.getRemainingItems().findFirst().get().getKey()); - acm.run(); + acm.logTopOpenActivities(false); checkAndClearLines(loggedEntries, Pattern.compile("\\n", Pattern.MULTILINE)); } @@ -163,11 +223,11 @@ private static String indent(int i) { return IntStream.range(0, i).mapToObj(ignored->ActiveContextMonitor.INDENT).collect(Collectors.joining()); } - private Pattern makePattern(int visibleRequestCount, int requestScopeCount, + private Pattern makePattern(int visibleRequestCount, int totalRequestCount, int visibleScopeCount, int totalScopeCount) { var sb = new StringBuilder(); sb.append("^"); - 
sb.append("Oldest of " + requestScopeCount + " outstanding requests that are past thresholds.*\\n"); + sb.append("Oldest of " + totalRequestCount + " outstanding requests that are past thresholds.*\\n"); for (int i=0; i