Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Universal profiling integration: Add stacktrace-IDs to transactions #3615

Merged
merged 10 commits into from
May 6, 2024
Next Next commit
Ported correlator
  • Loading branch information
JonasKunz committed May 2, 2024
commit 7b30a4edd365b7ea7ee0a837c91846d18ca370c3
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package co.elastic.apm.agent.universalprofiling;

public interface Clock {

Clock SYSTEM_NANOTIME = new Clock() {
@Override
public long getNanos() {
return System.nanoTime();
}
};

long getNanos();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package co.elastic.apm.agent.universalprofiling;

public interface MoveableEvent<SELF extends MoveableEvent<?>> {

/**
* Moves the content from this event into the provided other event. This event should be in a
* resetted state after the call.
*/
void moveInto(SELF other);

void clear();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package co.elastic.apm.agent.universalprofiling;

import com.lmax.disruptor.EventPoller;

import java.util.function.Supplier;

/**
* Wrapper around {@link EventPoller} which allows to "peek" elements. The provided event handling
* callback can decide to not handle an event. In that case, the event will be provided again as
* first element on the next call to {@link #poll(Handler)}.
*/
public class PeekingPoller<Event extends MoveableEvent<Event>> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class and the tests are a copy of https://github.com/elastic/elastic-otel-java/blob/main/universal-profiling-integration/src/main/java/co/elastic/otel/disruptor/PeekingPoller.java.

The only changes made are removal of Java-8 feature usages.


public interface Handler<Event extends MoveableEvent<Event>> {

/**
* Handles an event fetched from the ring buffer.
*
* @return true, if the event was handled and shall be removed. False if the event was not
* handled, no further invocations of handleEvent are desired and the same event shall be
* provided for the next {@link PeekingPoller#poll(Handler)} call.
*/
boolean handleEvent(Event e);
}

private final EventPoller<Event> poller;
private final Event peekedEvent;
private boolean peekedEventPopulated;

Handler<? super Event> currentHandler;
private final EventPoller.Handler<Event> subHandler = this::handleEvent;

public PeekingPoller(EventPoller<Event> wrappedPoller, Supplier<Event> emptyEventFactory) {
this.poller = wrappedPoller;
peekedEvent = emptyEventFactory.get();
peekedEventPopulated = false;
}

public synchronized void poll(Handler<? super Event> handler) throws Exception {
if (peekedEventPopulated) {
boolean handled = handler.handleEvent(peekedEvent);
if (!handled) {
return;
}
peekedEvent.clear();
peekedEventPopulated = false;
}
currentHandler = handler;
try {
poller.poll(subHandler);
} finally {
currentHandler = null;
}
}

private boolean handleEvent(Event event, long sequence, boolean endOfBatch) {
boolean handled = currentHandler.handleEvent(event);
if (handled) {
return true;
} else {
peekedEventPopulated = true;
event.moveInto(peekedEvent);
return false;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
package co.elastic.apm.agent.universalprofiling;


import co.elastic.apm.agent.impl.transaction.Id;
import co.elastic.apm.agent.impl.transaction.Transaction;
import co.elastic.apm.agent.report.Reporter;
import co.elastic.apm.agent.sdk.logging.Logger;
import co.elastic.apm.agent.sdk.logging.LoggerFactory;
import com.lmax.disruptor.EventPoller;
import com.lmax.disruptor.EventTranslatorTwoArg;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import org.HdrHistogram.WriterReaderPhaser;

import java.util.concurrent.ConcurrentHashMap;

public class SpanProfilingSamplesCorrelator {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class is a port of https://github.com/elastic/elastic-otel-java/blob/main/universal-profiling-integration/src/main/java/co/elastic/otel/SpanProfilingSamplesCorrelator.java.

The logic and methods are broadly the same, just ported from Otel to our elastic-apm-agent datastructures.


private static final Logger logger = LoggerFactory.getLogger(SpanProfilingSamplesCorrelator.class);

/**
* Holds the currently active transactions by their span-ID.
* Note that theoretically there could be a collision by having two transactions with different trace-IDs but the same span-ID.
* However, this is highly unlikely (see <a href="https://en.wikipedia.org/wiki/Birthday_problem">birthday problem</a>) and even if
* it were to happen, the only consequences would be a potentially incorrect correlation for the two colliding transactions.
*/
private final ConcurrentHashMap<Id, Transaction> transactionsById = new ConcurrentHashMap<>();

private final Reporter reporter;

// Clock to use, can be swapped out for testing
Clock nanoClock = Clock.SYSTEM_NANOTIME;

// Visible for testing
final RingBuffer<BufferedTransaction> delayedSpans;
private final PeekingPoller<BufferedTransaction> delayedSpansPoller;

private volatile long spanBufferDurationNanos;
private volatile boolean shuttingDown = false;

private final WriterReaderPhaser shutdownPhaser = new WriterReaderPhaser();

public SpanProfilingSamplesCorrelator(
int bufferCapacity,
long initialSpanDelayNanos,
Reporter reporter) {
this.spanBufferDurationNanos = initialSpanDelayNanos;
this.reporter = reporter;

bufferCapacity = nextPowerOf2(bufferCapacity);
// We use a wait strategy which doesn't involve signaling via condition variables
// because we never block anyway (we use polling)
delayedSpans =
RingBuffer.createMultiProducer(
BufferedTransaction::new, bufferCapacity, new YieldingWaitStrategy());
EventPoller<BufferedTransaction> nonPeekingPoller = delayedSpans.newPoller();
delayedSpans.addGatingSequences(nonPeekingPoller.getSequence());

delayedSpansPoller = new PeekingPoller<>(nonPeekingPoller, BufferedTransaction::new);
}

public void setSpanBufferDurationNanos(long nanos) {
if (nanos < 0) {
throw new IllegalArgumentException("nanos must be positive but was " + nanos);
}
spanBufferDurationNanos = nanos;
}

public void onTransactionStart(Transaction transaction) {
if (transaction.isSampled()) {
transactionsById.put(transaction.getTraceContext().getId(), transaction);
}
}

public void reportOrBufferTransaction(Transaction transaction) {
if (transactionsById.remove(transaction.getTraceContext().getId()) == null) {
// transaction is not being correlated, e.g. because it was not sampled
// therefore no need to buffer it
reporter.report(transaction);
return;
}

long criticalPhaseVal = shutdownPhaser.writerCriticalSectionEnter();
try {
if (spanBufferDurationNanos == 0 || shuttingDown) {
reporter.report(transaction);
return;
}

boolean couldPublish =
delayedSpans.tryPublishEvent(BufferedTransaction.TRANSLATOR,
transaction,
nanoClock.getNanos());

if (!couldPublish) {
logger.warn("The following transaction could not be delayed for correlation due to a full buffer, it will be sent immediately, {0}",
transaction);
reporter.report(transaction);
}
} finally {
shutdownPhaser.writerCriticalSectionExit(criticalPhaseVal);
}
}

public synchronized void correlate(
Id traceId, Id transactionId, Id stackTraceId, int count) {
Transaction tx = transactionsById.get(transactionId);
if (tx != null) {
// this branch should be true practically always unless there was a collision in transactionsById
// nonetheless for the unlikely case that it happens, we at least prevent wrongly adding data to another transaction
if (tx.getTraceContext().getTraceId().equals(traceId)) {
for (int i = 0; i < count; i++) {
tx.addProfilerCorrelationStackTrace(stackTraceId);
}
}
}
}

public synchronized void flushPendingBufferedSpans() {
try {
delayedSpansPoller.poll(
bufferedSpan -> {
long elapsed = nanoClock.getNanos() - bufferedSpan.endNanoTimestamp;
if (elapsed >= spanBufferDurationNanos || shuttingDown) {
reporter.report(bufferedSpan.transaction);
bufferedSpan.clear();
return true;
}
return false; // span is not yet ready to be sent
});
} catch (Exception e) {
throw new IllegalStateException(e);
}
}

public synchronized void shutdownAndFlushAll() {
shutdownPhaser.readerLock();
try {
shuttingDown = true; // This will cause new ended spans to not be buffered anymore

// avoid race condition: we wait until we are
// sure that no more spans will be added to the ringbuffer
shutdownPhaser.flipPhase();
} finally {
shutdownPhaser.readerUnlock();
}
// This will flush all pending spans because shuttingDown=true
flushPendingBufferedSpans();
}

private static class BufferedTransaction implements MoveableEvent<BufferedTransaction> {

Transaction transaction;
long endNanoTimestamp;

@Override
public void moveInto(BufferedTransaction other) {
other.transaction = transaction;
other.endNanoTimestamp = endNanoTimestamp;
clear();
}

@Override
public void clear() {
transaction = null;
endNanoTimestamp = -1;
}

public static final EventTranslatorTwoArg<BufferedTransaction, Transaction, Long> TRANSLATOR = new EventTranslatorTwoArg<BufferedTransaction, Transaction, Long>() {
@Override
public void translateTo(BufferedTransaction event, long sequence, Transaction transaction, Long timestamp) {
event.transaction = transaction;
event.endNanoTimestamp = timestamp;
}
};
}

private static int nextPowerOf2(int val) {
int result = 2;
while (result < val) {
result *= 2;
}
return result;
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package co.elastic.apm.agent.universalprofiling;

import static org.assertj.core.api.Assertions.assertThat;

import com.lmax.disruptor.EventPoller;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

import org.junit.jupiter.api.Test;

public class PeekingPollerTest {

private static class DummyEvent implements MoveableEvent<DummyEvent> {

private int val;

public DummyEvent() {
this(-1);
}

public DummyEvent(int val) {
this.val = val;
}

@Override
public void moveInto(DummyEvent other) {
other.val = val;
clear();
}

@Override
public void clear() {
val = -1;
}
}

private static class RecordingHandler implements PeekingPoller.Handler<DummyEvent> {

List<Integer> invocations = new ArrayList<>();
Function<Integer, Boolean> resultProvider;

@Override
public boolean handleEvent(DummyEvent event) {
invocations.add(event.val);
return resultProvider.apply(event.val);
}

void reset() {
invocations.clear();
}
}

@Test
public void testPeekingFunction() throws Exception {
RingBuffer<DummyEvent> rb =
RingBuffer.createMultiProducer(DummyEvent::new, 4, new YieldingWaitStrategy());
EventPoller<DummyEvent> nonPeekingPoller = rb.newPoller();
rb.addGatingSequences(nonPeekingPoller.getSequence());

PeekingPoller<DummyEvent> poller =
new PeekingPoller<DummyEvent>(nonPeekingPoller, DummyEvent::new);

RecordingHandler handler = new RecordingHandler();
handler.resultProvider = val -> false;
poller.poll(handler);
assertThat(handler.invocations).isEmpty();

assertThat(rb.tryPublishEvent((ev, srq) -> ev.val = 1)).isTrue();
assertThat(rb.tryPublishEvent((ev, srq) -> ev.val = 2)).isTrue();
assertThat(rb.tryPublishEvent((ev, srq) -> ev.val = 3)).isTrue();
assertThat(rb.tryPublishEvent((ev, srq) -> ev.val = 4)).isTrue();
assertThat(rb.tryPublishEvent((ev, srq) -> ev.val = 5)).isFalse();

poller.poll(handler);
assertThat(handler.invocations).containsExactly(1);
poller.poll(handler);
assertThat(handler.invocations).containsExactly(1, 1);

// It should now be possible to add one more element to the buffer
assertThat(rb.tryPublishEvent((ev, srq) -> ev.val = 5)).isTrue();
assertThat(rb.tryPublishEvent((ev, srq) -> ev.val = 6)).isFalse();

poller.poll(handler);
assertThat(handler.invocations).containsExactly(1, 1, 1);

// now consume elements up to index 2
handler.reset();
handler.resultProvider = i -> i != 3;

poller.poll(handler);
assertThat(handler.invocations).containsExactly(1, 2, 3);

// It should now be possible to add two more element to the buffer
assertThat(rb.tryPublishEvent((ev, srq) -> ev.val = 6)).isTrue();
assertThat(rb.tryPublishEvent((ev, srq) -> ev.val = 7)).isTrue();
assertThat(rb.tryPublishEvent((ev, srq) -> ev.val = 8)).isFalse();

poller.poll(handler);
assertThat(handler.invocations).containsExactly(1, 2, 3, 3);

// drain remaining elements
handler.reset();
handler.resultProvider = i -> true;

poller.poll(handler);
assertThat(handler.invocations).containsExactly(3, 4, 5, 6, 7);
}
}