From 24424d6f38531865c99ef013caa49f7108e3785a Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Mon, 28 Jan 2019 16:04:31 +0100 Subject: [PATCH] Introduce IncreasingSleepingWaitStrategy to reduce CPU consumption (#446) --- .../apm/agent/report/ApmServerReporter.java | 4 +- ...ionallyIncreasingSleepingWaitStrategy.java | 86 +++++++++++++++++++ 2 files changed, 88 insertions(+), 2 deletions(-) create mode 100644 apm-agent-core/src/main/java/co/elastic/apm/agent/report/disruptor/ExponentionallyIncreasingSleepingWaitStrategy.java diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/report/ApmServerReporter.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/report/ApmServerReporter.java index 03f20361cb..60ee7787e2 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/report/ApmServerReporter.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/report/ApmServerReporter.java @@ -23,13 +23,13 @@ import co.elastic.apm.agent.impl.transaction.Span; import co.elastic.apm.agent.impl.transaction.Transaction; import co.elastic.apm.agent.metrics.MetricRegistry; +import co.elastic.apm.agent.report.disruptor.ExponentionallyIncreasingSleepingWaitStrategy; import co.elastic.apm.agent.util.ExecutorUtils; import co.elastic.apm.agent.util.MathUtils; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.EventTranslator; import com.lmax.disruptor.EventTranslatorOneArg; import com.lmax.disruptor.IgnoreExceptionHandler; -import com.lmax.disruptor.PhasedBackoffWaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import org.slf4j.Logger; @@ -104,7 +104,7 @@ public Thread newThread(Runnable r) { thread.setName("apm-reporter"); return thread; } - }, ProducerType.MULTI, PhasedBackoffWaitStrategy.withLiteLock(1, 10, TimeUnit.MILLISECONDS)); + }, ProducerType.MULTI, new ExponentionallyIncreasingSleepingWaitStrategy(100_000, 10_000_000)); this.reportingEventHandler = reportingEventHandler; disruptor.setDefaultExceptionHandler(new IgnoreExceptionHandler()); disruptor.handleEventsWith(this.reportingEventHandler); diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/report/disruptor/ExponentionallyIncreasingSleepingWaitStrategy.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/report/disruptor/ExponentionallyIncreasingSleepingWaitStrategy.java new file mode 100644 index 0000000000..ef04dd07c2 --- /dev/null +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/report/disruptor/ExponentionallyIncreasingSleepingWaitStrategy.java @@ -0,0 +1,86 @@ +/*- + * #%L + * Elastic APM Java agent + * %% + * Copyright (C) 2018 - 2019 Elastic and contributors + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package co.elastic.apm.agent.report.disruptor; + +import com.lmax.disruptor.AlertException; +import com.lmax.disruptor.Sequence; +import com.lmax.disruptor.SequenceBarrier; +import com.lmax.disruptor.WaitStrategy; + +import java.util.concurrent.locks.LockSupport; + +/** + * Sleeping strategy that sleeps (LockSupport.parkNanos(n)) for an linearly increasing + * number of nanos while the {@link com.lmax.disruptor.EventProcessor}s are waiting on a barrier. + *

+ * This strategy consumes very little CPU resources. + * However, latency spikes up to {@link #sleepTimeNsMax} will occur. + * It will also reduce the impact on the producing thread as it will not need signal any conditional variables to wake up the event handling + * thread. + *

+ *

+ * The agent does not require to transfer events with low latency from producers to the consumer. + * The primary concerns are low latency for adding events (to not block application threads) and low CPU consumption. + * It does not matter if handling the events is a bit delayed. + * The only caveat is that sudden traffic spikes after quiet periods can lead to loosing events. + * If that should become a problem, take a look at the approaches outlined in + * https://github.com/LMAX-Exchange/disruptor/issues/246 and https://github.com/census-instrumentation/opencensus-java/pull/1618. + * In situations where the consumer is overloaded (always busy and awake) this wait strategy actually has very good latency + * because consumers don't have to signal. + *

+ */ +public final class ExponentionallyIncreasingSleepingWaitStrategy implements WaitStrategy { + + private final int sleepTimeNsStart; + private final int sleepTimeNsMax; + + public ExponentionallyIncreasingSleepingWaitStrategy(int sleepTimeNsStart, int sleepTimeNsMax) { + this.sleepTimeNsStart = sleepTimeNsStart; + this.sleepTimeNsMax = sleepTimeNsMax; + } + + @Override + public long waitFor(final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier) throws AlertException { + long availableSequence; + int currentSleep = sleepTimeNsStart; + + while ((availableSequence = dependentSequence.get()) < sequence) { + currentSleep = applyWaitMethod(barrier, currentSleep); + } + + return availableSequence; + } + + @Override + public void signalAllWhenBlocking() { + } + + private int applyWaitMethod(final SequenceBarrier barrier, int currentSleep) throws AlertException { + barrier.checkAlert(); + + if (currentSleep < sleepTimeNsMax) { + LockSupport.parkNanos(currentSleep); + return currentSleep * 2; + } else { + LockSupport.parkNanos(sleepTimeNsMax); + return currentSleep; + } + } +}