Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Print long running work for debugging #564

Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@

@Override
public CodedOutputStreamWrapper createStream() {
telemetryContext.getCurrentSpan().addEvent("streamCreated");
telemetryContext.addEvent("streamCreated");

ByteBuffer bb = ByteBuffer.allocate(bufferSize);
return new CodedOutputStreamWrapper(CodedOutputStream.newInstance(bb), bb);
Expand Down Expand Up @@ -123,7 +123,7 @@
return sendFullyAsync(producer, kafkaRecord)
.whenComplete(((recordMetadata, throwable) -> {
if (throwable != null) {
flushContext.addException(throwable, true);
flushContext.addTraceException(throwable, true);

Check warning on line 126 in TrafficCapture/captureKafkaOffloader/src/main/java/org/opensearch/migrations/trafficcapture/kafkaoffloader/KafkaCaptureFactory.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/captureKafkaOffloader/src/main/java/org/opensearch/migrations/trafficcapture/kafkaoffloader/KafkaCaptureFactory.java#L126

Added line #L126 was not covered by tests
log.error("Error sending producer record: {}", recordId, throwable);
} else {
log.debug("Kafka producer record: {} has finished sending for topic: {} and partition {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public KafkaRecordContext(IRootKafkaOffloaderContext rootScope, IConnectionConte
this.topic = topic;
this.recordId = recordId;
initializeSpan();
getCurrentSpan().setAttribute(RECORD_SIZE_ATTR, recordSize);
this.setTraceAttribute(RECORD_SIZE_ATTR, recordSize);
}

public static class MetricInstruments extends CommonScopedMetricInstruments {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public TestRootKafkaOffloaderContext() {
}

public TestRootKafkaOffloaderContext(InMemoryInstrumentationBundle inMemoryInstrumentationBundle) {
super("tests", inMemoryInstrumentationBundle == null ? null :
super("tests", DO_NOTHING_TRACKER, inMemoryInstrumentationBundle == null ? null :
inMemoryInstrumentationBundle.openTelemetrySdk);
this.inMemoryInstrumentationBundle = inMemoryInstrumentationBundle;
final var meter = getMeterProvider().get("test");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public ConnectionContext(IRootOffloaderContext rootInstrumentationScope, String
super(rootInstrumentationScope);
this.connectionId = connectionId;
this.nodeId = nodeId;
initializeSpan();
initializeSpan(rootInstrumentationScope);
meterDeltaEvent(getMetrics().activeConnectionsCounter, 1);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package org.opensearch.migrations.tracing;

import java.util.Comparator;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.stream.Stream;

/**
 * Context tracker that holds every scope reported as created (and not yet reported as closed)
 * in one concurrent set, iterated from the earliest start time to the latest.
 */
public class ActiveContextTracker implements IContextTracker {
    final ConcurrentSkipListSet<IScopedInstrumentationAttributes> orderedScopes;

    public ActiveContextTracker() {
        this.orderedScopes = makeScopeSkipList();
    }

    /**
     * Builds an empty concurrent set ordered by ascending {@code getStartTimeNano()}.
     * Scopes sharing the same start time are further ordered by their identity hash code.
     *
     * @return a new, empty ordered set for scopes
     */
    static ConcurrentSkipListSet<IScopedInstrumentationAttributes> makeScopeSkipList() {
        final Comparator<IWithStartTimeAndAttributes> oldestFirst = Comparator
            .comparingLong(IWithStartTimeAndAttributes::getStartTimeNano)
            .thenComparingInt(System::identityHashCode);
        return new ConcurrentSkipListSet<>(oldestFirst);
    }

    /** Starts tracking the given scope. */
    @Override
    public void onContextCreated(IScopedInstrumentationAttributes scopedContext) {
        this.orderedScopes.add(scopedContext);
    }

    /** Stops tracking the given scope. */
    @Override
    public void onContextClosed(IScopedInstrumentationAttributes scopedContext) {
        this.orderedScopes.remove(scopedContext);
    }

    /**
     * @return a stream over the tracked scopes in the set's order (earliest start time first)
     */
    public Stream<IScopedInstrumentationAttributes> getActiveScopesByAge() {
        return this.orderedScopes.stream();
    }

    /**
     * @return the number of scopes currently tracked
     */
    public long size() {
        return this.orderedScopes.size();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package org.opensearch.migrations.tracing;

import java.util.Collection;
import java.util.Comparator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.stream.Stream;

/**
 * Context tracker that groups tracked scopes by their concrete class. Each group is an ordered
 * concurrent set created by {@code ActiveContextTracker.makeScopeSkipList()}.
 */
public class ActiveContextTrackerByActivityType implements IContextTracker {
    final ConcurrentHashMap<Class<IScopedInstrumentationAttributes>,
        ConcurrentSkipListSet<IScopedInstrumentationAttributes>> orderedScopesByScopeType;

    public ActiveContextTrackerByActivityType() {
        orderedScopesByScopeType = new ConcurrentHashMap<>();
    }

    /**
     * Adds the scope to the set for its runtime class, creating that set on first use.
     */
    @Override
    @SuppressWarnings("unchecked")
    public void onContextCreated(IScopedInstrumentationAttributes scopedContext) {
        orderedScopesByScopeType
            .computeIfAbsent((Class<IScopedInstrumentationAttributes>) scopedContext.getClass(),
                c -> ActiveContextTracker.makeScopeSkipList())
            .add(scopedContext);
    }

    /**
     * Removes the scope from the set for its runtime class. A scope whose class was never
     * registered trips the assertion when assertions are enabled and is otherwise ignored.
     */
    @Override
    public void onContextClosed(IScopedInstrumentationAttributes scopedContext) {
        final var skipListByType = orderedScopesByScopeType.get(scopedContext.getClass());
        assert skipListByType != null : "expected to have already added the scope to the collection, " +
            "so the top-level class mapping should be present";
        // With assertions disabled, an unknown type must not turn a tracking callback into an NPE.
        if (skipListByType != null) {
            skipListByType.remove(scopedContext);
        }
    }

    /**
     * @param activityType the scope class to look up
     * @return the tracked scopes of that class in their set's order, or an empty stream when the
     *         class has never been registered
     */
    public Stream<IScopedInstrumentationAttributes>
    getOldestActiveScopes(Class<IScopedInstrumentationAttributes> activityType) {
        return Optional.ofNullable(orderedScopesByScopeType.get(activityType))
            .stream()
            .flatMap(Collection::stream);
    }

    /**
     * @return the scope classes that currently have at least one tracked scope
     */
    public Stream<Class<IScopedInstrumentationAttributes>> getActiveScopeTypes() {
        return orderedScopesByScopeType.entrySet().stream()
            .filter(kvp -> !kvp.getValue().isEmpty())
            .map(Map.Entry::getKey);
    }

    /**
     * @param c the scope class to count
     * @return the number of tracked scopes of that class; 0 when the class has never been registered
     */
    public long numScopesFor(Class<IScopedInstrumentationAttributes> c) {
        final var scopesOfType = orderedScopesByScopeType.get(c);
        return scopesOfType == null ? 0 : scopesOfType.size();
    }
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,5 @@
package org.opensearch.migrations.tracing;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.trace.Span;
import lombok.Getter;
import lombok.NonNull;

import java.util.Optional;
import java.util.stream.Stream;

public abstract class BaseNestedSpanContext
<S extends IInstrumentConstructor, T extends IScopedInstrumentationAttributes>
extends BaseSpanContext<S>
Expand All @@ -21,6 +11,10 @@ protected BaseNestedSpanContext(S rootScope, T enclosingScope) {
this.enclosingScope = enclosingScope;
}

/**
 * No-argument convenience overload: initializes the span by delegating to
 * {@code initializeSpan(rootInstrumentationScope)}.
 */
protected void initializeSpan() {
initializeSpan(rootInstrumentationScope);
}

@Override
public IScopedInstrumentationAttributes getEnclosingScope() {
return enclosingScope;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,27 @@
import lombok.Getter;
import lombok.NonNull;

import java.time.Instant;
import java.util.Optional;
import java.util.stream.Stream;

public abstract class BaseSpanContext<S extends IInstrumentConstructor>
implements IScopedInstrumentationAttributes, IWithStartTimeAndAttributes, IHasRootInstrumentationScope<S>, AutoCloseable {
implements IScopedInstrumentationAttributes, IHasRootInstrumentationScope<S>, AutoCloseable {
@Getter
protected final S rootInstrumentationScope;
@Getter
final long startNanoTime;
final long startTimeNano;
@Getter
final Instant startTimeInstant;
@Getter
Throwable observedExceptionToIncludeInMetrics;
@Getter
private Span currentSpan;

public BaseSpanContext(S rootScope) {
this.startNanoTime = System.nanoTime();
protected BaseSpanContext(S rootScope) {
this.startTimeNano = System.nanoTime();
this.startTimeInstant = Instant.now();
Comment on lines +27 to +28
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to make sure I understand why we're using each of these:

  • startTimeNano is used to measure the duration of the span precisely
  • startTimeInstant is used to indicate when it started (but is a bit less precise?)

Is that correct?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pretty much. startTime is for logs correlation. I've added these comments to the base interface to explain...

public interface IWithStartTimeAndAttributes extends IInstrumentationAttributes {
    /**
     * This is used to calculate the precise duration of the span.  System.nanoTime() is guaranteed to be monotonic
     * and not susceptible to clock fluctuations due to system time resets
     */
    long getStartTimeNano();
    /**
     * This is used by some ContextTrackers to log the recorded wall-time so that it can be easier to find the event
     * within logs. Notice that if the system clock is reset (which should be rare), there could be duplicated
     * values at different points in time.
     */
    Instant getStartTimeInstant();

this.rootInstrumentationScope = rootScope;
rootScope.onContextCreated(this);
}

protected static <T> AttributesBuilder addAttributeIfPresent(AttributesBuilder attributesBuilder,
Expand All @@ -32,34 +35,31 @@ protected static <T> AttributesBuilder addAttributeIfPresent(AttributesBuilder a
}

@Override
public void endSpan() {
IScopedInstrumentationAttributes.super.endSpan();
rootInstrumentationScope.onContextClosed(this);
public @NonNull IContextTracker getContextTracker() {
return rootInstrumentationScope;
}

protected void initializeSpan() {
initializeSpanWithLinkedSpans(null);
protected void initializeSpan(@NonNull IInstrumentConstructor constructor) {
initializeSpanWithLinkedSpans(constructor, null);
}

protected void initializeSpanWithLinkedSpans(Stream<Span> linkedSpans) {
initializeSpan(rootInstrumentationScope.buildSpan(this, getActivityName(), linkedSpans));
protected void initializeSpanWithLinkedSpans(@NonNull IInstrumentConstructor constructor,
Stream<Span> linkedSpans) {
initializeSpan(constructor, rootInstrumentationScope.buildSpan(this, getActivityName(), linkedSpans));
}

public void initializeSpan(@NonNull Span s) {
public void initializeSpan(@NonNull IInstrumentConstructor constructor, @NonNull Span s) {
assert currentSpan == null : "only expect to set the current span once";
currentSpan = s;
constructor.onContextCreated(this);
}

@Override
public void addException(Throwable e, boolean isPropagating) {
IScopedInstrumentationAttributes.super.addException(e, isPropagating);
public void addTraceException(Throwable e, boolean isPropagating) {
IScopedInstrumentationAttributes.super.addTraceException(e, isPropagating);
observedExceptionToIncludeInMetrics = e;
}

public long getStartNanoTime() {
return this.startNanoTime;
}

/**
 * Returns the span stored in {@code currentSpan}.
 * NOTE(review): the field looks like it is only assigned by {@code initializeSpan(...)}, so this
 * presumably should not be called before the span has been initialized; confirm against callers.
 */
public @NonNull Span getCurrentSpan() {
return this.currentSpan;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package org.opensearch.migrations.tracing;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class CompositeContextTracker implements IContextTracker {
private final List<IContextTracker> trackers;

/**
 * Creates a composite over the given trackers, kept in argument order in an unmodifiable list.
 *
 * @param trackers the trackers to notify
 */
public CompositeContextTracker(IContextTracker... trackers) {
    this.trackers = List.of(trackers);
}
/**
 * Creates a composite over a snapshot of the given trackers, kept in list order. Later changes
 * to the caller's list do not affect this composite.
 *
 * @param trackers the trackers to notify; must not be null or contain null elements
 */
public CompositeContextTracker(List<IContextTracker> trackers) {
    // Unmodifiable copy, so both constructors leave the field with the same immutability guarantee.
    this.trackers = List.copyOf(trackers);
}

Check warning on line 17 in TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/tracing/CompositeContextTracker.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/tracing/CompositeContextTracker.java#L15-L17

Added lines #L15 - L17 were not covered by tests

/** Forwards the creation notification to each wrapped tracker, in list order. */
@Override
public void onContextCreated(IScopedInstrumentationAttributes scopedContext) {
    for (IContextTracker tracker : trackers) {
        tracker.onContextCreated(scopedContext);
    }
}

/** Forwards the close notification to each wrapped tracker, in list order. */
@Override
public void onContextClosed(IScopedInstrumentationAttributes scopedContext) {
    for (IContextTracker tracker : trackers) {
        tracker.onContextClosed(scopedContext);
    }
}

public Stream<IContextTracker> getTrackers() {
return trackers.stream();

Check warning on line 30 in TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/tracing/CompositeContextTracker.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/tracing/CompositeContextTracker.java#L30

Added line #L30 was not covered by tests
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.opensearch.migrations.tracing;

/**
* For debugging or observability purposes, this interface allows for tracking the
* creation and termination of activities (such as those with spans).
*/
public interface IContextTracker {
    /**
     * Notification that a scoped context has been created. The default implementation does
     * nothing; override it to track creation.
     */
    default void onContextCreated(IScopedInstrumentationAttributes newScopedContext) {}

    /**
     * Notification that a scoped context has been closed. The default implementation does
     * nothing; override it to track termination.
     */
    default void onContextClosed(IScopedInstrumentationAttributes newScopedContext) {}

    /** Tracker that relies on the no-op defaults and therefore ignores every notification. */
    IContextTracker DO_NOTHING_TRACKER = new IContextTracker() {};
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,6 @@

import java.util.stream.Stream;

public interface IInstrumentConstructor {
public interface IInstrumentConstructor extends IContextTracker {
@NonNull Span buildSpan(IScopedInstrumentationAttributes forScope, String spanName, Stream<Span> linkedSpans);

/**
* For debugging, this will be overridden to track creation and termination of spans
*/
default void onContextCreated(IScopedInstrumentationAttributes newScopedContext) {}

/**
* For debugging, this will be overridden to track creation and termination of spans
*/
default void onContextClosed(IScopedInstrumentationAttributes newScopedContext) {}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,9 @@
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.LongHistogram;
import io.opentelemetry.api.metrics.LongUpDownCounter;
import io.opentelemetry.api.trace.Span;
import lombok.NonNull;
import org.opensearch.migrations.Utils;

import java.time.Duration;
import java.util.ArrayDeque;

public interface IInstrumentationAttributes {
AttributeKey<Boolean> HAD_EXCEPTION_KEY = AttributeKey.booleanKey("hadException");
Expand All @@ -29,10 +26,10 @@ public interface IInstrumentationAttributes {
}

default void addCaughtException(Throwable e) {
addException(e, false);
addTraceException(e, false);
}

default void addException(Throwable e, boolean exceptionIsPropagating) {
default void addTraceException(Throwable e, boolean exceptionIsPropagating) {
meterIncrementEvent(getMetrics().exceptionCounter);
}

Expand Down
Loading