Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor traffic replayer into three files #558

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@

@Override
public CodedOutputStreamWrapper createStream() {
telemetryContext.getCurrentSpan().addEvent("streamCreated");
telemetryContext.addEvent("streamCreated");

ByteBuffer bb = ByteBuffer.allocate(bufferSize);
return new CodedOutputStreamWrapper(CodedOutputStream.newInstance(bb), bb);
Expand Down Expand Up @@ -123,7 +123,7 @@
return sendFullyAsync(producer, kafkaRecord)
.whenComplete(((recordMetadata, throwable) -> {
if (throwable != null) {
flushContext.addException(throwable, true);
flushContext.addTraceException(throwable, true);

Check warning on line 126 in TrafficCapture/captureKafkaOffloader/src/main/java/org/opensearch/migrations/trafficcapture/kafkaoffloader/KafkaCaptureFactory.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/captureKafkaOffloader/src/main/java/org/opensearch/migrations/trafficcapture/kafkaoffloader/KafkaCaptureFactory.java#L126

Added line #L126 was not covered by tests
log.error("Error sending producer record: {}", recordId, throwable);
} else {
log.debug("Kafka producer record: {} has finished sending for topic: {} and partition {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public KafkaRecordContext(IRootKafkaOffloaderContext rootScope, IConnectionConte
this.topic = topic;
this.recordId = recordId;
initializeSpan();
getCurrentSpan().setAttribute(RECORD_SIZE_ATTR, recordSize);
this.setTraceAttribute(RECORD_SIZE_ATTR, recordSize);
}

public static class MetricInstruments extends CommonScopedMetricInstruments {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public TestRootKafkaOffloaderContext() {
}

public TestRootKafkaOffloaderContext(InMemoryInstrumentationBundle inMemoryInstrumentationBundle) {
super("tests", inMemoryInstrumentationBundle == null ? null :
super("tests", DO_NOTHING_TRACKER, inMemoryInstrumentationBundle == null ? null :
inMemoryInstrumentationBundle.openTelemetrySdk);
this.inMemoryInstrumentationBundle = inMemoryInstrumentationBundle;
final var meter = getMeterProvider().get("test");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public ConnectionContext(IRootOffloaderContext rootInstrumentationScope, String
super(rootInstrumentationScope);
this.connectionId = connectionId;
this.nodeId = nodeId;
initializeSpan();
initializeSpan(rootInstrumentationScope);
meterDeltaEvent(getMetrics().activeConnectionsCounter, 1);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package org.opensearch.migrations.tracing;

import java.util.Comparator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.stream.Stream;

public class ActiveContextTracker implements IContextTracker {
final ConcurrentSkipListSet<IScopedInstrumentationAttributes> orderedScopes;

public ActiveContextTracker() {
orderedScopes = makeScopeSkipList();
}

static ConcurrentSkipListSet<IScopedInstrumentationAttributes> makeScopeSkipList() {
return new ConcurrentSkipListSet<>(Comparator
.comparingLong(IWithStartTimeAndAttributes::getStartNanoTime)
.thenComparingInt(System::identityHashCode));
}

@Override
public void onContextCreated(IScopedInstrumentationAttributes scopedContext) {
orderedScopes.add(scopedContext);
}

Check warning on line 24 in TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/tracing/ActiveContextTracker.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/tracing/ActiveContextTracker.java#L23-L24

Added lines #L23 - L24 were not covered by tests

@Override
public void onContextClosed(IScopedInstrumentationAttributes scopedContext) {
orderedScopes.remove(scopedContext);
}

Check warning on line 29 in TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/tracing/ActiveContextTracker.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/tracing/ActiveContextTracker.java#L28-L29

Added lines #L28 - L29 were not covered by tests

public Stream<IScopedInstrumentationAttributes> getOldestActiveScopes() {
return orderedScopes.stream();

Check warning on line 32 in TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/tracing/ActiveContextTracker.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/tracing/ActiveContextTracker.java#L32

Added line #L32 was not covered by tests
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package org.opensearch.migrations.tracing;

import java.util.Collection;
import java.util.Comparator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.stream.Stream;

public class ActiveContextTrackerByActivityType implements IContextTracker {
final ConcurrentHashMap<Class<IScopedInstrumentationAttributes>,
ConcurrentSkipListSet<IScopedInstrumentationAttributes>> orderedScopesByScopeType;

public ActiveContextTrackerByActivityType() {
orderedScopesByScopeType = new ConcurrentHashMap<>();
}

@Override
@SuppressWarnings("unchecked")
public void onContextCreated(IScopedInstrumentationAttributes scopedContext) {
orderedScopesByScopeType
.computeIfAbsent((Class<IScopedInstrumentationAttributes>)scopedContext.getClass(),
c->ActiveContextTracker.makeScopeSkipList())
.add(scopedContext);
}

Check warning on line 26 in TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/tracing/ActiveContextTrackerByActivityType.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/tracing/ActiveContextTrackerByActivityType.java#L22-L26

Added lines #L22 - L26 were not covered by tests

@Override
public void onContextClosed(IScopedInstrumentationAttributes scopedContext) {
final var skipListByType = orderedScopesByScopeType.get(scopedContext.getClass());

Check warning on line 30 in TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/tracing/ActiveContextTrackerByActivityType.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/tracing/ActiveContextTrackerByActivityType.java#L30

Added line #L30 was not covered by tests
assert skipListByType != null : "expected to have already added the scope to the collection, " +
"so the top-level class mapping should be present";
skipListByType.remove(scopedContext);
}

Check warning on line 34 in TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/tracing/ActiveContextTrackerByActivityType.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/tracing/ActiveContextTrackerByActivityType.java#L33-L34

Added lines #L33 - L34 were not covered by tests

public Stream<IScopedInstrumentationAttributes>
getOldestActiveScopes(Class<IScopedInstrumentationAttributes> activityType) {
return Optional.ofNullable(orderedScopesByScopeType.getOrDefault(activityType, null))
.stream()
.flatMap(Collection::stream);

Check warning on line 40 in TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/tracing/ActiveContextTrackerByActivityType.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/tracing/ActiveContextTrackerByActivityType.java#L38-L40

Added lines #L38 - L40 were not covered by tests
}

public Stream<Class<IScopedInstrumentationAttributes>> getActiveScopeTypes() {
return orderedScopesByScopeType.entrySet().stream()

Check warning on line 44 in TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/tracing/ActiveContextTrackerByActivityType.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/tracing/ActiveContextTrackerByActivityType.java#L44

Added line #L44 was not covered by tests
.filter(kvp->!kvp.getValue().isEmpty())
.map(Map.Entry::getKey);

Check warning on line 46 in TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/tracing/ActiveContextTrackerByActivityType.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/tracing/ActiveContextTrackerByActivityType.java#L46

Added line #L46 was not covered by tests
}
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,5 @@
package org.opensearch.migrations.tracing;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.trace.Span;
import lombok.Getter;
import lombok.NonNull;

import java.util.Optional;
import java.util.stream.Stream;

public abstract class BaseNestedSpanContext
<S extends IInstrumentConstructor, T extends IScopedInstrumentationAttributes>
extends BaseSpanContext<S>
Expand All @@ -21,6 +11,10 @@ protected BaseNestedSpanContext(S rootScope, T enclosingScope) {
this.enclosingScope = enclosingScope;
}

protected void initializeSpan() {
initializeSpan(rootInstrumentationScope);
}

@Override
public IScopedInstrumentationAttributes getEnclosingScope() {
return enclosingScope;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import java.util.stream.Stream;

public abstract class BaseSpanContext<S extends IInstrumentConstructor>
implements IScopedInstrumentationAttributes, IWithStartTimeAndAttributes, IHasRootInstrumentationScope<S>, AutoCloseable {
implements IScopedInstrumentationAttributes, IHasRootInstrumentationScope<S>, AutoCloseable {
@Getter
protected final S rootInstrumentationScope;
@Getter
Expand All @@ -20,10 +20,9 @@ public abstract class BaseSpanContext<S extends IInstrumentConstructor>
@Getter
private Span currentSpan;

public BaseSpanContext(S rootScope) {
protected BaseSpanContext(S rootScope) {
this.startNanoTime = System.nanoTime();
this.rootInstrumentationScope = rootScope;
rootScope.onContextCreated(this);
}

protected static <T> AttributesBuilder addAttributeIfPresent(AttributesBuilder attributesBuilder,
Expand All @@ -32,27 +31,28 @@ protected static <T> AttributesBuilder addAttributeIfPresent(AttributesBuilder a
}

@Override
public void endSpan() {
IScopedInstrumentationAttributes.super.endSpan();
rootInstrumentationScope.onContextClosed(this);
public @NonNull IContextTracker getContextTracker() {
return rootInstrumentationScope;
}

protected void initializeSpan() {
initializeSpanWithLinkedSpans(null);
protected void initializeSpan(@NonNull IInstrumentConstructor constructor) {
initializeSpanWithLinkedSpans(constructor, null);
}

protected void initializeSpanWithLinkedSpans(Stream<Span> linkedSpans) {
initializeSpan(rootInstrumentationScope.buildSpan(this, getActivityName(), linkedSpans));
protected void initializeSpanWithLinkedSpans(@NonNull IInstrumentConstructor constructor,
Stream<Span> linkedSpans) {
initializeSpan(constructor, rootInstrumentationScope.buildSpan(this, getActivityName(), linkedSpans));
}

public void initializeSpan(@NonNull Span s) {
public void initializeSpan(@NonNull IInstrumentConstructor constructor, @NonNull Span s) {
assert currentSpan == null : "only expect to set the current span once";
currentSpan = s;
constructor.onContextCreated(this);
}

@Override
public void addException(Throwable e, boolean isPropagating) {
IScopedInstrumentationAttributes.super.addException(e, isPropagating);
public void addTraceException(Throwable e, boolean isPropagating) {
IScopedInstrumentationAttributes.super.addTraceException(e, isPropagating);
observedExceptionToIncludeInMetrics = e;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package org.opensearch.migrations.tracing;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class CompositeContextTracker implements IContextTracker {
private final List<IContextTracker> trackers;

public CompositeContextTracker(IContextTracker...trackers) {
this.trackers = Arrays.stream(trackers).collect(Collectors.toUnmodifiableList());
}
public CompositeContextTracker(List<IContextTracker> trackers) {
this.trackers = new ArrayList<>(trackers);
}

Check warning on line 17 in TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/tracing/CompositeContextTracker.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/tracing/CompositeContextTracker.java#L15-L17

Added lines #L15 - L17 were not covered by tests

@Override
public void onContextCreated(IScopedInstrumentationAttributes scopedContext) {
trackers.forEach(ct->ct.onContextCreated(scopedContext));
}

Check warning on line 22 in TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/tracing/CompositeContextTracker.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/tracing/CompositeContextTracker.java#L21-L22

Added lines #L21 - L22 were not covered by tests

@Override
public void onContextClosed(IScopedInstrumentationAttributes scopedContext) {
trackers.forEach(ct->ct.onContextClosed(scopedContext));
}

Check warning on line 27 in TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/tracing/CompositeContextTracker.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/tracing/CompositeContextTracker.java#L26-L27

Added lines #L26 - L27 were not covered by tests

public Stream<IContextTracker> getTrackers() {
return trackers.stream();

Check warning on line 30 in TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/tracing/CompositeContextTracker.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/tracing/CompositeContextTracker.java#L30

Added line #L30 was not covered by tests
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.opensearch.migrations.tracing;

/**
* For debugging or observability purposes, this interface allows for tracking the
* creation and termination of activities (such as those with spans).
*/
public interface IContextTracker {
default void onContextCreated(IScopedInstrumentationAttributes newScopedContext) {}

/**
* This can be overridden to track creation and termination of spans
*/
default void onContextClosed(IScopedInstrumentationAttributes newScopedContext) {}

final static IContextTracker DO_NOTHING_TRACKER = new IContextTracker() {};
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,6 @@

import java.util.stream.Stream;

public interface IInstrumentConstructor {
public interface IInstrumentConstructor extends IContextTracker {
@NonNull Span buildSpan(IScopedInstrumentationAttributes forScope, String spanName, Stream<Span> linkedSpans);

/**
* For debugging, this will be overridden to track creation and termination of spans
*/
default void onContextCreated(IScopedInstrumentationAttributes newScopedContext) {}

/**
* For debugging, this will be overridden to track creation and termination of spans
*/
default void onContextClosed(IScopedInstrumentationAttributes newScopedContext) {}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,9 @@
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.LongHistogram;
import io.opentelemetry.api.metrics.LongUpDownCounter;
import io.opentelemetry.api.trace.Span;
import lombok.NonNull;
import org.opensearch.migrations.Utils;

import java.time.Duration;
import java.util.ArrayDeque;

public interface IInstrumentationAttributes {
AttributeKey<Boolean> HAD_EXCEPTION_KEY = AttributeKey.booleanKey("hadException");
Expand All @@ -29,10 +26,10 @@ public interface IInstrumentationAttributes {
}

default void addCaughtException(Throwable e) {
addException(e, false);
addTraceException(e, false);
}

default void addException(Throwable e, boolean exceptionIsPropagating) {
default void addTraceException(Throwable e, boolean exceptionIsPropagating) {
meterIncrementEvent(getMetrics().exceptionCounter);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.opensearch.migrations.tracing;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.metrics.DoubleHistogram;
Expand All @@ -13,8 +14,7 @@

import java.util.ArrayDeque;

public interface IScopedInstrumentationAttributes
extends IWithStartTimeAndAttributes, AutoCloseable {
public interface IScopedInstrumentationAttributes extends IWithStartTimeAndAttributes, AutoCloseable {

String getActivityName();

Expand All @@ -26,6 +26,8 @@ public interface IScopedInstrumentationAttributes

@NonNull Span getCurrentSpan();

@NonNull IContextTracker getContextTracker();

default Attributes getPopulatedSpanAttributes() {
return getPopulatedSpanAttributesBuilder().build();
}
Expand All @@ -35,9 +37,7 @@ default AttributesBuilder getPopulatedSpanAttributesBuilder() {
// reverse the order so that the lowest attribute scopes will overwrite the upper ones if there were conflicts
var stack = new ArrayDeque<IScopedInstrumentationAttributes>();
while (currentObj != null) {
if (currentObj instanceof IScopedInstrumentationAttributes) {
stack.addFirst((IScopedInstrumentationAttributes) currentObj);
}
stack.addFirst((IScopedInstrumentationAttributes) currentObj);
currentObj = currentObj.getEnclosingScope();
}
var builder = stack.stream()
Expand All @@ -61,10 +61,11 @@ default DoubleHistogram getEndOfScopeDurationMetric() {
return getMetrics().contextDuration;
}

default void endSpan() {
default void endSpan(IContextTracker contextTracker) {
var span = getCurrentSpan();
span.setAllAttributes(getPopulatedSpanAttributes());
span.end();
contextTracker.onContextClosed(this);
}

default void sendMeterEventsForEnd() {
Expand All @@ -73,13 +74,13 @@ default void sendMeterEventsForEnd() {
}

default void close() {
endSpan();
endSpan(getContextTracker());
sendMeterEventsForEnd();
}

@Override
default void addException(Throwable e, boolean isPropagating) {
IWithStartTimeAndAttributes.super.addException(e, isPropagating);
default void addTraceException(Throwable e, boolean isPropagating) {
IWithStartTimeAndAttributes.super.addTraceException(e, isPropagating);
final var span = getCurrentSpan();
if (isPropagating) {
span.recordException(e, Attributes.of(SemanticAttributes.EXCEPTION_ESCAPED, true));
Expand Down Expand Up @@ -115,4 +116,20 @@ default void meterHistogram(LongHistogram histogram, long value, AttributesBuild
IWithStartTimeAndAttributes.super.meterHistogram(histogram, value, attributesBuilder);
}
}

default void addEvent(String eventName) {
getCurrentSpan().addEvent(eventName);
}

default void setTraceAttribute(AttributeKey<Long> attributeKey, long attributeValue) {
getCurrentSpan().setAttribute(attributeKey, attributeValue);
}

default void setAttribute(AttributeKey<String> attributeKey, String attributeValue) {
getCurrentSpan().setAttribute(attributeKey, attributeValue);
}

default void setAllAttributes(Attributes allAttributes) {
getCurrentSpan().setAllAttributes(allAttributes);
}
}
Loading