Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
Add metrics for EthScheduler executors (#878)
Browse files Browse the repository at this point in the history
  • Loading branch information
ajsutton authored Feb 18, 2019
1 parent 19ea520 commit 6c05c21
Show file tree
Hide file tree
Showing 17 changed files with 371 additions and 65 deletions.
1 change: 1 addition & 0 deletions consensus/ibftlegacy/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ dependencies {
implementation project(':ethereum:jsonrpc')
implementation project(':ethereum:rlp')
implementation project(':ethereum:p2p')
implementation project(':metrics')
implementation project(':services:kvstore')

implementation 'com.google.guava:guava'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import tech.pegasys.pantheon.ethereum.p2p.api.Message;
import tech.pegasys.pantheon.ethereum.p2p.wire.Capability;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateArchive;
import tech.pegasys.pantheon.metrics.MetricsSystem;

import java.util.List;

Expand All @@ -33,15 +34,17 @@ public Istanbul64ProtocolManager(
final boolean fastSyncEnabled,
final int syncWorkers,
final int txWorkers,
final int computationWorkers) {
final int computationWorkers,
final MetricsSystem metricsSystem) {
super(
blockchain,
worldStateArchive,
networkId,
fastSyncEnabled,
syncWorkers,
txWorkers,
computationWorkers);
computationWorkers,
metricsSystem);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason;
import tech.pegasys.pantheon.ethereum.rlp.RLPException;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateArchive;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.util.uint.UInt256;

import java.util.Arrays;
Expand Down Expand Up @@ -96,14 +97,15 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
final int syncWorkers,
final int txWorkers,
final int computationWorkers,
final int requestLimit) {
final int requestLimit,
final MetricsSystem metricsSystem) {
this(
blockchain,
worldStateArchive,
networkId,
fastSyncEnabled,
requestLimit,
new EthScheduler(syncWorkers, txWorkers, computationWorkers));
new EthScheduler(syncWorkers, txWorkers, computationWorkers, metricsSystem));
}

public EthProtocolManager(
Expand All @@ -113,7 +115,8 @@ public EthProtocolManager(
final boolean fastSyncEnabled,
final int syncWorkers,
final int txWorkers,
final int computationWorkers) {
final int computationWorkers,
final MetricsSystem metricsSystem) {
this(
blockchain,
worldStateArchive,
Expand All @@ -122,7 +125,8 @@ public EthProtocolManager(
syncWorkers,
txWorkers,
computationWorkers,
DEFAULT_REQUEST_LIMIT);
DEFAULT_REQUEST_LIMIT,
metricsSystem);
}

public EthContext ethContext() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,12 @@
*/
package tech.pegasys.pantheon.ethereum.eth.manager;

import static tech.pegasys.pantheon.ethereum.eth.manager.MonitoredExecutors.newCachedThreadPool;
import static tech.pegasys.pantheon.ethereum.eth.manager.MonitoredExecutors.newFixedThreadPool;
import static tech.pegasys.pantheon.ethereum.eth.manager.MonitoredExecutors.newScheduledThreadPool;
import static tech.pegasys.pantheon.util.FutureUtils.propagateResult;

import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.util.ExceptionUtils;

import java.time.Duration;
Expand All @@ -23,7 +27,6 @@
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
Expand All @@ -33,7 +36,6 @@
import java.util.function.Function;
import java.util.function.Supplier;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -51,36 +53,24 @@ public class EthScheduler {
private final ExecutorService servicesExecutor;
private final ExecutorService computationExecutor;

private Collection<CompletableFuture<?>> serviceFutures = new ConcurrentLinkedDeque<>();
private final Collection<CompletableFuture<?>> serviceFutures = new ConcurrentLinkedDeque<>();

public EthScheduler(
final int syncWorkerCount, final int txWorkerCount, final int computationWorkerCount) {
final int syncWorkerCount,
final int txWorkerCount,
final int computationWorkerCount,
final MetricsSystem metricsSystem) {
this(
Executors.newFixedThreadPool(
syncWorkerCount,
new ThreadFactoryBuilder()
.setNameFormat(EthScheduler.class.getSimpleName() + "-Workers-%d")
.build()),
Executors.newScheduledThreadPool(
1,
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat(EthScheduler.class.getSimpleName() + "Timer-%d")
.build()),
Executors.newFixedThreadPool(
txWorkerCount,
new ThreadFactoryBuilder()
.setNameFormat(EthScheduler.class.getSimpleName() + "-Transactions-%d")
.build()),
Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setNameFormat(EthScheduler.class.getSimpleName() + "-Services-%d")
.build()),
Executors.newFixedThreadPool(
newFixedThreadPool(
EthScheduler.class.getSimpleName() + "-Workers", syncWorkerCount, metricsSystem),
newScheduledThreadPool(EthScheduler.class.getSimpleName() + "-Timer", 1, metricsSystem),
newFixedThreadPool(
EthScheduler.class.getSimpleName() + "-Transactions", txWorkerCount, metricsSystem),
newCachedThreadPool(EthScheduler.class.getSimpleName() + "-Services", metricsSystem),
newFixedThreadPool(
EthScheduler.class.getSimpleName() + "-Computation",
computationWorkerCount,
new ThreadFactoryBuilder()
.setNameFormat(EthScheduler.class.getSimpleName() + "-Computation-%d")
.build()));
metricsSystem));
}

protected EthScheduler(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.manager;

import tech.pegasys.pantheon.metrics.Counter;
import tech.pegasys.pantheon.metrics.MetricCategory;
import tech.pegasys.pantheon.metrics.MetricsSystem;

import java.util.Locale;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor.AbortPolicy;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

public class MonitoredExecutors {

public static ExecutorService newFixedThreadPool(
final String name, final int workerCount, final MetricsSystem metricsSystem) {
return newMonitoredExecutor(
name,
metricsSystem,
(rejectedExecutionHandler, threadFactory) ->
new ThreadPoolExecutor(
workerCount,
workerCount,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
threadFactory,
rejectedExecutionHandler));
}

public static ExecutorService newCachedThreadPool(
final String name, final MetricsSystem metricsSystem) {
return newMonitoredExecutor(
name,
metricsSystem,
(rejectedExecutionHandler, threadFactory) ->
new ThreadPoolExecutor(
0,
Integer.MAX_VALUE,
60L,
TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory,
rejectedExecutionHandler));
}

public static ScheduledExecutorService newScheduledThreadPool(
final String name, final int corePoolSize, final MetricsSystem metricsSystem) {
return newMonitoredExecutor(
name,
metricsSystem,
(rejectedExecutionHandler, threadFactory) ->
new ScheduledThreadPoolExecutor(corePoolSize, threadFactory, rejectedExecutionHandler));
}

private static <T extends ThreadPoolExecutor> T newMonitoredExecutor(
final String name,
final MetricsSystem metricsSystem,
final BiFunction<RejectedExecutionHandler, ThreadFactory, T> creator) {

final String metricName = name.toLowerCase(Locale.US).replace('-', '_');

final T executor =
creator.apply(
new CountingAbortPolicy(metricName, metricsSystem),
new ThreadFactoryBuilder().setNameFormat(name + "-%d").build());

metricsSystem.createIntegerGauge(
MetricCategory.EXECUTORS,
metricName + "_queue_length_current",
"Current number of tasks awaiting execution",
executor.getQueue()::size);

metricsSystem.createIntegerGauge(
MetricCategory.EXECUTORS,
metricName + "_active_threads_current",
"Current number of threads executing tasks",
executor::getActiveCount);

metricsSystem.createIntegerGauge(
MetricCategory.EXECUTORS,
metricName + "_pool_size_current",
"Current number of threads in the thread pool",
executor::getPoolSize);

metricsSystem.createLongGauge(
MetricCategory.EXECUTORS,
metricName + "_completed_tasks_total",
"Total number of tasks executed",
executor::getCompletedTaskCount);

metricsSystem.createLongGauge(
MetricCategory.EXECUTORS,
metricName + "_submitted_tasks_total",
"Total number of tasks executed",
executor::getTaskCount);

return executor;
}

private static class CountingAbortPolicy extends AbortPolicy {

private final Counter rejectedTaskCounter;

public CountingAbortPolicy(final String metricName, final MetricsSystem metricsSystem) {
this.rejectedTaskCounter =
metricsSystem.createCounter(
MetricCategory.EXECUTORS,
metricName + "_rejected_tasks_total",
"Total number of tasks rejected by this executor");
}

@Override
public void rejectedExecution(final Runnable r, final ThreadPoolExecutor e) {
rejectedTaskCounter.inc();
super.rejectedExecution(r, e);
}
}
}
Loading

0 comments on commit 6c05c21

Please sign in to comment.