Skip to content

Commit

Permalink
Implemented correlation
Browse files Browse the repository at this point in the history
  • Loading branch information
JonasKunz committed May 2, 2024
1 parent 7b30a4e commit d1b257d
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,17 @@ public class UniversalProfilingConfiguration extends ConfigurationOptionProvider
.description("If enabled, the apm agent will correlate it's transaction with the profiling data from elastic universal profiling running on the same host.")
.buildWithDefault(false);

private final ConfigurationOption<Long> bufferSize = ConfigurationOption.longOption()
private final ConfigurationOption<Integer> bufferSize = ConfigurationOption.integerOption()
.key("universal_profiling_integration_buffer_size")
.addValidator(isInRange(64L, Long.MAX_VALUE))
.addValidator(isInRange(64, Integer.MAX_VALUE))
.tags("added[1.50.0]", "internal")
.configurationCategory(PROFILING_CATEGORY)
.description("The feature needs to buffer ended local-root spans for a short duration to ensure that all of its profiling data has been received." +
"This configuration option configures the buffer size in number of spans. " +
"The higher the number of local root spans per second, the higher this buffer size should be set.\n" +
"The agent will log a warning if it is not capable of buffering a span due to insufficient buffer size. " +
"This will cause the span to be exported immediately instead with possibly incomplete profiling correlation data.")
.buildWithDefault(4096L);
.buildWithDefault(4096);

private final ConfigurationOption<String> socketDir = ConfigurationOption.stringOption()
.key("universal_profiling_integration_socket_dir")
Expand All @@ -60,7 +60,7 @@ public boolean isEnabled() {
return enabled.get();
}

public long getBufferSize() {
public int getBufferSize() {
return bufferSize.get();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ public void onTransactionStart(Transaction transaction) {
}
}

public void dropTransaction(Transaction transaction) {
transactionsById.remove(transaction.getTraceContext().getId());
}

public void reportOrBufferTransaction(Transaction transaction) {
if (transactionsById.remove(transaction.getTraceContext().getId()) == null) {
// transaction is not being correlated, e.g. because it was not sampled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import co.elastic.apm.agent.impl.metadata.SystemInfo;
import co.elastic.apm.agent.impl.transaction.AbstractSpan;
import co.elastic.apm.agent.impl.transaction.ElasticContext;
import co.elastic.apm.agent.impl.transaction.Id;
import co.elastic.apm.agent.impl.transaction.Transaction;
import co.elastic.apm.agent.sdk.logging.Logger;
import co.elastic.apm.agent.sdk.logging.LoggerFactory;
Expand All @@ -42,11 +43,9 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Base64;
import java.util.Random;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;

public class UniversalProfilingIntegration {

Expand All @@ -56,6 +55,8 @@ public class UniversalProfilingIntegration {
*/
static final long POLL_FREQUENCY_MS = 20;

private static final long INITIAL_SPAN_DELAY_NANOS = Duration.ofSeconds(1).toNanos();

private static final Logger log = LoggerFactory.getLogger(UniversalProfilingIntegration.class);

private volatile ElasticApmTracer tracer;
Expand All @@ -66,8 +67,12 @@ public class UniversalProfilingIntegration {
// Visible for testing
String socketPath = null;

@Nullable
private ScheduledExecutorService executor;

@Nullable
private SpanProfilingSamplesCorrelator correlator;

private ActivationListener activationListener = new ActivationListener() {

@Override
Expand Down Expand Up @@ -102,6 +107,8 @@ public void start(ElasticApmTracer tracer) {
coreConfig.getServiceName(), coreConfig.getEnvironment(), socketPath);
UniversalProfilingCorrelation.setProcessStorage(processCorrelationStorage);

correlator = new SpanProfilingSamplesCorrelator(config.getBufferSize(), INITIAL_SPAN_DELAY_NANOS, tracer.getReporter());

executor = ExecutorUtils.createSingleThreadSchedulingDaemonPool("profiling-integration");
executor.scheduleWithFixedDelay(new Runnable() {
@Override
Expand All @@ -125,8 +132,10 @@ public void run() {
}
}

private void periodicTimer() {
// Visible for testing
void periodicTimer() {
consumeProfilerMessages();
correlator.flushPendingBufferedSpans();
}

public void stop() {
Expand All @@ -137,6 +146,8 @@ public void stop() {
executor = null;
}
if (isActive) {
consumeProfilerMessages();
correlator.shutdownAndFlushAll();
UniversalProfilingCorrelation.stopProfilerReturnChannel();
JvmtiAccess.destroy();
isActive = false;
Expand All @@ -147,7 +158,7 @@ public void stop() {
}

public void afterTransactionStart(Transaction startedTransaction) {
//TODO: store the transaction in a map for correlating with profiling data
correlator.onTransactionStart(startedTransaction);
}

/**
Expand All @@ -161,12 +172,11 @@ public void afterTransactionStart(Transaction startedTransaction) {
* @param endedTransaction the transaction to be reported
*/
public void correlateAndReport(Transaction endedTransaction) {
//TODO: perform correlation and report after buffering for a certain delay
tracer.getReporter().report(endedTransaction);
correlator.reportOrBufferTransaction(endedTransaction);
}

public void drop(Transaction endedTransaction) {
//TODO: remove dropped transactions from correlation storage without reporting
correlator.dropTransaction(endedTransaction);
}


Expand Down Expand Up @@ -197,7 +207,7 @@ private String randomSocketFileName() {
return name.toString();
}

private void consumeProfilerMessages() {
private synchronized void consumeProfilerMessages() {
try {
while (true) {
try {
Expand All @@ -223,10 +233,22 @@ private void consumeProfilerMessages() {
}

private void handleMessage(ProfilerRegistrationMessage message) {
//TODO: handle message
//TODO: update the host.id in the reporter metadata
log.debug("Received profiler registration message with host.id={} and expected latency of {} millis",
message.getHostId(), message.getSamplesDelayMillis());
long delayMillis = message.getSamplesDelayMillis() + POLL_FREQUENCY_MS;
correlator.setSpanBufferDurationNanos(delayMillis * 1_000_000L);
}

private final Id tempTraceId = Id.new128BitId();
private final Id tempSpanId = Id.new64BitId();
private final Id tempStackTraceId = Id.new128BitId();
private void handleMessage(TraceCorrelationMessage message) {
//TODO: handle message
tempTraceId.fromBytes(message.getTraceId(), 0);
tempSpanId.fromBytes(message.getLocalRootSpanId(), 0);
tempStackTraceId.fromBytes(message.getStackTraceId(), 0);
log.trace("Received profiler correlation message with trace.id={} transaction.id={} stacktrace.id={} count={}",
tempTraceId, tempSpanId, tempStackTraceId, message.getSampleCount());
correlator.correlate(tempTraceId, tempSpanId, tempStackTraceId, message.getSampleCount());
}
}

0 comments on commit d1b257d

Please sign in to comment.