Skip to content

Commit

Permalink
ActivityContextMonitor now dedups ancestor scopes and replaces them w…
Browse files Browse the repository at this point in the history
…ith a reference (System.hashId of the context) rather than reprinting them.

Those items NOT printed do NOT impact the counting, so more contexts will be eligible to log.
The old mode is still supported and can be run via logTopOpenActivities(false).

Signed-off-by: Greg Schohn <[email protected]>
  • Loading branch information
gregschohn committed Apr 11, 2024
1 parent 8134dbb commit 2605255
Show file tree
Hide file tree
Showing 2 changed files with 182 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Lombok;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.opensearch.migrations.Utils;
import org.opensearch.migrations.tracing.ActiveContextTracker;
Expand All @@ -12,19 +13,22 @@
import org.slf4j.event.Level;

import java.time.Duration;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

@Slf4j
Expand Down Expand Up @@ -105,63 +109,93 @@ Duration getAge(long recordedNanoTime) {
* Try to print out the most valuable details at the end, assuming that a user is tailing a file that's
* constantly being appended, and therefore could be harder to home in on the start of a block.
*/
public void logTopOpenActivities() {
public void logTopOpenActivities(boolean dedupCommonTraces) {
logRequests().ifPresent(ll->logger.accept(ll, ()->"\n"));
logTopActiveScopes().ifPresent(ll->logger.accept(ll, ()->"\n"));
logTopActiveScopesByType().ifPresent(ll->logger.accept(ll, ()->"\n"));
logScopes(dedupCommonTraces);
}

public Optional<Level> logTopActiveScopesByType() {
return perActivityContextTracker.getActiveScopeTypes().map(c->
gatherActivities(perActivityContextTracker.getOldestActiveScopes(c),
perActivityContextTracker.numScopesFor(c),
this::getLogLevelForActiveContext))
.sorted(Comparator.comparingDouble(ActivitiesAndDepthsForLogging::getAverageContextDepth))
.map(cad-> {
public void logScopes(boolean dedupCommonTraces) {
var scopesSeen = dedupCommonTraces ? new HashSet<IScopedInstrumentationAttributes>() : null;
var activitiesDeferral = getTopActivities(scopesSeen);
logTopActiveScopes(scopesSeen).ifPresent(ll->logger.accept(ll, ()->"\n"));
logTopActiveScopesByType(activitiesDeferral).ifPresent(ll->logger.accept(ll, ()->"\n"));
}

private Optional<Level> logTopActiveScopesByType(Stream<ActivitiesAndDepthsForLogging> stream) {
return stream.map(cad-> {
if (cad.items.isEmpty()) { return Optional.<Level>empty(); }
final var sample = cad.items.get(0);
logger.accept(getHigherLevel(Optional.of(sample.getValue()), Optional.of(Level.INFO)).get(), () ->
OLDEST_ITEMS_FROM_GROUP_LABEL_PREAMBLE + cad.totalScopes + " scopes that are past " +
"thresholds for '" + sample.getKey().getActivityName() + "'");
cad.items.forEach(kvp->logger.accept(kvp.getValue(), ()->activityToString(kvp.getKey())));
return (Optional<Level>) Optional.of(cad.items.get(0).getValue());
logger.accept(getHigherLevel(Optional.of(sample.getLevel()), Optional.of(Level.INFO)).get(), () ->
OLDEST_ITEMS_FROM_GROUP_LABEL_PREAMBLE + cad.totalScopes +
" scopes for '" + sample.getScope().getActivityName() + "'" +
" that are past thresholds that are not otherwise reported below ");
final var numItems = cad.items.size();
IntStream.range(0, numItems).mapToObj(i->cad.items.get(numItems-i-1))
.forEach(kvp-> logger.accept(kvp.getLevel(),
()->activityToString(kvp.getScope(), kvp.ancestorDepthBeforeRedundancy)));
return (Optional<Level>) Optional.of(cad.items.get(0).getLevel());
})
.collect(Utils.foldLeft(Optional.<Level>empty(), ActiveContextMonitor::getHigherLevel));
}

private Stream<ActivitiesAndDepthsForLogging>
getTopActivities(Set<IScopedInstrumentationAttributes> scopesSeenSoFar) {
var reverseOrderedList = perActivityContextTracker.getActiveScopeTypes()
.map(c->Map.<Class<IScopedInstrumentationAttributes>,Supplier<Stream<IScopedInstrumentationAttributes>>>
entry(c,()->perActivityContextTracker.getOldestActiveScopes(c)))
.sorted(Comparator.comparingInt(kvp -> -1 * kvp.getValue().get().findAny()
.map(ActiveContextMonitor::contextDepth).orElse(0)))
.map(kvp->gatherActivities(scopesSeenSoFar, kvp.getValue().get(),
perActivityContextTracker.numScopesFor(kvp.getKey()),
this::getLogLevelForActiveContext))
.collect(Collectors.toCollection(ArrayList::new));
Collections.reverse(reverseOrderedList);
return reverseOrderedList.stream();
}

private static Optional<Level> getHigherLevel(Optional<Level> aOuter, Optional<Level> bOuter) {
return aOuter.map(a -> bOuter.filter(b -> a.toInt() <= b.toInt()).orElse(a))
.or(() -> bOuter);
}

public Optional<Level> logRequests() {
var orderedItems = orderedRequestTracker.orderedSet;
return logActiveItems(orderedItems.stream(), orderedItems.size(),
return logActiveItems(null, orderedItems.stream(), orderedItems.size(),
" outstanding requests that are past thresholds",
tkaf -> getLogLevelForActiveContext(tkaf.nanoTimeKey),
tkaf -> activityToString(tkaf, INDENT));
this::activityToString);
}

public Optional<Level> logTopActiveScopes() {
return logActiveItems(globalContextTracker.getActiveScopesByAge(), globalContextTracker.size(),
" GLOBAL scopes that are past thresholds",
private Optional<Level> logTopActiveScopes(Set<IScopedInstrumentationAttributes> scopesSeen) {
return logActiveItems(scopesSeen,
globalContextTracker.getActiveScopesByAge(), globalContextTracker.size(),
" GLOBAL scopes that are past thresholds that are not otherwise reported below",
this::getLogLevelForActiveContext,
this::activityToString);
ctx -> activityToString(ctx, scanUntilAncestorSeen(scopesSeen, ctx, 0)));
}

@AllArgsConstructor
@Getter
private static class ScopePath {
private final IScopedInstrumentationAttributes scope;
private final int ancestorDepthBeforeRedundancy;

Check warning on line 181 in TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/ActiveContextMonitor.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/ActiveContextMonitor.java#L181

Added line #L181 was not covered by tests
private final Level level;
}

@AllArgsConstructor
@Getter
private static class ActivitiesAndDepthsForLogging {
ArrayList<Map.Entry<IScopedInstrumentationAttributes, Level>> items;
ArrayList<ScopePath> items;
double averageContextDepth;
long totalScopes;

Check warning on line 190 in TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/ActiveContextMonitor.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/ActiveContextMonitor.java#L188-L190

Added lines #L188 - L190 were not covered by tests
}

private ActivitiesAndDepthsForLogging
gatherActivities(Stream<IScopedInstrumentationAttributes> oldestActiveScopes, long numScopes,
Function<IScopedInstrumentationAttributes, Optional<Level>> getLevel) {
gatherActivities(Set<IScopedInstrumentationAttributes> scopesSeenSoFar,
Stream<IScopedInstrumentationAttributes> oldestActiveScopes, long numScopes,
Function<IScopedInstrumentationAttributes, Optional<Level>> getLevel) {
int depthSum = 0;
var outList = new ArrayList<Map.Entry<IScopedInstrumentationAttributes, Level>>();
var outList = new ArrayList<ScopePath>();
try {
var activeScopeIterator = oldestActiveScopes.iterator();
while ((outList.size() < totalItemsToOutputLimit) && activeScopeIterator.hasNext()) {
Expand All @@ -170,8 +204,11 @@ private static class ActivitiesAndDepthsForLogging {
if (levelForElementOp.isEmpty()) {
break;
}
outList.add(new AbstractMap.SimpleImmutableEntry<>(activeScope, levelForElementOp.get()));
depthSum += contextDepth(activeScope);
var ancestorDepth = scanUntilAncestorSeen(scopesSeenSoFar, activeScope, 0);
if (ancestorDepth != 0) {
outList.add(new ScopePath(activeScope, ancestorDepth, levelForElementOp.get()));
depthSum += contextDepth(activeScope);
}
}
} catch (NoSuchElementException e) {

Check warning on line 213 in TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/ActiveContextMonitor.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/ActiveContextMonitor.java#L213

Added line #L213 was not covered by tests
if (outList.isEmpty()) {
Expand All @@ -182,34 +219,52 @@ private static class ActivitiesAndDepthsForLogging {
return new ActivitiesAndDepthsForLogging(outList, depthSum/(double)outList.size(), numScopes);
}

private int contextDepth(IScopedInstrumentationAttributes activeScope) {
private static int scanUntilAncestorSeen(Set<IScopedInstrumentationAttributes> ctxSeenSoFar,
IScopedInstrumentationAttributes ctx, int depth) {
// if we added an item, then recurse if the parent was non-null; otherwise return depth
if (ctxSeenSoFar == null) {
return -1;
} else if (!ctxSeenSoFar.add(ctx)) {
return depth;
}
++depth;
var p = ctx.getEnclosingScope();
return p == null ? depth : scanUntilAncestorSeen(ctxSeenSoFar, p, depth);
}

private static int contextDepth(IScopedInstrumentationAttributes activeScope) {
return contextDepth(activeScope, 0);
}

private int contextDepth(IScopedInstrumentationAttributes activeScope, int count) {
private static int contextDepth(IScopedInstrumentationAttributes activeScope, int count) {
return activeScope == null ? count : contextDepth(activeScope.getEnclosingScope(), count+1);
}

private String activityToString(IScopedInstrumentationAttributes context) {
return activityToString(context, INDENT);
private String activityToString(OrderedWorkerTracker.TimeKeyAndFuture<Void> tkaf) {
var timeStr = "age=" + getAge(tkaf.nanoTimeKey);
return INDENT + timeStr + " " + formatWorkItem.apply(tkaf.future);
}

private String activityToString(OrderedWorkerTracker.TimeKeyAndFuture<Void> tkaf, String indent) {
var timeStr = "age=" + getAge(tkaf.nanoTimeKey);
return indent + timeStr + " " + formatWorkItem.apply(tkaf.future);
private String activityToString(IScopedInstrumentationAttributes context, int depthToInclude) {
return activityToString(context, depthToInclude, INDENT);
}

private String activityToString(IScopedInstrumentationAttributes context, String indent) {
if (context == null) {
return null;
private String activityToString(IScopedInstrumentationAttributes ctx, int depthToInclude, String indent) {
if (ctx == null) {
return "";
}
var idStr = depthToInclude < 0 ? null : "<<" + System.identityHashCode(ctx) + ">>";
if (depthToInclude == 0) {
return " parentRef=" + idStr + "...";
}
var timeStr = "age=" + getAge(context.getStartTimeNano()) + ", start=" + context.getStartTimeInstant();
var attributesStr = context.getPopulatedSpanAttributes().asMap().entrySet().stream()
var timeStr = "age=" + getAge(ctx.getStartTimeNano()) + ", start=" + ctx.getStartTimeInstant();
var attributesStr = ctx.getPopulatedSpanAttributes().asMap().entrySet().stream()
.map(kvp->kvp.getKey() + ": " + kvp.getValue())
.collect(Collectors.joining(", "));
return indent + timeStr + " " + context.getActivityName() + ": attribs={" + attributesStr + "}" +
Optional.ofNullable(activityToString(context.getEnclosingScope(), indent + INDENT))
.map(s->"\n"+s).orElse("");
var parentStr = activityToString(ctx.getEnclosingScope(), depthToInclude-1, indent + INDENT);
return indent + timeStr + Optional.ofNullable(idStr).map(s->" id="+s).orElse("") +
" " + ctx.getActivityName() + ": attribs={" + attributesStr + "}" +
(!parentStr.isEmpty() && depthToInclude != 1 ? "\n" : "") + parentStr;
}

private Optional<Level> getLogLevelForActiveContext(IScopedInstrumentationAttributes activeContext) {
Expand All @@ -224,9 +279,10 @@ private Optional<Level> getLogLevelForActiveContext(long nanoTime) {
}

private <T> Optional<Level>
logActiveItems(Stream<T> activeItemStream, long totalItems, String trailingGroupLabel,
Function<T, Optional<Level>> getLevel,
Function<T, String> getActiveLoggingMessage) {
logActiveItems(Set<T> itemsSeenSoFar,
Stream<T> activeItemStream, long totalItems, String trailingGroupLabel,
Function<T, Optional<Level>> getLevel,
Function<T, String> getActiveLoggingMessage) {
int numOutput = 0;
Optional<Level> firstLevel = Optional.empty();
try {
Expand All @@ -237,15 +293,17 @@ private Optional<Level> getLogLevelForActiveContext(long nanoTime) {
if (levelForElementOp.isEmpty()) {
break;
}
if (Optional.ofNullable(itemsSeenSoFar).map(s->s.contains(activeItem)).orElse(false)) {
continue;
}
if (firstLevel.isEmpty()) {
firstLevel = levelForElementOp;
}
if (numOutput++ == 0) {
logger.accept(getHigherLevel(levelForElementOp, Optional.of(Level.INFO)).get(), () ->
OLDEST_ITEMS_FROM_GROUP_LABEL_PREAMBLE + totalItems + trailingGroupLabel);

}
logger.accept(levelForElementOp.get(),()->getActiveLoggingMessage.apply(activeItem));
logger.accept(levelForElementOp.get(), () -> getActiveLoggingMessage.apply(activeItem));
}
} catch (NoSuchElementException e) {

Check warning on line 308 in TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/ActiveContextMonitor.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/ActiveContextMonitor.java#L308

Added line #L308 was not covered by tests
if (numOutput == 0) {
Expand All @@ -258,6 +316,6 @@ private Optional<Level> getLogLevelForActiveContext(long nanoTime) {

@Override
public void run() {
logTopOpenActivities();
logTopOpenActivities(true);
}
}
Loading

0 comments on commit 2605255

Please sign in to comment.