From 730f1dce827a2523adc9b9bc2e5b7f292cd0d593 Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Thu, 24 Jan 2019 17:20:54 +0100 Subject: [PATCH] Introduce IncreasingSleepingWaitStrategy to reduce CPU consumption fixes #443 --- .../apm/agent/report/ApmServerReporter.java | 4 +- .../IncreasingSleepingWaitStrategy.java | 93 +++++++++++++++++++ 2 files changed, 95 insertions(+), 2 deletions(-) create mode 100644 apm-agent-core/src/main/java/co/elastic/apm/agent/report/disruptor/IncreasingSleepingWaitStrategy.java diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/report/ApmServerReporter.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/report/ApmServerReporter.java index 03f20361cb..a0c0eb22b1 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/report/ApmServerReporter.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/report/ApmServerReporter.java @@ -23,13 +23,13 @@ import co.elastic.apm.agent.impl.transaction.Span; import co.elastic.apm.agent.impl.transaction.Transaction; import co.elastic.apm.agent.metrics.MetricRegistry; +import co.elastic.apm.agent.report.disruptor.IncreasingSleepingWaitStrategy; import co.elastic.apm.agent.util.ExecutorUtils; import co.elastic.apm.agent.util.MathUtils; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.EventTranslator; import com.lmax.disruptor.EventTranslatorOneArg; import com.lmax.disruptor.IgnoreExceptionHandler; -import com.lmax.disruptor.PhasedBackoffWaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import org.slf4j.Logger; @@ -104,7 +104,7 @@ public Thread newThread(Runnable r) { thread.setName("apm-reporter"); return thread; } - }, ProducerType.MULTI, PhasedBackoffWaitStrategy.withLiteLock(1, 10, TimeUnit.MILLISECONDS)); + }, ProducerType.MULTI, new IncreasingSleepingWaitStrategy(1_000_000, 250_000_000, 2_5000_000)); this.reportingEventHandler = reportingEventHandler; disruptor.setDefaultExceptionHandler(new IgnoreExceptionHandler()); disruptor.handleEventsWith(this.reportingEventHandler); diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/report/disruptor/IncreasingSleepingWaitStrategy.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/report/disruptor/IncreasingSleepingWaitStrategy.java new file mode 100644 index 0000000000..9f34fa5feb --- /dev/null +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/report/disruptor/IncreasingSleepingWaitStrategy.java @@ -0,0 +1,93 @@ +/*- + * #%L + * Elastic APM Java agent + * %% + * Copyright (C) 2018 - 2019 Elastic and contributors + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package co.elastic.apm.agent.report.disruptor; + +import com.lmax.disruptor.AlertException; +import com.lmax.disruptor.Sequence; +import com.lmax.disruptor.SequenceBarrier; +import com.lmax.disruptor.WaitStrategy; + +import java.util.concurrent.locks.LockSupport; + +/** + * Sleeping strategy that sleeps (LockSupport.parkNanos(n)) for an linearly increasing + * number of nanos while the {@link com.lmax.disruptor.EventProcessor}s are waiting on a barrier. + *

+ * This strategy consumes very little CPU resources. + * However, latency spikes up to {@link #sleepTimeNsMax} will occur. + * It will also reduce the impact on the producing thread as it will not need signal any conditional variables to wake up the event handling + * thread. + *

+ *

+ * The agent does not require to transfer events with low latency from producers to the consumer. + * The primary concerns are low latency for adding events (to not block application threads) and low CPU consumption. + * It does not matter if handling the events is a bit delayed. + * The only caveat is that sudden traffic spikes after quiet periods can lead to loosing events. + * If that should become a problem, take a look at the approaches outlined in + * https://github.com/LMAX-Exchange/disruptor/issues/246 and https://github.com/census-instrumentation/opencensus-java/pull/1618. + * In situations where the consumer is overloaded (always busy and awake) this wait strategy actually has very good latency + * because consumers don't have to signal. + *

+ */ +public final class IncreasingSleepingWaitStrategy implements WaitStrategy { + private static final int DEFAULT_SLEEP = 100; + + private final int sleepTimeNsStart; + private final int sleepTimeNsMax; + private final int sleepTimeNsStep; + + public IncreasingSleepingWaitStrategy() { + this(DEFAULT_SLEEP, DEFAULT_SLEEP, 0); + } + + public IncreasingSleepingWaitStrategy(int sleepTimeNsStart, int sleepTimeNsMax, int sleepTimeNsStep) { + this.sleepTimeNsStart = sleepTimeNsStart; + this.sleepTimeNsMax = sleepTimeNsMax; + this.sleepTimeNsStep = sleepTimeNsStep; + } + + @Override + public long waitFor(final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier) throws AlertException { + long availableSequence; + int currentSleep = sleepTimeNsStart; + + while ((availableSequence = dependentSequence.get()) < sequence) { + currentSleep = applyWaitMethod(barrier, currentSleep); + } + + return availableSequence; + } + + @Override + public void signalAllWhenBlocking() { + } + + private int applyWaitMethod(final SequenceBarrier barrier, int currentSleep) throws AlertException { + barrier.checkAlert(); + + if (currentSleep < sleepTimeNsMax) { + LockSupport.parkNanos(currentSleep); + return currentSleep + sleepTimeNsStep; + } else { + LockSupport.parkNanos(sleepTimeNsMax); + return currentSleep; + } + } +}