Skip to content

Commit

Permalink
Merge pull request #564 from gregschohn/PrintLongRunningWorkForDebugg…
Browse files Browse the repository at this point in the history
…ingPart2

Print long running work for debugging
  • Loading branch information
gregschohn authored Apr 11, 2024
2 parents 27e25e1 + 0e2ce10 commit 4fe2e9e
Show file tree
Hide file tree
Showing 43 changed files with 1,885 additions and 838 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public StreamManager(IRootKafkaOffloaderContext rootScope, IConnectionContext ct

@Override
public CodedOutputStreamWrapper createStream() {
telemetryContext.getCurrentSpan().addEvent("streamCreated");
telemetryContext.addEvent("streamCreated");

ByteBuffer bb = ByteBuffer.allocate(bufferSize);
return new CodedOutputStreamWrapper(CodedOutputStream.newInstance(bb), bb);
Expand Down Expand Up @@ -123,7 +123,7 @@ public CodedOutputStreamWrapper createStream() {
return sendFullyAsync(producer, kafkaRecord)
.whenComplete(((recordMetadata, throwable) -> {
if (throwable != null) {
flushContext.addException(throwable, true);
flushContext.addTraceException(throwable, true);
log.error("Error sending producer record: {}", recordId, throwable);
} else {
log.debug("Kafka producer record: {} has finished sending for topic: {} and partition {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public KafkaRecordContext(IRootKafkaOffloaderContext rootScope, IConnectionConte
this.topic = topic;
this.recordId = recordId;
initializeSpan();
getCurrentSpan().setAttribute(RECORD_SIZE_ATTR, recordSize);
this.setTraceAttribute(RECORD_SIZE_ATTR, recordSize);
}

public static class MetricInstruments extends CommonScopedMetricInstruments {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public TestRootKafkaOffloaderContext() {
}

public TestRootKafkaOffloaderContext(InMemoryInstrumentationBundle inMemoryInstrumentationBundle) {
super("tests", inMemoryInstrumentationBundle == null ? null :
super("tests", DO_NOTHING_TRACKER, inMemoryInstrumentationBundle == null ? null :
inMemoryInstrumentationBundle.openTelemetrySdk);
this.inMemoryInstrumentationBundle = inMemoryInstrumentationBundle;
final var meter = getMeterProvider().get("test");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public ConnectionContext(IRootOffloaderContext rootInstrumentationScope, String
super(rootInstrumentationScope);
this.connectionId = connectionId;
this.nodeId = nodeId;
initializeSpan();
initializeSpan(rootInstrumentationScope);
meterDeltaEvent(getMetrics().activeConnectionsCounter, 1);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package org.opensearch.migrations.tracing;

import java.util.Comparator;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.stream.Stream;

public class ActiveContextTracker implements IContextTracker {
final ConcurrentSkipListSet<IScopedInstrumentationAttributes> orderedScopes;

public ActiveContextTracker() {
orderedScopes = makeScopeSkipList();
}

static ConcurrentSkipListSet<IScopedInstrumentationAttributes> makeScopeSkipList() {
return new ConcurrentSkipListSet<>(Comparator
.comparingLong(IWithStartTimeAndAttributes::getStartTimeNano)
.thenComparingInt(System::identityHashCode));
}

@Override
public void onContextCreated(IScopedInstrumentationAttributes scopedContext) {
orderedScopes.add(scopedContext);
}

@Override
public void onContextClosed(IScopedInstrumentationAttributes scopedContext) {
orderedScopes.remove(scopedContext);
}

public Stream<IScopedInstrumentationAttributes> getActiveScopesByAge() {
return orderedScopes.stream();
}

public long size() {
return orderedScopes.size();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package org.opensearch.migrations.tracing;

import java.util.Collection;
import java.util.Comparator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.stream.Stream;

public class ActiveContextTrackerByActivityType implements IContextTracker {
final ConcurrentHashMap<Class<IScopedInstrumentationAttributes>,
ConcurrentSkipListSet<IScopedInstrumentationAttributes>> orderedScopesByScopeType;

public ActiveContextTrackerByActivityType() {
orderedScopesByScopeType = new ConcurrentHashMap<>();
}

@Override
@SuppressWarnings("unchecked")
public void onContextCreated(IScopedInstrumentationAttributes scopedContext) {
orderedScopesByScopeType
.computeIfAbsent((Class<IScopedInstrumentationAttributes>)scopedContext.getClass(),
c->ActiveContextTracker.makeScopeSkipList())
.add(scopedContext);
}

@Override
public void onContextClosed(IScopedInstrumentationAttributes scopedContext) {
final var skipListByType = orderedScopesByScopeType.get(scopedContext.getClass());
assert skipListByType != null : "expected to have already added the scope to the collection, " +
"so the top-level class mapping should be present";
skipListByType.remove(scopedContext);
}

public Stream<IScopedInstrumentationAttributes>
getOldestActiveScopes(Class<IScopedInstrumentationAttributes> activityType) {
return Optional.ofNullable(orderedScopesByScopeType.getOrDefault(activityType, null))
.stream()
.flatMap(Collection::stream);
}

public Stream<Class<IScopedInstrumentationAttributes>> getActiveScopeTypes() {
return orderedScopesByScopeType.entrySet().stream()
.filter(kvp->!kvp.getValue().isEmpty())
.map(Map.Entry::getKey);
}

public long numScopesFor(Class<IScopedInstrumentationAttributes> c) {
return orderedScopesByScopeType.get(c).size();
}
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,5 @@
package org.opensearch.migrations.tracing;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.trace.Span;
import lombok.Getter;
import lombok.NonNull;

import java.util.Optional;
import java.util.stream.Stream;

public abstract class BaseNestedSpanContext
<S extends IInstrumentConstructor, T extends IScopedInstrumentationAttributes>
extends BaseSpanContext<S>
Expand All @@ -21,6 +11,10 @@ protected BaseNestedSpanContext(S rootScope, T enclosingScope) {
this.enclosingScope = enclosingScope;
}

protected void initializeSpan() {
initializeSpan(rootInstrumentationScope);
}

@Override
public IScopedInstrumentationAttributes getEnclosingScope() {
return enclosingScope;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,27 @@
import lombok.Getter;
import lombok.NonNull;

import java.time.Instant;
import java.util.Optional;
import java.util.stream.Stream;

public abstract class BaseSpanContext<S extends IInstrumentConstructor>
implements IScopedInstrumentationAttributes, IWithStartTimeAndAttributes, IHasRootInstrumentationScope<S>, AutoCloseable {
implements IScopedInstrumentationAttributes, IHasRootInstrumentationScope<S>, AutoCloseable {
@Getter
protected final S rootInstrumentationScope;
@Getter
final long startNanoTime;
final long startTimeNano;
@Getter
final Instant startTimeInstant;
@Getter
Throwable observedExceptionToIncludeInMetrics;
@Getter
private Span currentSpan;

public BaseSpanContext(S rootScope) {
this.startNanoTime = System.nanoTime();
protected BaseSpanContext(S rootScope) {
this.startTimeNano = System.nanoTime();
this.startTimeInstant = Instant.now();
this.rootInstrumentationScope = rootScope;
rootScope.onContextCreated(this);
}

protected static <T> AttributesBuilder addAttributeIfPresent(AttributesBuilder attributesBuilder,
Expand All @@ -32,34 +35,31 @@ protected static <T> AttributesBuilder addAttributeIfPresent(AttributesBuilder a
}

@Override
public void endSpan() {
IScopedInstrumentationAttributes.super.endSpan();
rootInstrumentationScope.onContextClosed(this);
public @NonNull IContextTracker getContextTracker() {
return rootInstrumentationScope;
}

protected void initializeSpan() {
initializeSpanWithLinkedSpans(null);
protected void initializeSpan(@NonNull IInstrumentConstructor constructor) {
initializeSpanWithLinkedSpans(constructor, null);
}

protected void initializeSpanWithLinkedSpans(Stream<Span> linkedSpans) {
initializeSpan(rootInstrumentationScope.buildSpan(this, getActivityName(), linkedSpans));
protected void initializeSpanWithLinkedSpans(@NonNull IInstrumentConstructor constructor,
Stream<Span> linkedSpans) {
initializeSpan(constructor, rootInstrumentationScope.buildSpan(this, getActivityName(), linkedSpans));
}

public void initializeSpan(@NonNull Span s) {
public void initializeSpan(@NonNull IInstrumentConstructor constructor, @NonNull Span s) {
assert currentSpan == null : "only expect to set the current span once";
currentSpan = s;
constructor.onContextCreated(this);
}

@Override
public void addException(Throwable e, boolean isPropagating) {
IScopedInstrumentationAttributes.super.addException(e, isPropagating);
public void addTraceException(Throwable e, boolean isPropagating) {
IScopedInstrumentationAttributes.super.addTraceException(e, isPropagating);
observedExceptionToIncludeInMetrics = e;
}

public long getStartNanoTime() {
return this.startNanoTime;
}

public @NonNull Span getCurrentSpan() {
return this.currentSpan;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package org.opensearch.migrations.tracing;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class CompositeContextTracker implements IContextTracker {
private final List<IContextTracker> trackers;

public CompositeContextTracker(IContextTracker...trackers) {
this.trackers = Arrays.stream(trackers).collect(Collectors.toUnmodifiableList());
}
public CompositeContextTracker(List<IContextTracker> trackers) {
this.trackers = new ArrayList<>(trackers);
}

@Override
public void onContextCreated(IScopedInstrumentationAttributes scopedContext) {
trackers.forEach(ct->ct.onContextCreated(scopedContext));
}

@Override
public void onContextClosed(IScopedInstrumentationAttributes scopedContext) {
trackers.forEach(ct->ct.onContextClosed(scopedContext));
}

public Stream<IContextTracker> getTrackers() {
return trackers.stream();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.opensearch.migrations.tracing;

/**
* For debugging or observability purposes, this interface allows for tracking the
* creation and termination of activities (such as those with spans).
*/
public interface IContextTracker {
default void onContextCreated(IScopedInstrumentationAttributes newScopedContext) {}

/**
* This can be overridden to track creation and termination of spans
*/
default void onContextClosed(IScopedInstrumentationAttributes newScopedContext) {}

final static IContextTracker DO_NOTHING_TRACKER = new IContextTracker() {};
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,6 @@

import java.util.stream.Stream;

public interface IInstrumentConstructor {
public interface IInstrumentConstructor extends IContextTracker {
@NonNull Span buildSpan(IScopedInstrumentationAttributes forScope, String spanName, Stream<Span> linkedSpans);

/**
* For debugging, this will be overridden to track creation and termination of spans
*/
default void onContextCreated(IScopedInstrumentationAttributes newScopedContext) {}

/**
* For debugging, this will be overridden to track creation and termination of spans
*/
default void onContextClosed(IScopedInstrumentationAttributes newScopedContext) {}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,9 @@
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.LongHistogram;
import io.opentelemetry.api.metrics.LongUpDownCounter;
import io.opentelemetry.api.trace.Span;
import lombok.NonNull;
import org.opensearch.migrations.Utils;

import java.time.Duration;
import java.util.ArrayDeque;

public interface IInstrumentationAttributes {
AttributeKey<Boolean> HAD_EXCEPTION_KEY = AttributeKey.booleanKey("hadException");
Expand All @@ -29,10 +26,10 @@ public interface IInstrumentationAttributes {
}

default void addCaughtException(Throwable e) {
addException(e, false);
addTraceException(e, false);
}

default void addException(Throwable e, boolean exceptionIsPropagating) {
default void addTraceException(Throwable e, boolean exceptionIsPropagating) {
meterIncrementEvent(getMetrics().exceptionCounter);
}

Expand Down
Loading

0 comments on commit 4fe2e9e

Please sign in to comment.