Wire SpanProfilingSamplesCorrelator into UniversalProfilingIntegration.

Summary of the changes carried by this patch:
  * universal_profiling_integration_buffer_size becomes an integer option
    (validator range 64..Integer.MAX_VALUE, default 4096) and getBufferSize()
    returns int.
  * SpanProfilingSamplesCorrelator gains dropTransaction(), which removes the
    transaction from transactionsById without reporting it.
  * UniversalProfilingIntegration.start() creates the correlator with the
    configured buffer size, an initial span delay of one second and the
    tracer's reporter. The periodic timer flushes pending buffered spans after
    consuming profiler messages; stop() consumes the remaining messages and
    calls shutdownAndFlushAll() on the correlator.
  * A profiler registration message sets the span buffer duration to the
    reported samples delay plus POLL_FREQUENCY_MS; trace correlation messages
    are forwarded to the correlator.
  * The correlator field is @Nullable and only assigned in start(), so
    afterTransactionStart(), correlateAndReport() and drop() check it for null.
    correlateAndReport() falls back to reporting directly through the tracer's
    reporter, which is what it did before this patch.

Note: indentation of the hunks below follows the usual 4-space Java layout;
apply with whitespace tolerance if the target files are indented differently.

diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/configuration/UniversalProfilingConfiguration.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/configuration/UniversalProfilingConfiguration.java
index 98ca6b0cb5..be318fe5a1 100644
--- a/apm-agent-core/src/main/java/co/elastic/apm/agent/configuration/UniversalProfilingConfiguration.java
+++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/configuration/UniversalProfilingConfiguration.java
@@ -34,9 +34,9 @@ public class UniversalProfilingConfiguration extends ConfigurationOptionProvider
         .description("If enabled, the apm agent will correlate it's transaction with the profiling data from elastic universal profiling running on the same host.")
         .buildWithDefault(false);
 
-    private final ConfigurationOption bufferSize = ConfigurationOption.longOption()
+    private final ConfigurationOption bufferSize = ConfigurationOption.integerOption()
         .key("universal_profiling_integration_buffer_size")
-        .addValidator(isInRange(64L, Long.MAX_VALUE))
+        .addValidator(isInRange(64, Integer.MAX_VALUE))
         .tags("added[1.50.0]", "internal")
         .configurationCategory(PROFILING_CATEGORY)
         .description("The feature needs to buffer ended local-root spans for a short duration to ensure that all of its profiling data has been received." +
@@ -44,7 +44,7 @@ public class UniversalProfilingConfiguration extends ConfigurationOptionProvider
             "The higher the number of local root spans per second, the higher this buffer size should be set.\n" +
             "The agent will log a warning if it is not capable of buffering a span due to insufficient buffer size. " +
             "This will cause the span to be exported immediately instead with possibly incomplete profiling correlation data.")
-        .buildWithDefault(4096L);
+        .buildWithDefault(4096);
 
     private final ConfigurationOption socketDir = ConfigurationOption.stringOption()
         .key("universal_profiling_integration_socket_dir")
@@ -60,7 +60,7 @@ public boolean isEnabled() {
         return enabled.get();
     }
 
-    public long getBufferSize() {
+    public int getBufferSize() {
         return bufferSize.get();
     }
 
diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/SpanProfilingSamplesCorrelator.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/SpanProfilingSamplesCorrelator.java
index cff247b4ef..392516e024 100644
--- a/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/SpanProfilingSamplesCorrelator.java
+++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/SpanProfilingSamplesCorrelator.java
@@ -72,6 +72,10 @@ public void onTransactionStart(Transaction transaction) {
         }
     }
 
+    public void dropTransaction(Transaction transaction) {
+        transactionsById.remove(transaction.getTraceContext().getId());
+    }
+
     public void reportOrBufferTransaction(Transaction transaction) {
         if (transactionsById.remove(transaction.getTraceContext().getId()) == null) {
             // transaction is not being correlated, e.g. because it was not sampled
diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/UniversalProfilingIntegration.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/UniversalProfilingIntegration.java
index 33897cbfa6..65f2ee015a 100644
--- a/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/UniversalProfilingIntegration.java
+++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/universalprofiling/UniversalProfilingIntegration.java
@@ -25,6 +25,7 @@
 import co.elastic.apm.agent.impl.metadata.SystemInfo;
 import co.elastic.apm.agent.impl.transaction.AbstractSpan;
 import co.elastic.apm.agent.impl.transaction.ElasticContext;
+import co.elastic.apm.agent.impl.transaction.Id;
 import co.elastic.apm.agent.impl.transaction.Transaction;
 import co.elastic.apm.agent.sdk.logging.Logger;
 import co.elastic.apm.agent.sdk.logging.LoggerFactory;
@@ -42,11 +43,9 @@
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.time.Duration;
-import java.util.Base64;
 import java.util.Random;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.logging.Level;
 
 public class UniversalProfilingIntegration {
 
@@ -56,6 +55,8 @@ public class UniversalProfilingIntegration {
      */
     static final long POLL_FREQUENCY_MS = 20;
 
+    private static final long INITIAL_SPAN_DELAY_NANOS = Duration.ofSeconds(1).toNanos();
+
     private static final Logger log = LoggerFactory.getLogger(UniversalProfilingIntegration.class);
 
     private volatile ElasticApmTracer tracer;
@@ -66,8 +67,12 @@ public class UniversalProfilingIntegration {
     // Visible for testing
     String socketPath = null;
 
+    @Nullable
     private ScheduledExecutorService executor;
 
+    @Nullable
+    private SpanProfilingSamplesCorrelator correlator;
+
     private ActivationListener activationListener = new ActivationListener() {
 
         @Override
@@ -102,6 +107,8 @@ public void start(ElasticApmTracer tracer) {
                 coreConfig.getServiceName(), coreConfig.getEnvironment(), socketPath);
             UniversalProfilingCorrelation.setProcessStorage(processCorrelationStorage);
 
+            correlator = new SpanProfilingSamplesCorrelator(config.getBufferSize(), INITIAL_SPAN_DELAY_NANOS, tracer.getReporter());
+
             executor = ExecutorUtils.createSingleThreadSchedulingDaemonPool("profiling-integration");
             executor.scheduleWithFixedDelay(new Runnable() {
                 @Override
@@ -125,8 +132,10 @@ public void run() {
         }
     }
 
-    private void periodicTimer() {
+    // Visible for testing
+    void periodicTimer() {
         consumeProfilerMessages();
+        correlator.flushPendingBufferedSpans();
     }
 
     public void stop() {
@@ -137,6 +146,8 @@ public void stop() {
             executor = null;
         }
         if (isActive) {
+            consumeProfilerMessages();
+            correlator.shutdownAndFlushAll();
             UniversalProfilingCorrelation.stopProfilerReturnChannel();
             JvmtiAccess.destroy();
             isActive = false;
@@ -147,7 +158,10 @@ public void stop() {
     }
 
     public void afterTransactionStart(Transaction startedTransaction) {
-        //TODO: store the transaction in a map for correlating with profiling data
+        // correlator stays null until start() has created it
+        if (correlator != null) {
+            correlator.onTransactionStart(startedTransaction);
+        }
     }
 
     /**
@@ -161,12 +175,19 @@ public void afterTransactionStart(Transaction startedTransaction) {
      * @param endedTransaction the transaction to be reported
      */
     public void correlateAndReport(Transaction endedTransaction) {
-        //TODO: perform correlation and report after buffering for a certain delay
-        tracer.getReporter().report(endedTransaction);
+        if (correlator != null) {
+            correlator.reportOrBufferTransaction(endedTransaction);
+        } else {
+            // no correlator was created: report directly, as this method did before correlation was wired in
+            tracer.getReporter().report(endedTransaction);
+        }
     }
 
     public void drop(Transaction endedTransaction) {
-        //TODO: remove dropped transactions from correlation storage without reporting
+        // without a correlator there is no correlation storage to clean up
+        if (correlator != null) {
+            correlator.dropTransaction(endedTransaction);
+        }
     }
 
 
@@ -197,7 +218,7 @@ private String randomSocketFileName() {
         return name.toString();
     }
 
-    private void consumeProfilerMessages() {
+    private synchronized void consumeProfilerMessages() {
         try {
             while (true) {
                 try {
@@ -223,10 +244,22 @@ private void consumeProfilerMessages() {
     }
 
     private void handleMessage(ProfilerRegistrationMessage message) {
-        //TODO: handle message
+        //TODO: update the host.id in the reporter metadata
+        log.debug("Received profiler registration message with host.id={} and expected latency of {} millis",
+            message.getHostId(), message.getSamplesDelayMillis());
+        long delayMillis = message.getSamplesDelayMillis() + POLL_FREQUENCY_MS;
+        correlator.setSpanBufferDurationNanos(delayMillis * 1_000_000L);
     }
 
+    private final Id tempTraceId = Id.new128BitId();
+    private final Id tempSpanId = Id.new64BitId();
+    private final Id tempStackTraceId = Id.new128BitId();
     private void handleMessage(TraceCorrelationMessage message) {
-        //TODO: handle message
+        tempTraceId.fromBytes(message.getTraceId(), 0);
+        tempSpanId.fromBytes(message.getLocalRootSpanId(), 0);
+        tempStackTraceId.fromBytes(message.getStackTraceId(), 0);
+        log.trace("Received profiler correlation message with trace.id={} transaction.id={} stacktrace.id={} count={}",
+            tempTraceId, tempSpanId, tempStackTraceId, message.getSampleCount());
+        correlator.correlate(tempTraceId, tempSpanId, tempStackTraceId, message.getSampleCount());
     }
 }