From 7b30a4edd365b7ea7ee0a837c91846d18ca370c3 Mon Sep 17 00:00:00 2001 From: Jonas Kunz Date: Mon, 29 Apr 2024 15:17:05 +0200 Subject: [PATCH 01/10] Ported correlator --- .../apm/agent/universalprofiling/Clock.java | 13 ++ .../universalprofiling/MoveableEvent.java | 12 ++ .../universalprofiling/PeekingPoller.java | 66 +++++++ .../SpanProfilingSamplesCorrelator.java | 186 ++++++++++++++++++ .../universalprofiling/PeekingPollerTest.java | 112 +++++++++++ 5 files changed, 389 insertions(+) create mode 100644 apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/Clock.java create mode 100644 apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/MoveableEvent.java create mode 100644 apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/PeekingPoller.java create mode 100644 apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/SpanProfilingSamplesCorrelator.java create mode 100644 apm-agent-core/src/test/java/co/elastic/apm/agent/universalprofiling/PeekingPollerTest.java diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/Clock.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/Clock.java new file mode 100644 index 0000000000..a4a0973af6 --- /dev/null +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/Clock.java @@ -0,0 +1,13 @@ +package co.elastic.apm.agent.universalprofiling; + +public interface Clock { + + Clock SYSTEM_NANOTIME = new Clock() { + @Override + public long getNanos() { + return System.nanoTime(); + } + }; + + long getNanos(); +} diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/MoveableEvent.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/MoveableEvent.java new file mode 100644 index 0000000000..c4e33f75b7 --- /dev/null +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/MoveableEvent.java @@ -0,0 +1,12 @@ +package co.elastic.apm.agent.universalprofiling; + +public interface MoveableEvent> { + + /** + * Moves the content from this event into the provided other event. This event should be in a + * resetted state after the call. + */ + void moveInto(SELF other); + + void clear(); +} diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/PeekingPoller.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/PeekingPoller.java new file mode 100644 index 0000000000..95364cff42 --- /dev/null +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/PeekingPoller.java @@ -0,0 +1,66 @@ +package co.elastic.apm.agent.universalprofiling; + +import com.lmax.disruptor.EventPoller; + +import java.util.function.Supplier; + +/** + * Wrapper around {@link EventPoller} which allows to "peek" elements. The provided event handling + * callback can decide to not handle an event. In that case, the event will be provided again as + * first element on the next call to {@link #poll(Handler)}. + */ +public class PeekingPoller> { + + public interface Handler> { + + /** + * Handles an event fetched from the ring buffer. + * + * @return true, if the event was handled and shall be removed. False if the event was not + * handled, no further invocations of handleEvent are desired and the same event shall be + * provided for the next {@link PeekingPoller#poll(Handler)} call. + */ + boolean handleEvent(Event e); + } + + private final EventPoller poller; + private final Event peekedEvent; + private boolean peekedEventPopulated; + + Handler currentHandler; + private final EventPoller.Handler subHandler = this::handleEvent; + + public PeekingPoller(EventPoller wrappedPoller, Supplier emptyEventFactory) { + this.poller = wrappedPoller; + peekedEvent = emptyEventFactory.get(); + peekedEventPopulated = false; + } + + public synchronized void poll(Handler handler) throws Exception { + if (peekedEventPopulated) { + boolean handled = handler.handleEvent(peekedEvent); + if (!handled) { + return; + } + peekedEvent.clear(); + peekedEventPopulated = false; + } + currentHandler = handler; + try { + poller.poll(subHandler); + } finally { + currentHandler = null; + } + } + + private boolean handleEvent(Event event, long sequence, boolean endOfBatch) { + boolean handled = currentHandler.handleEvent(event); + if (handled) { + return true; + } else { + peekedEventPopulated = true; + event.moveInto(peekedEvent); + return false; + } + } +} diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/SpanProfilingSamplesCorrelator.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/SpanProfilingSamplesCorrelator.java new file mode 100644 index 0000000000..cff247b4ef --- /dev/null +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/SpanProfilingSamplesCorrelator.java @@ -0,0 +1,186 @@ +package co.elastic.apm.agent.universalprofiling; + + +import co.elastic.apm.agent.impl.transaction.Id; +import co.elastic.apm.agent.impl.transaction.Transaction; +import co.elastic.apm.agent.report.Reporter; +import co.elastic.apm.agent.sdk.logging.Logger; +import co.elastic.apm.agent.sdk.logging.LoggerFactory; +import com.lmax.disruptor.EventPoller; +import com.lmax.disruptor.EventTranslatorTwoArg; +import com.lmax.disruptor.RingBuffer; +import com.lmax.disruptor.YieldingWaitStrategy; +import org.HdrHistogram.WriterReaderPhaser; + +import java.util.concurrent.ConcurrentHashMap; + +public class SpanProfilingSamplesCorrelator { + + private static final Logger logger = LoggerFactory.getLogger(SpanProfilingSamplesCorrelator.class); + + /** + * Holds the currently active transactions by their span-ID. + * Note that theoretically there could be a collision by having two transactions with different trace-IDs but the same span-ID. + * However, this is highly unlikely (see birthday problem) and even if + * it were to happen, the only consequences would be a potentially incorrect correlation for the two colliding transactions. + */ + private final ConcurrentHashMap transactionsById = new ConcurrentHashMap<>(); + + private final Reporter reporter; + + // Clock to use, can be swapped out for testing + Clock nanoClock = Clock.SYSTEM_NANOTIME; + + // Visible for testing + final RingBuffer delayedSpans; + private final PeekingPoller delayedSpansPoller; + + private volatile long spanBufferDurationNanos; + private volatile boolean shuttingDown = false; + + private final WriterReaderPhaser shutdownPhaser = new WriterReaderPhaser(); + + public SpanProfilingSamplesCorrelator( + int bufferCapacity, + long initialSpanDelayNanos, + Reporter reporter) { + this.spanBufferDurationNanos = initialSpanDelayNanos; + this.reporter = reporter; + + bufferCapacity = nextPowerOf2(bufferCapacity); + // We use a wait strategy which doesn't involve signaling via condition variables + // because we never block anyway (we use polling) + delayedSpans = + RingBuffer.createMultiProducer( + BufferedTransaction::new, bufferCapacity, new YieldingWaitStrategy()); + EventPoller nonPeekingPoller = delayedSpans.newPoller(); + delayedSpans.addGatingSequences(nonPeekingPoller.getSequence()); + + delayedSpansPoller = new PeekingPoller<>(nonPeekingPoller, BufferedTransaction::new); + } + + public void setSpanBufferDurationNanos(long nanos) { + if (nanos < 0) { + throw new IllegalArgumentException("nanos must be positive but was " + nanos); + } + spanBufferDurationNanos = nanos; + } + + public void onTransactionStart(Transaction transaction) { + if (transaction.isSampled()) { + transactionsById.put(transaction.getTraceContext().getId(), transaction); + } + } + + public void reportOrBufferTransaction(Transaction transaction) { + if (transactionsById.remove(transaction.getTraceContext().getId()) == null) { + // transaction is not being correlated, e.g. because it was not sampled + // therefore no need to buffer it + reporter.report(transaction); + return; + } + + long criticalPhaseVal = shutdownPhaser.writerCriticalSectionEnter(); + try { + if (spanBufferDurationNanos == 0 || shuttingDown) { + reporter.report(transaction); + return; + } + + boolean couldPublish = + delayedSpans.tryPublishEvent(BufferedTransaction.TRANSLATOR, + transaction, + nanoClock.getNanos()); + + if (!couldPublish) { + logger.warn("The following transaction could not be delayed for correlation due to a full buffer, it will be sent immediately, {0}", + transaction); + reporter.report(transaction); + } + } finally { + shutdownPhaser.writerCriticalSectionExit(criticalPhaseVal); + } + } + + public synchronized void correlate( + Id traceId, Id transactionId, Id stackTraceId, int count) { + Transaction tx = transactionsById.get(transactionId); + if (tx != null) { + // this branch should be true practically always unless there was a collision in transactionsById + // nonetheless for the unlikely case that it happens, we at least prevent wrongly adding data to another transaction + if (tx.getTraceContext().getTraceId().equals(traceId)) { + for (int i = 0; i < count; i++) { + tx.addProfilerCorrelationStackTrace(stackTraceId); + } + } + } + } + + public synchronized void flushPendingBufferedSpans() { + try { + delayedSpansPoller.poll( + bufferedSpan -> { + long elapsed = nanoClock.getNanos() - bufferedSpan.endNanoTimestamp; + if (elapsed >= spanBufferDurationNanos || shuttingDown) { + reporter.report(bufferedSpan.transaction); + bufferedSpan.clear(); + return true; + } + return false; // span is not yet ready to be sent + }); + } catch (Exception e) { + throw new IllegalStateException(e); + } + } + + public synchronized void shutdownAndFlushAll() { + shutdownPhaser.readerLock(); + try { + shuttingDown = true; // This will cause new ended spans to not be buffered anymore + + // avoid race condition: we wait until we are + // sure that no more spans will be added to the ringbuffer + shutdownPhaser.flipPhase(); + } finally { + shutdownPhaser.readerUnlock(); + } + // This will flush all pending spans because shuttingDown=true + flushPendingBufferedSpans(); + } + + private static class BufferedTransaction implements MoveableEvent { + + Transaction transaction; + long endNanoTimestamp; + + @Override + public void moveInto(BufferedTransaction other) { + other.transaction = transaction; + other.endNanoTimestamp = endNanoTimestamp; + clear(); + } + + @Override + public void clear() { + transaction = null; + endNanoTimestamp = -1; + } + + public static final EventTranslatorTwoArg TRANSLATOR = new EventTranslatorTwoArg() { + @Override + public void translateTo(BufferedTransaction event, long sequence, Transaction transaction, Long timestamp) { + event.transaction = transaction; + event.endNanoTimestamp = timestamp; + } + }; + } + + private static int nextPowerOf2(int val) { + int result = 2; + while (result < val) { + result *= 2; + } + return result; + } +} + diff --git a/apm-agent-core/src/test/java/co/elastic/apm/agent/universalprofiling/PeekingPollerTest.java b/apm-agent-core/src/test/java/co/elastic/apm/agent/universalprofiling/PeekingPollerTest.java new file mode 100644 index 0000000000..009d6e0877 --- /dev/null +++ b/apm-agent-core/src/test/java/co/elastic/apm/agent/universalprofiling/PeekingPollerTest.java @@ -0,0 +1,112 @@ +package co.elastic.apm.agent.universalprofiling; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.lmax.disruptor.EventPoller; +import com.lmax.disruptor.RingBuffer; +import com.lmax.disruptor.YieldingWaitStrategy; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Function; + +import org.junit.jupiter.api.Test; + +public class PeekingPollerTest { + + private static class DummyEvent implements MoveableEvent { + + private int val; + + public DummyEvent() { + this(-1); + } + + public DummyEvent(int val) { + this.val = val; + } + + @Override + public void moveInto(DummyEvent other) { + other.val = val; + clear(); + } + + @Override + public void clear() { + val = -1; + } + } + + private static class RecordingHandler implements PeekingPoller.Handler { + + List invocations = new ArrayList<>(); + Function resultProvider; + + @Override + public boolean handleEvent(DummyEvent event) { + invocations.add(event.val); + return resultProvider.apply(event.val); + } + + void reset() { + invocations.clear(); + } + } + + @Test + public void testPeekingFunction() throws Exception { + RingBuffer rb = + RingBuffer.createMultiProducer(DummyEvent::new, 4, new YieldingWaitStrategy()); + EventPoller nonPeekingPoller = rb.newPoller(); + rb.addGatingSequences(nonPeekingPoller.getSequence()); + + PeekingPoller poller = + new PeekingPoller(nonPeekingPoller, DummyEvent::new); + + RecordingHandler handler = new RecordingHandler(); + handler.resultProvider = val -> false; + poller.poll(handler); + assertThat(handler.invocations).isEmpty(); + + assertThat(rb.tryPublishEvent((ev, srq) -> ev.val = 1)).isTrue(); + assertThat(rb.tryPublishEvent((ev, srq) -> ev.val = 2)).isTrue(); + assertThat(rb.tryPublishEvent((ev, srq) -> ev.val = 3)).isTrue(); + assertThat(rb.tryPublishEvent((ev, srq) -> ev.val = 4)).isTrue(); + assertThat(rb.tryPublishEvent((ev, srq) -> ev.val = 5)).isFalse(); + + poller.poll(handler); + assertThat(handler.invocations).containsExactly(1); + poller.poll(handler); + assertThat(handler.invocations).containsExactly(1, 1); + + // It should now be possible to add one more element to the buffer + assertThat(rb.tryPublishEvent((ev, srq) -> ev.val = 5)).isTrue(); + assertThat(rb.tryPublishEvent((ev, srq) -> ev.val = 6)).isFalse(); + + poller.poll(handler); + assertThat(handler.invocations).containsExactly(1, 1, 1); + + // now consume elements up to index 2 + handler.reset(); + handler.resultProvider = i -> i != 3; + + poller.poll(handler); + assertThat(handler.invocations).containsExactly(1, 2, 3); + + // It should now be possible to add two more element to the buffer + assertThat(rb.tryPublishEvent((ev, srq) -> ev.val = 6)).isTrue(); + assertThat(rb.tryPublishEvent((ev, srq) -> ev.val = 7)).isTrue(); + assertThat(rb.tryPublishEvent((ev, srq) -> ev.val = 8)).isFalse(); + + poller.poll(handler); + assertThat(handler.invocations).containsExactly(1, 2, 3, 3); + + // drain remaining elements + handler.reset(); + handler.resultProvider = i -> true; + + poller.poll(handler); + assertThat(handler.invocations).containsExactly(3, 4, 5, 6, 7); + } +} From d1b257d9458be2a1b2b354392da87d06f794a4df Mon Sep 17 00:00:00 2001 From: Jonas Kunz Date: Tue, 30 Apr 2024 14:30:59 +0200 Subject: [PATCH 02/10] Implemented correlation --- .../UniversalProfilingConfiguration.java | 8 ++-- .../SpanProfilingSamplesCorrelator.java | 4 ++ .../UniversalProfilingIntegration.java | 42 ++++++++++++++----- 3 files changed, 40 insertions(+), 14 deletions(-) diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/configuration/UniversalProfilingConfiguration.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/configuration/UniversalProfilingConfiguration.java index 98ca6b0cb5..be318fe5a1 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/configuration/UniversalProfilingConfiguration.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/configuration/UniversalProfilingConfiguration.java @@ -34,9 +34,9 @@ public class UniversalProfilingConfiguration extends ConfigurationOptionProvider .description("If enabled, the apm agent will correlate it's transaction with the profiling data from elastic universal profiling running on the same host.") .buildWithDefault(false); - private final ConfigurationOption bufferSize = ConfigurationOption.longOption() + private final ConfigurationOption bufferSize = ConfigurationOption.integerOption() .key("universal_profiling_integration_buffer_size") - .addValidator(isInRange(64L, Long.MAX_VALUE)) + .addValidator(isInRange(64, Integer.MAX_VALUE)) .tags("added[1.50.0]", "internal") .configurationCategory(PROFILING_CATEGORY) .description("The feature needs to buffer ended local-root spans for a short duration to ensure that all of its profiling data has been received." + @@ -44,7 +44,7 @@ public class UniversalProfilingConfiguration extends ConfigurationOptionProvider "The higher the number of local root spans per second, the higher this buffer size should be set.\n" + "The agent will log a warning if it is not capable of buffering a span due to insufficient buffer size. " + "This will cause the span to be exported immediately instead with possibly incomplete profiling correlation data.") - .buildWithDefault(4096L); + .buildWithDefault(4096); private final ConfigurationOption socketDir = ConfigurationOption.stringOption() .key("universal_profiling_integration_socket_dir") @@ -60,7 +60,7 @@ public boolean isEnabled() { return enabled.get(); } - public long getBufferSize() { + public int getBufferSize() { return bufferSize.get(); } diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/SpanProfilingSamplesCorrelator.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/SpanProfilingSamplesCorrelator.java index cff247b4ef..392516e024 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/SpanProfilingSamplesCorrelator.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/SpanProfilingSamplesCorrelator.java @@ -72,6 +72,10 @@ public void onTransactionStart(Transaction transaction) { } } + public void dropTransaction(Transaction transaction) { + transactionsById.remove(transaction.getTraceContext().getId()); + } + public void reportOrBufferTransaction(Transaction transaction) { if (transactionsById.remove(transaction.getTraceContext().getId()) == null) { // transaction is not being correlated, e.g. because it was not sampled diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/UniversalProfilingIntegration.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/UniversalProfilingIntegration.java index 33897cbfa6..65f2ee015a 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/UniversalProfilingIntegration.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/UniversalProfilingIntegration.java @@ -25,6 +25,7 @@ import co.elastic.apm.agent.impl.metadata.SystemInfo; import co.elastic.apm.agent.impl.transaction.AbstractSpan; import co.elastic.apm.agent.impl.transaction.ElasticContext; +import co.elastic.apm.agent.impl.transaction.Id; import co.elastic.apm.agent.impl.transaction.Transaction; import co.elastic.apm.agent.sdk.logging.Logger; import co.elastic.apm.agent.sdk.logging.LoggerFactory; @@ -42,11 +43,9 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.time.Duration; -import java.util.Base64; import java.util.Random; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.logging.Level; public class UniversalProfilingIntegration { @@ -56,6 +55,8 @@ public class UniversalProfilingIntegration { */ static final long POLL_FREQUENCY_MS = 20; + private static final long INITIAL_SPAN_DELAY_NANOS = Duration.ofSeconds(1).toNanos(); + private static final Logger log = LoggerFactory.getLogger(UniversalProfilingIntegration.class); private volatile ElasticApmTracer tracer; @@ -66,8 +67,12 @@ public class UniversalProfilingIntegration { // Visible for testing String socketPath = null; + @Nullable private ScheduledExecutorService executor; + @Nullable + private SpanProfilingSamplesCorrelator correlator; + private ActivationListener activationListener = new ActivationListener() { @Override @@ -102,6 +107,8 @@ public void start(ElasticApmTracer tracer) { coreConfig.getServiceName(), coreConfig.getEnvironment(), socketPath); UniversalProfilingCorrelation.setProcessStorage(processCorrelationStorage); + correlator = new SpanProfilingSamplesCorrelator(config.getBufferSize(), INITIAL_SPAN_DELAY_NANOS, tracer.getReporter()); + executor = ExecutorUtils.createSingleThreadSchedulingDaemonPool("profiling-integration"); executor.scheduleWithFixedDelay(new Runnable() { @Override @@ -125,8 +132,10 @@ public void run() { } } - private void periodicTimer() { + // Visible for testing + void periodicTimer() { consumeProfilerMessages(); + correlator.flushPendingBufferedSpans(); } public void stop() { @@ -137,6 +146,8 @@ public void stop() { executor = null; } if (isActive) { + consumeProfilerMessages(); + correlator.shutdownAndFlushAll(); UniversalProfilingCorrelation.stopProfilerReturnChannel(); JvmtiAccess.destroy(); isActive = false; @@ -147,7 +158,7 @@ public void stop() { } public void afterTransactionStart(Transaction startedTransaction) { - //TODO: store the transaction in a map for correlating with profiling data + correlator.onTransactionStart(startedTransaction); } /** @@ -161,12 +172,11 @@ public void afterTransactionStart(Transaction startedTransaction) { * @param endedTransaction the transaction to be reported */ public void correlateAndReport(Transaction endedTransaction) { - //TODO: perform correlation and report after buffering for a certain delay - tracer.getReporter().report(endedTransaction); + correlator.reportOrBufferTransaction(endedTransaction); } public void drop(Transaction endedTransaction) { - //TODO: remove dropped transactions from correlation storage without reporting + correlator.dropTransaction(endedTransaction); } @@ -197,7 +207,7 @@ private String randomSocketFileName() { return name.toString(); } - private void consumeProfilerMessages() { + private synchronized void consumeProfilerMessages() { try { while (true) { try { @@ -223,10 +233,22 @@ private void consumeProfilerMessages() { } private void handleMessage(ProfilerRegistrationMessage message) { - //TODO: handle message + //TODO: update the host.id in the reporter metadata + log.debug("Received profiler registration message with host.id={} and expected latency of {} millis", + message.getHostId(), message.getSamplesDelayMillis()); + long delayMillis = message.getSamplesDelayMillis() + POLL_FREQUENCY_MS; + correlator.setSpanBufferDurationNanos(delayMillis * 1_000_000L); } + private final Id tempTraceId = Id.new128BitId(); + private final Id tempSpanId = Id.new64BitId(); + private final Id tempStackTraceId = Id.new128BitId(); private void handleMessage(TraceCorrelationMessage message) { - //TODO: handle message + tempTraceId.fromBytes(message.getTraceId(), 0); + tempSpanId.fromBytes(message.getLocalRootSpanId(), 0); + tempStackTraceId.fromBytes(message.getStackTraceId(), 0); + log.trace("Received profiler correlation message with trace.id={} transaction.id={} stacktrace.id={} count={}", + tempTraceId, tempSpanId, tempStackTraceId, message.getSampleCount()); + correlator.correlate(tempTraceId, tempSpanId, tempStackTraceId, message.getSampleCount()); } } From e8c41e109d00acc4914ac64f8e2ad60febd9fb46 Mon Sep 17 00:00:00 2001 From: Jonas Kunz Date: Tue, 30 Apr 2024 15:17:40 +0200 Subject: [PATCH 03/10] Added tests and fixed uncovered bugs --- .../apm/agent/impl/ElasticApmTracer.java | 4 + .../SpanProfilingSamplesCorrelator.java | 5 +- .../UniversalProfilingIntegration.java | 7 +- .../UniversalProfilingIntegrationTest.java | 182 +++++++++++++++++- 4 files changed, 186 insertions(+), 12 deletions(-) diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/ElasticApmTracer.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/ElasticApmTracer.java index 025de06219..d50ba17470 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/ElasticApmTracer.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/ElasticApmTracer.java @@ -673,6 +673,10 @@ public synchronized void stop() { public Reporter getReporter() { return reporter; } + + public UniversalProfilingIntegration getProfilingIntegration() { + return profilingIntegration; + } public Sampler getSampler() { return sampler; diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/SpanProfilingSamplesCorrelator.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/SpanProfilingSamplesCorrelator.java index 392516e024..cf2db89939 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/SpanProfilingSamplesCorrelator.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/SpanProfilingSamplesCorrelator.java @@ -72,12 +72,12 @@ public void onTransactionStart(Transaction transaction) { } } - public void dropTransaction(Transaction transaction) { + public void stopCorrelating(Transaction transaction) { transactionsById.remove(transaction.getTraceContext().getId()); } public void reportOrBufferTransaction(Transaction transaction) { - if (transactionsById.remove(transaction.getTraceContext().getId()) == null) { + if (!transactionsById.containsKey(transaction.getTraceContext().getId())) { // transaction is not being correlated, e.g. because it was not sampled // therefore no need to buffer it reporter.report(transaction); @@ -126,6 +126,7 @@ public synchronized void flushPendingBufferedSpans() { bufferedSpan -> { long elapsed = nanoClock.getNanos() - bufferedSpan.endNanoTimestamp; if (elapsed >= spanBufferDurationNanos || shuttingDown) { + stopCorrelating(bufferedSpan.transaction); reporter.report(bufferedSpan.transaction); bufferedSpan.clear(); return true; diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/UniversalProfilingIntegration.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/UniversalProfilingIntegration.java index 65f2ee015a..6824d03498 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/UniversalProfilingIntegration.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/UniversalProfilingIntegration.java @@ -70,8 +70,9 @@ public class UniversalProfilingIntegration { @Nullable private ScheduledExecutorService executor; + // Visible for testing @Nullable - private SpanProfilingSamplesCorrelator correlator; + SpanProfilingSamplesCorrelator correlator; private ActivationListener activationListener = new ActivationListener() { @@ -176,7 +177,7 @@ public void correlateAndReport(Transaction endedTransaction) { } public void drop(Transaction endedTransaction) { - correlator.dropTransaction(endedTransaction); + correlator.stopCorrelating(endedTransaction); } @@ -247,7 +248,7 @@ private void handleMessage(TraceCorrelationMessage message) { tempTraceId.fromBytes(message.getTraceId(), 0); tempSpanId.fromBytes(message.getLocalRootSpanId(), 0); tempStackTraceId.fromBytes(message.getStackTraceId(), 0); - log.trace("Received profiler correlation message with trace.id={} transaction.id={} stacktrace.id={} count={}", + log.debug("Received profiler correlation message with trace.id={} transaction.id={} stacktrace.id={} count={}", tempTraceId, tempSpanId, tempStackTraceId, message.getSampleCount()); correlator.correlate(tempTraceId, tempSpanId, tempStackTraceId, message.getSampleCount()); } diff --git a/apm-agent-core/src/test/java/co/elastic/apm/agent/universalprofiling/UniversalProfilingIntegrationTest.java b/apm-agent-core/src/test/java/co/elastic/apm/agent/universalprofiling/UniversalProfilingIntegrationTest.java index 81b64c67cb..3152531626 100644 --- a/apm-agent-core/src/test/java/co/elastic/apm/agent/universalprofiling/UniversalProfilingIntegrationTest.java +++ b/apm-agent-core/src/test/java/co/elastic/apm/agent/universalprofiling/UniversalProfilingIntegrationTest.java @@ -24,7 +24,7 @@ import co.elastic.apm.agent.configuration.SpyConfiguration; import co.elastic.apm.agent.configuration.UniversalProfilingConfiguration; import co.elastic.apm.agent.impl.ElasticApmTracer; -import co.elastic.apm.agent.impl.Tracer; +import co.elastic.apm.agent.impl.sampling.ConstantSampler; import co.elastic.apm.agent.impl.transaction.AbstractSpan; import co.elastic.apm.agent.impl.transaction.Id; import co.elastic.apm.agent.impl.transaction.Span; @@ -50,10 +50,14 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.time.Duration; +import java.util.Random; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import static co.elastic.apm.agent.testutils.assertions.Assertions.assertThat; import static co.elastic.apm.agent.universalprofiling.ProfilerSharedMemoryWriter.TLS_STORAGE_SIZE; +import static org.awaitility.Awaitility.await; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.never; @@ -61,7 +65,7 @@ public class UniversalProfilingIntegrationTest { - private Tracer tracer; + private ElasticApmTracer tracer; private MockReporter reporter; private TestObjectPoolFactory poolFactory; @@ -79,7 +83,7 @@ void setupTracer() { void setupTracer(Consumer configCustomizer) { ConfigurationRegistry config = SpyConfiguration.createSpyConfig(); configCustomizer.accept(config); - MockTracer.MockInstrumentationSetup mockInstrumentationSetup = MockTracer.createMockInstrumentationSetup(config); + MockTracer.MockInstrumentationSetup mockInstrumentationSetup = MockTracer.createMockInstrumentationSetup(config, false); tracer = mockInstrumentationSetup.getTracer(); reporter = mockInstrumentationSetup.getReporter(); @@ -89,9 +93,11 @@ void setupTracer(Consumer configCustomizer) { @AfterEach public void cleanupTracer() { if (tracer != null) { + if (tracer.isRunning()) { + tracer.stop(); + } reporter.assertRecycledAfterDecrementingReferences(); poolFactory.checkAllPooledObjectsHaveBeenRecycled(); - tracer.stop(); tracer = null; } } @@ -174,8 +180,6 @@ public void testNestedActivations() { first.end(); second.end(); third.end(); - assertThat(reporter.getTransactions()).containsExactly(first, second); - assertThat(reporter.getSpans()).containsExactly(third); } @@ -242,6 +246,133 @@ static void checkTlsIs(@Nullable AbstractSpan span) { @Nested class SpanCorrelation { + + @Test + void checkCorrelationFunctional() { + AtomicLong clockMs = new AtomicLong(0L); + setupTracer(); + UniversalProfilingIntegration profilingIntegration = tracer.getProfilingIntegration(); + profilingIntegration.correlator.nanoClock = () -> clockMs.get() * 1_000_000L; + + sendProfilerRegistrationMsg(1, "hostid"); + + Transaction tx1 = tracer.startRootTransaction(null); + Transaction tx2 = tracer.startRootTransaction(null); + + Id st1 = randomStackTraceId(1); + sendSampleMsg(tx1, st1, 1); + + // Send some garbage which should not affect our processing + JvmtiAccessImpl.sendToProfilerReturnChannelSocket0(new byte[]{1, 2, 3}); + + Id st2 = randomStackTraceId(2); + sendSampleMsg(tx2, st2, 2); + + // ensure that the messages are processed now + profilingIntegration.periodicTimer(); + + tx1.end(); + tx2.end(); + + // ensure that spans are not sent, their delay has not yet elapsed + assertThat(reporter.getTransactions()).isEmpty(); + + Id st3 = randomStackTraceId(3); + sendSampleMsg(tx2, st2, 1); + sendSampleMsg(tx1, st3, 2); + sendSampleMsg(tx2, st3, 1); + + clockMs.set(1L + UniversalProfilingIntegration.POLL_FREQUENCY_MS); + // now the background thread should consume those messages and flush the spans + await() + .atMost(Duration.ofSeconds(10)) + .untilAsserted( + () -> { + + assertThat(reporter.getTransactions()) + .hasSize(2) + .containsExactlyInAnyOrder(tx1, tx2); + + assertThat(tx1.getProfilingCorrelationStackTraceIds()) + .containsExactlyInAnyOrder(st1, st3, st3); + + assertThat(tx2.getProfilingCorrelationStackTraceIds()) + .containsExactlyInAnyOrder(st2, st2, st2, st3); + }); + } + + @Test + void unsampledTransactionsNotCorrelated() { + setupTracer(); + UniversalProfilingIntegration profilingIntegration = tracer.getProfilingIntegration(); + profilingIntegration.correlator.nanoClock = () -> 0L; + doReturn(true).when(tracer.getApmServerClient()).supportsKeepingUnsampledTransaction(); + + sendProfilerRegistrationMsg(1, "hostid"); + + Transaction tx = tracer.startRootTransaction(ConstantSampler.of(false), 0L, null); + assertThat(tx.isSampled()).isFalse(); + + // Still send a stacktrace to make sure it is actually ignored + sendSampleMsg(tx, randomStackTraceId(1), 1); + + // ensure that the messages are processed now + profilingIntegration.periodicTimer(); + tx.end(); + + assertThat(reporter.getTransactions()).containsExactly(tx); + assertThat(tx.getProfilingCorrelationStackTraceIds()).isEmpty(); + } + + @Test + void shutdownFlushesBufferedSpans() { + Id st1 = randomStackTraceId(1); + + setupTracer(); + UniversalProfilingIntegration profilingIntegration = tracer.getProfilingIntegration(); + profilingIntegration.correlator.nanoClock = () -> 0L; + + sendProfilerRegistrationMsg(1, "hostid"); + profilingIntegration.periodicTimer(); + + Transaction tx = tracer.startRootTransaction(null); + tx.end(); + + profilingIntegration.periodicTimer(); + assertThat(reporter.getTransactions()).isEmpty(); + + sendSampleMsg(tx, st1, 1); + tracer.stop(); + + assertThat(reporter.getTransactions()).containsExactly(tx); + assertThat(tx.getProfilingCorrelationStackTraceIds()).containsExactly(st1); + } + + @Test + void bufferCapacityExceeded() { + setupTracer(conf -> { + UniversalProfilingConfiguration profConfig = conf.getConfig(UniversalProfilingConfiguration.class); + doReturn(true).when(profConfig).isEnabled(); + doReturn(tempDir.toAbsolutePath().toString()).when(profConfig).getSocketDir(); + doReturn(2).when(profConfig).getBufferSize(); + }); + UniversalProfilingIntegration profilingIntegration = tracer.getProfilingIntegration(); + profilingIntegration.correlator.nanoClock = () -> 0L; + + + sendProfilerRegistrationMsg(1, "hostid"); + profilingIntegration.periodicTimer(); + Transaction tx1 = tracer.startRootTransaction(null); + tx1.end(); + Transaction tx2 = tracer.startRootTransaction(null); + tx2.end(); + // now the buffer should be full, transaction 3 should be sent immediately + Transaction tx3 = tracer.startRootTransaction(null); + tx3.end(); + + Assertions.assertThat(reporter.getTransactions()).containsExactly(tx3); + } + @Test void badSocketPath() throws Exception { Path notADir = tempDir.resolve("not_a_dir"); @@ -287,9 +418,46 @@ void socketParentDirCreated() throws Exception { } } - } + private Id randomStackTraceId(int seed) { + byte[] id = new byte[16]; + new Random(seed).nextBytes(id); + Id idObj = Id.new128BitId(); + idObj.fromBytes(id, 0); + return idObj; + } + + void sendSampleMsg(Transaction transaction, Id stackTraceId, int count) { + byte[] traceId = idToBytes(transaction.getTraceContext().getTraceId()); + byte[] transactionId = idToBytes(transaction.getTraceContext().getId()); + ByteBuffer message = ByteBuffer.allocate(46); + message.order(ByteOrder.nativeOrder()); + message.putShort((short) 1); // message-type = correlation message + message.putShort((short) 1); // message-version + message.put(traceId); + message.put(transactionId); + message.put(idToBytes(stackTraceId)); + message.putShort((short) count); + + JvmtiAccessImpl.sendToProfilerReturnChannelSocket0(message.array()); + } + + void sendProfilerRegistrationMsg(int sampleDelayMillis, String hostId) { + byte[] hostIdUtf8 = hostId.getBytes(StandardCharsets.UTF_8); + + ByteBuffer message = ByteBuffer.allocate(12 + hostIdUtf8.length); + message.order(ByteOrder.nativeOrder()); + message.putShort((short) 2); // message-type = registration message + message.putShort((short) 1); // message-version + message.putInt(sampleDelayMillis); + message.putInt(hostIdUtf8.length); + message.put(hostIdUtf8); + + JvmtiAccessImpl.sendToProfilerReturnChannelSocket0(message.array()); + } + } + private static byte[] idToBytes(Id id) { byte[] buff = new byte[32]; int len = id.toBytes(buff, 0); From 701af9f921c5845631fb28419971bd5734a9b491 Mon Sep 17 00:00:00 2001 From: Jonas Kunz Date: Thu, 2 May 2024 09:57:42 +0200 Subject: [PATCH 04/10] Remove lambda --- .../SpanProfilingSamplesCorrelator.java | 26 +++++++++++-------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/SpanProfilingSamplesCorrelator.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/SpanProfilingSamplesCorrelator.java index cf2db89939..3ea59bcde7 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/SpanProfilingSamplesCorrelator.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/SpanProfilingSamplesCorrelator.java @@ -120,19 +120,23 @@ public synchronized void correlate( } } + private final PeekingPoller.Handler BUFFERED_TRANSACTION_HANDLER = new PeekingPoller.Handler() { + @Override + public boolean handleEvent(BufferedTransaction bufferedSpan) { + long elapsed = nanoClock.getNanos() - bufferedSpan.endNanoTimestamp; + if (elapsed >= spanBufferDurationNanos || shuttingDown) { + stopCorrelating(bufferedSpan.transaction); + reporter.report(bufferedSpan.transaction); + bufferedSpan.clear(); + return true; + } + return false; // span is not yet ready to be sent + } + }; + public synchronized void flushPendingBufferedSpans() { try { - delayedSpansPoller.poll( - bufferedSpan -> { - long elapsed = nanoClock.getNanos() - bufferedSpan.endNanoTimestamp; - if (elapsed >= spanBufferDurationNanos || shuttingDown) { - stopCorrelating(bufferedSpan.transaction); - reporter.report(bufferedSpan.transaction); - bufferedSpan.clear(); - return true; - } - return false; // span is not yet ready to be sent - }); + delayedSpansPoller.poll(BUFFERED_TRANSACTION_HANDLER); } catch (Exception e) { throw new IllegalStateException(e); } From b2bdeac353f53d9e8da40a06810d6b47e26debf0 Mon Sep 17 00:00:00 2001 From: Jonas Kunz Date: Fri, 3 May 2024 12:22:15 +0200 Subject: [PATCH 05/10] Removed more java 8 usages, added missing license headers --- .../serialize/Base64SerializationUtils.java | 18 +++++++++++ .../apm/agent/universalprofiling/Clock.java | 18 +++++++++++ .../universalprofiling/MoveableEvent.java | 18 +++++++++++ .../universalprofiling/PeekingPoller.java | 30 ++++++++++++++++-- .../SpanProfilingSamplesCorrelator.java | 31 ++++++++++++++++--- .../UniversalProfilingIntegration.java | 2 +- .../Base64SerializationUtilTest.java | 18 +++++++++++ .../universalprofiling/PeekingPollerTest.java | 18 +++++++++++ 8 files changed, 145 insertions(+), 8 deletions(-) diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/report/serialize/Base64SerializationUtils.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/report/serialize/Base64SerializationUtils.java index aea9fde5bc..20bcb088bc 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/report/serialize/Base64SerializationUtils.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/report/serialize/Base64SerializationUtils.java @@ -1,3 +1,21 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package co.elastic.apm.agent.report.serialize; import com.dslplatform.json.JsonWriter; diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/Clock.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/Clock.java index a4a0973af6..11ea268016 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/Clock.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/Clock.java @@ -1,3 +1,21 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package co.elastic.apm.agent.universalprofiling; public interface Clock { diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/MoveableEvent.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/MoveableEvent.java index c4e33f75b7..9fb79f7396 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/MoveableEvent.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/MoveableEvent.java @@ -1,3 +1,21 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package co.elastic.apm.agent.universalprofiling; public interface MoveableEvent> { diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/PeekingPoller.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/PeekingPoller.java index 95364cff42..772c24071a 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/PeekingPoller.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/PeekingPoller.java @@ -1,5 +1,24 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package co.elastic.apm.agent.universalprofiling; +import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.EventPoller; import java.util.function.Supplier; @@ -28,11 +47,16 @@ public interface Handler> { private boolean peekedEventPopulated; Handler currentHandler; - private final EventPoller.Handler subHandler = this::handleEvent; + private final EventPoller.Handler subHandler = new EventPoller.Handler() { + @Override + public boolean onEvent(Event event, long sequence, boolean endOfBatch) throws Exception { + return handleEvent(event, sequence, endOfBatch); + } + }; - public PeekingPoller(EventPoller wrappedPoller, Supplier emptyEventFactory) { + public PeekingPoller(EventPoller wrappedPoller, EventFactory emptyEventFactory) { this.poller = wrappedPoller; - peekedEvent = emptyEventFactory.get(); + peekedEvent = emptyEventFactory.newInstance(); peekedEventPopulated = false; } diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/SpanProfilingSamplesCorrelator.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/SpanProfilingSamplesCorrelator.java index 3ea59bcde7..90d6b36981 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/SpanProfilingSamplesCorrelator.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/SpanProfilingSamplesCorrelator.java @@ -1,3 +1,21 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package co.elastic.apm.agent.universalprofiling; @@ -6,6 +24,7 @@ import co.elastic.apm.agent.report.Reporter; import co.elastic.apm.agent.sdk.logging.Logger; import co.elastic.apm.agent.sdk.logging.LoggerFactory; +import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.EventPoller; import com.lmax.disruptor.EventTranslatorTwoArg; import com.lmax.disruptor.RingBuffer; @@ -50,13 +69,17 @@ public SpanProfilingSamplesCorrelator( bufferCapacity = nextPowerOf2(bufferCapacity); // We use a wait strategy which doesn't involve signaling via condition variables // because we never block anyway (we use polling) - delayedSpans = - RingBuffer.createMultiProducer( - BufferedTransaction::new, bufferCapacity, new YieldingWaitStrategy()); + EventFactory eventFactory = new EventFactory() { + @Override + public BufferedTransaction newInstance() { + return new BufferedTransaction(); + } + }; + delayedSpans = RingBuffer.createMultiProducer(eventFactory, bufferCapacity, new YieldingWaitStrategy()); EventPoller nonPeekingPoller = delayedSpans.newPoller(); delayedSpans.addGatingSequences(nonPeekingPoller.getSequence()); - delayedSpansPoller = new PeekingPoller<>(nonPeekingPoller, BufferedTransaction::new); + delayedSpansPoller = new PeekingPoller<>(nonPeekingPoller, eventFactory); } public void setSpanBufferDurationNanos(long nanos) { diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/UniversalProfilingIntegration.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/UniversalProfilingIntegration.java index 6824d03498..40ab71a968 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/UniversalProfilingIntegration.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/UniversalProfilingIntegration.java @@ -55,7 +55,7 @@ public class UniversalProfilingIntegration { */ static final long POLL_FREQUENCY_MS = 20; - private static final long INITIAL_SPAN_DELAY_NANOS = Duration.ofSeconds(1).toNanos(); + private static final long INITIAL_SPAN_DELAY_NANOS = 1_000_000_000L; private static final Logger log = LoggerFactory.getLogger(UniversalProfilingIntegration.class); diff --git a/apm-agent-core/src/test/java/co/elastic/apm/agent/report/serialize/Base64SerializationUtilTest.java b/apm-agent-core/src/test/java/co/elastic/apm/agent/report/serialize/Base64SerializationUtilTest.java index 257d6fba96..b16f295085 100644 --- a/apm-agent-core/src/test/java/co/elastic/apm/agent/report/serialize/Base64SerializationUtilTest.java +++ b/apm-agent-core/src/test/java/co/elastic/apm/agent/report/serialize/Base64SerializationUtilTest.java @@ -1,3 +1,21 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package co.elastic.apm.agent.report.serialize; import com.dslplatform.json.DslJson; diff --git a/apm-agent-core/src/test/java/co/elastic/apm/agent/universalprofiling/PeekingPollerTest.java b/apm-agent-core/src/test/java/co/elastic/apm/agent/universalprofiling/PeekingPollerTest.java index 009d6e0877..75855a7368 100644 --- a/apm-agent-core/src/test/java/co/elastic/apm/agent/universalprofiling/PeekingPollerTest.java +++ b/apm-agent-core/src/test/java/co/elastic/apm/agent/universalprofiling/PeekingPollerTest.java @@ -1,3 +1,21 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package co.elastic.apm.agent.universalprofiling; import static org.assertj.core.api.Assertions.assertThat; From d186c9091049d69d1de6a39fb37f9f80a62d0c44 Mon Sep 17 00:00:00 2001 From: Jonas Kunz Date: Fri, 3 May 2024 15:27:20 +0200 Subject: [PATCH 06/10] Fix test flake, fix native lib visibility issues --- apm-agent-core/pom.xml | 2 +- .../UniversalProfilingIntegrationTest.java | 10 +++++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/apm-agent-core/pom.xml b/apm-agent-core/pom.xml index 5f35f7912d..b542555929 100644 --- a/apm-agent-core/pom.xml +++ b/apm-agent-core/pom.xml @@ -103,7 +103,7 @@ co.elastic.otel jvmti-access - 0.3.0 + 0.3.1