From 6c05c21cfcc9e52f1d73ce88f5d216a258c474ba Mon Sep 17 00:00:00 2001 From: Adrian Sutton Date: Tue, 19 Feb 2019 06:27:35 +1000 Subject: [PATCH] Add metrics for EthScheduler executors (#878) --- consensus/ibftlegacy/build.gradle | 1 + .../protocol/Istanbul64ProtocolManager.java | 7 +- .../eth/manager/EthProtocolManager.java | 12 +- .../ethereum/eth/manager/EthScheduler.java | 46 ++--- .../eth/manager/MonitoredExecutors.java | 140 +++++++++++++ .../eth/manager/EthProtocolManagerTest.java | 187 ++++++++++++++++-- .../fastsync/FastSyncChainDownloaderTest.java | 2 +- .../sync/fullsync/FullSyncDownloaderTest.java | 2 +- .../fullsync/FullSyncTargetManagerTest.java | 2 +- .../ethereum/eth/transactions/TestNode.java | 3 +- .../pantheon/metrics/MetricCategory.java | 1 + .../pantheon/metrics/MetricsSystem.java | 16 ++ .../controller/CliquePantheonController.java | 3 +- .../IbftLegacyPantheonController.java | 6 +- .../controller/IbftPantheonController.java | 3 +- .../controller/MainnetPantheonController.java | 3 +- .../tech/pegasys/pantheon/RunnerTest.java | 2 +- 17 files changed, 371 insertions(+), 65 deletions(-) create mode 100644 ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/MonitoredExecutors.java diff --git a/consensus/ibftlegacy/build.gradle b/consensus/ibftlegacy/build.gradle index b3f572e6c6..e45c0bb85a 100644 --- a/consensus/ibftlegacy/build.gradle +++ b/consensus/ibftlegacy/build.gradle @@ -23,6 +23,7 @@ dependencies { implementation project(':ethereum:jsonrpc') implementation project(':ethereum:rlp') implementation project(':ethereum:p2p') + implementation project(':metrics') implementation project(':services:kvstore') implementation 'com.google.guava:guava' diff --git a/consensus/ibftlegacy/src/main/java/tech/pegasys/pantheon/consensus/ibftlegacy/protocol/Istanbul64ProtocolManager.java b/consensus/ibftlegacy/src/main/java/tech/pegasys/pantheon/consensus/ibftlegacy/protocol/Istanbul64ProtocolManager.java index 7d0c0b1294..b4aea06729 100644 --- a/consensus/ibftlegacy/src/main/java/tech/pegasys/pantheon/consensus/ibftlegacy/protocol/Istanbul64ProtocolManager.java +++ b/consensus/ibftlegacy/src/main/java/tech/pegasys/pantheon/consensus/ibftlegacy/protocol/Istanbul64ProtocolManager.java @@ -18,6 +18,7 @@ import tech.pegasys.pantheon.ethereum.p2p.api.Message; import tech.pegasys.pantheon.ethereum.p2p.wire.Capability; import tech.pegasys.pantheon.ethereum.worldstate.WorldStateArchive; +import tech.pegasys.pantheon.metrics.MetricsSystem; import java.util.List; @@ -33,7 +34,8 @@ public Istanbul64ProtocolManager( final boolean fastSyncEnabled, final int syncWorkers, final int txWorkers, - final int computationWorkers) { + final int computationWorkers, + final MetricsSystem metricsSystem) { super( blockchain, worldStateArchive, @@ -41,7 +43,8 @@ public Istanbul64ProtocolManager( fastSyncEnabled, syncWorkers, txWorkers, - computationWorkers); + computationWorkers, + metricsSystem); } @Override diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManager.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManager.java index 53373518f6..86d9ebc1d6 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManager.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManager.java @@ -31,6 +31,7 @@ import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason; import tech.pegasys.pantheon.ethereum.rlp.RLPException; import tech.pegasys.pantheon.ethereum.worldstate.WorldStateArchive; +import tech.pegasys.pantheon.metrics.MetricsSystem; import tech.pegasys.pantheon.util.uint.UInt256; import java.util.Arrays; @@ -96,14 +97,15 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver { final int syncWorkers, final int txWorkers, final int computationWorkers, - final int requestLimit) { + final int requestLimit, + final MetricsSystem metricsSystem) { this( blockchain, worldStateArchive, networkId, fastSyncEnabled, requestLimit, - new EthScheduler(syncWorkers, txWorkers, computationWorkers)); + new EthScheduler(syncWorkers, txWorkers, computationWorkers, metricsSystem)); } public EthProtocolManager( @@ -113,7 +115,8 @@ public EthProtocolManager( final boolean fastSyncEnabled, final int syncWorkers, final int txWorkers, - final int computationWorkers) { + final int computationWorkers, + final MetricsSystem metricsSystem) { this( blockchain, worldStateArchive, @@ -122,7 +125,8 @@ public EthProtocolManager( syncWorkers, txWorkers, computationWorkers, - DEFAULT_REQUEST_LIMIT); + DEFAULT_REQUEST_LIMIT, + metricsSystem); } public EthContext ethContext() { diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthScheduler.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthScheduler.java index 4735c882c6..c87da2c794 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthScheduler.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthScheduler.java @@ -12,8 +12,12 @@ */ package tech.pegasys.pantheon.ethereum.eth.manager; +import static tech.pegasys.pantheon.ethereum.eth.manager.MonitoredExecutors.newCachedThreadPool; +import static tech.pegasys.pantheon.ethereum.eth.manager.MonitoredExecutors.newFixedThreadPool; +import static tech.pegasys.pantheon.ethereum.eth.manager.MonitoredExecutors.newScheduledThreadPool; import static tech.pegasys.pantheon.util.FutureUtils.propagateResult; +import tech.pegasys.pantheon.metrics.MetricsSystem; import tech.pegasys.pantheon.util.ExceptionUtils; import java.time.Duration; @@ -23,7 +27,6 @@ import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -33,7 +36,6 @@ import java.util.function.Function; import java.util.function.Supplier; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -51,36 +53,24 @@ public class EthScheduler { private final ExecutorService servicesExecutor; private final ExecutorService computationExecutor; - private Collection> serviceFutures = new ConcurrentLinkedDeque<>(); + private final Collection> serviceFutures = new ConcurrentLinkedDeque<>(); public EthScheduler( - final int syncWorkerCount, final int txWorkerCount, final int computationWorkerCount) { + final int syncWorkerCount, + final int txWorkerCount, + final int computationWorkerCount, + final MetricsSystem metricsSystem) { this( - Executors.newFixedThreadPool( - syncWorkerCount, - new ThreadFactoryBuilder() - .setNameFormat(EthScheduler.class.getSimpleName() + "-Workers-%d") - .build()), - Executors.newScheduledThreadPool( - 1, - new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat(EthScheduler.class.getSimpleName() + "Timer-%d") - .build()), - Executors.newFixedThreadPool( - txWorkerCount, - new ThreadFactoryBuilder() - .setNameFormat(EthScheduler.class.getSimpleName() + "-Transactions-%d") - .build()), - Executors.newCachedThreadPool( - new ThreadFactoryBuilder() - .setNameFormat(EthScheduler.class.getSimpleName() + "-Services-%d") - .build()), - Executors.newFixedThreadPool( + newFixedThreadPool( + EthScheduler.class.getSimpleName() + "-Workers", syncWorkerCount, metricsSystem), + newScheduledThreadPool(EthScheduler.class.getSimpleName() + "-Timer", 1, metricsSystem), + newFixedThreadPool( + EthScheduler.class.getSimpleName() + "-Transactions", txWorkerCount, metricsSystem), + newCachedThreadPool(EthScheduler.class.getSimpleName() + "-Services", metricsSystem), + newFixedThreadPool( + EthScheduler.class.getSimpleName() + "-Computation", computationWorkerCount, - new ThreadFactoryBuilder() - .setNameFormat(EthScheduler.class.getSimpleName() + "-Computation-%d") - .build())); + metricsSystem)); } protected EthScheduler( diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/MonitoredExecutors.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/MonitoredExecutors.java new file mode 100644 index 0000000000..b13deea9b3 --- /dev/null +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/MonitoredExecutors.java @@ -0,0 +1,140 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.eth.manager; + +import tech.pegasys.pantheon.metrics.Counter; +import tech.pegasys.pantheon.metrics.MetricCategory; +import tech.pegasys.pantheon.metrics.MetricsSystem; + +import java.util.Locale; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor.AbortPolicy; +import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +public class MonitoredExecutors { + + public static ExecutorService newFixedThreadPool( + final String name, final int workerCount, final MetricsSystem metricsSystem) { + return newMonitoredExecutor( + name, + metricsSystem, + (rejectedExecutionHandler, threadFactory) -> + new ThreadPoolExecutor( + workerCount, + workerCount, + 0L, + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(), + threadFactory, + rejectedExecutionHandler)); + } + + public static ExecutorService newCachedThreadPool( + final String name, final MetricsSystem metricsSystem) { + return newMonitoredExecutor( + name, + metricsSystem, + (rejectedExecutionHandler, threadFactory) -> + new ThreadPoolExecutor( + 0, + Integer.MAX_VALUE, + 60L, + TimeUnit.SECONDS, + new SynchronousQueue(), + threadFactory, + rejectedExecutionHandler)); + } + + public static ScheduledExecutorService newScheduledThreadPool( + final String name, final int corePoolSize, final MetricsSystem metricsSystem) { + return newMonitoredExecutor( + name, + metricsSystem, + (rejectedExecutionHandler, threadFactory) -> + new ScheduledThreadPoolExecutor(corePoolSize, threadFactory, rejectedExecutionHandler)); + } + + private static T newMonitoredExecutor( + final String name, + final MetricsSystem metricsSystem, + final BiFunction creator) { + + final String metricName = name.toLowerCase(Locale.US).replace('-', '_'); + + final T executor = + creator.apply( + new CountingAbortPolicy(metricName, metricsSystem), + new ThreadFactoryBuilder().setNameFormat(name + "-%d").build()); + + metricsSystem.createIntegerGauge( + MetricCategory.EXECUTORS, + metricName + "_queue_length_current", + "Current number of tasks awaiting execution", + executor.getQueue()::size); + + metricsSystem.createIntegerGauge( + MetricCategory.EXECUTORS, + metricName + "_active_threads_current", + "Current number of threads executing tasks", + executor::getActiveCount); + + metricsSystem.createIntegerGauge( + MetricCategory.EXECUTORS, + metricName + "_pool_size_current", + "Current number of threads in the thread pool", + executor::getPoolSize); + + metricsSystem.createLongGauge( + MetricCategory.EXECUTORS, + metricName + "_completed_tasks_total", + "Total number of tasks executed", + executor::getCompletedTaskCount); + + metricsSystem.createLongGauge( + MetricCategory.EXECUTORS, + metricName + "_submitted_tasks_total", + "Total number of tasks executed", + executor::getTaskCount); + + return executor; + } + + private static class CountingAbortPolicy extends AbortPolicy { + + private final Counter rejectedTaskCounter; + + public CountingAbortPolicy(final String metricName, final MetricsSystem metricsSystem) { + this.rejectedTaskCounter = + metricsSystem.createCounter( + MetricCategory.EXECUTORS, + metricName + "_rejected_tasks_total", + "Total number of tasks rejected by this executor"); + } + + @Override + public void rejectedExecution(final Runnable r, final ThreadPoolExecutor e) { + rejectedTaskCounter.inc(); + super.rejectedExecution(r, e); + } + } +} diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTest.java index e89d79ffa7..25000a994d 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTest.java @@ -57,6 +57,7 @@ import tech.pegasys.pantheon.ethereum.p2p.wire.DefaultMessage; import tech.pegasys.pantheon.ethereum.p2p.wire.RawMessage; import tech.pegasys.pantheon.ethereum.worldstate.WorldStateArchive; +import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; import tech.pegasys.pantheon.util.bytes.BytesValue; import tech.pegasys.pantheon.util.uint.UInt256; @@ -104,7 +105,14 @@ public static void setup() { public void disconnectOnUnsolicitedMessage() { try (final EthProtocolManager ethManager = new EthProtocolManager( - blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1, 1)) { + blockchain, + protocolContext.getWorldStateArchive(), + 1, + true, + 1, + 1, + 1, + new NoOpMetricsSystem())) { final MessageData messageData = BlockHeadersMessage.create(Collections.singletonList(blockchain.getBlockHeader(1).get())); final MockPeerConnection peer = setupPeer(ethManager, (cap, msg, conn) -> {}); @@ -117,7 +125,14 @@ public void disconnectOnUnsolicitedMessage() { public void disconnectOnFailureToSendStatusMessage() { try (final EthProtocolManager ethManager = new EthProtocolManager( - blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1, 1)) { + blockchain, + protocolContext.getWorldStateArchive(), + 1, + true, + 1, + 1, + 1, + new NoOpMetricsSystem())) { final MessageData messageData = BlockHeadersMessage.create(Collections.singletonList(blockchain.getBlockHeader(1).get())); final MockPeerConnection peer = @@ -131,7 +146,14 @@ public void disconnectOnFailureToSendStatusMessage() { public void disconnectOnWrongChainId() { try (final EthProtocolManager ethManager = new EthProtocolManager( - blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1, 1)) { + blockchain, + protocolContext.getWorldStateArchive(), + 1, + true, + 1, + 1, + 1, + new NoOpMetricsSystem())) { final MessageData messageData = BlockHeadersMessage.create(Collections.singletonList(blockchain.getBlockHeader(1).get())); final MockPeerConnection peer = @@ -156,7 +178,14 @@ public void disconnectOnWrongChainId() { public void disconnectOnWrongGenesisHash() { try (final EthProtocolManager ethManager = new EthProtocolManager( - blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1, 1)) { + blockchain, + protocolContext.getWorldStateArchive(), + 1, + true, + 1, + 1, + 1, + new NoOpMetricsSystem())) { final MessageData messageData = BlockHeadersMessage.create(Collections.singletonList(blockchain.getBlockHeader(1).get())); final MockPeerConnection peer = @@ -181,7 +210,14 @@ public void disconnectOnWrongGenesisHash() { public void doNotDisconnectOnValidMessage() { try (final EthProtocolManager ethManager = new EthProtocolManager( - blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1, 1)) { + blockchain, + protocolContext.getWorldStateArchive(), + 1, + true, + 1, + 1, + 1, + new NoOpMetricsSystem())) { final MessageData messageData = GetBlockBodiesMessage.create(Collections.singletonList(gen.hash())); final MockPeerConnection peer = setupPeer(ethManager, (cap, msg, conn) -> {}); @@ -198,7 +234,14 @@ public void respondToGetHeaders() throws ExecutionException, InterruptedExceptio final CompletableFuture done = new CompletableFuture<>(); try (final EthProtocolManager ethManager = new EthProtocolManager( - blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1, 1)) { + blockchain, + protocolContext.getWorldStateArchive(), + 1, + true, + 1, + 1, + 1, + new NoOpMetricsSystem())) { final long startBlock = 5L; final int blockCount = 5; final MessageData messageData = @@ -231,7 +274,15 @@ public void respondToGetHeadersWithinLimits() throws ExecutionException, Interru final int limit = 5; try (final EthProtocolManager ethManager = new EthProtocolManager( - blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1, 1, limit)) { + blockchain, + protocolContext.getWorldStateArchive(), + 1, + true, + 1, + 1, + 1, + limit, + new NoOpMetricsSystem())) { final long startBlock = 5L; final int blockCount = 10; final MessageData messageData = @@ -263,7 +314,14 @@ public void respondToGetHeadersReversed() throws ExecutionException, Interrupted final CompletableFuture done = new CompletableFuture<>(); try (final EthProtocolManager ethManager = new EthProtocolManager( - blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1, 1)) { + blockchain, + protocolContext.getWorldStateArchive(), + 1, + true, + 1, + 1, + 1, + new NoOpMetricsSystem())) { final long endBlock = 10L; final int blockCount = 5; final MessageData messageData = GetBlockHeadersMessage.create(endBlock, blockCount, 0, true); @@ -294,7 +352,14 @@ public void respondToGetHeadersWithSkip() throws ExecutionException, Interrupted final CompletableFuture done = new CompletableFuture<>(); try (final EthProtocolManager ethManager = new EthProtocolManager( - blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1, 1)) { + blockchain, + protocolContext.getWorldStateArchive(), + 1, + true, + 1, + 1, + 1, + new NoOpMetricsSystem())) { final long startBlock = 5L; final int blockCount = 5; final int skip = 1; @@ -328,7 +393,14 @@ public void respondToGetHeadersReversedWithSkip() final CompletableFuture done = new CompletableFuture<>(); try (final EthProtocolManager ethManager = new EthProtocolManager( - blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1, 1)) { + blockchain, + protocolContext.getWorldStateArchive(), + 1, + true, + 1, + 1, + 1, + new NoOpMetricsSystem())) { final long endBlock = 10L; final int blockCount = 5; final int skip = 1; @@ -383,7 +455,14 @@ public void respondToGetHeadersPartial() throws ExecutionException, InterruptedE final CompletableFuture done = new CompletableFuture<>(); try (final EthProtocolManager ethManager = new EthProtocolManager( - blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1, 1)) { + blockchain, + protocolContext.getWorldStateArchive(), + 1, + true, + 1, + 1, + 1, + new NoOpMetricsSystem())) { final long startBlock = blockchain.getChainHeadBlockNumber() - 1L; final int blockCount = 5; final MessageData messageData = @@ -415,7 +494,14 @@ public void respondToGetHeadersEmpty() throws ExecutionException, InterruptedExc final CompletableFuture done = new CompletableFuture<>(); try (final EthProtocolManager ethManager = new EthProtocolManager( - blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1, 1)) { + blockchain, + protocolContext.getWorldStateArchive(), + 1, + true, + 1, + 1, + 1, + new NoOpMetricsSystem())) { final long startBlock = blockchain.getChainHeadBlockNumber() + 1; final int blockCount = 5; final MessageData messageData = @@ -444,7 +530,14 @@ public void respondToGetBodies() throws ExecutionException, InterruptedException final CompletableFuture done = new CompletableFuture<>(); try (final EthProtocolManager ethManager = new EthProtocolManager( - blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1, 1)) { + blockchain, + protocolContext.getWorldStateArchive(), + 1, + true, + 1, + 1, + 1, + new NoOpMetricsSystem())) { // Setup blocks query final long startBlock = blockchain.getChainHeadBlockNumber() - 5; final int blockCount = 2; @@ -489,7 +582,15 @@ public void respondToGetBodiesWithinLimits() throws ExecutionException, Interrup final int limit = 5; try (final EthProtocolManager ethManager = new EthProtocolManager( - blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1, 1, limit)) { + blockchain, + protocolContext.getWorldStateArchive(), + 1, + true, + 1, + 1, + 1, + limit, + new NoOpMetricsSystem())) { // Setup blocks query final int blockCount = 10; final long startBlock = blockchain.getChainHeadBlockNumber() - blockCount; @@ -533,7 +634,14 @@ public void respondToGetBodiesPartial() throws ExecutionException, InterruptedEx final CompletableFuture done = new CompletableFuture<>(); try (final EthProtocolManager ethManager = new EthProtocolManager( - blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1, 1)) { + blockchain, + protocolContext.getWorldStateArchive(), + 1, + true, + 1, + 1, + 1, + new NoOpMetricsSystem())) { // Setup blocks query final long expectedBlockNumber = blockchain.getChainHeadBlockNumber() - 1; final BlockHeader header = blockchain.getBlockHeader(expectedBlockNumber).get(); @@ -571,7 +679,14 @@ public void respondToGetReceipts() throws ExecutionException, InterruptedExcepti final CompletableFuture done = new CompletableFuture<>(); try (final EthProtocolManager ethManager = new EthProtocolManager( - blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1, 1)) { + blockchain, + protocolContext.getWorldStateArchive(), + 1, + true, + 1, + 1, + 1, + new NoOpMetricsSystem())) { // Setup blocks query final long startBlock = blockchain.getChainHeadBlockNumber() - 5; final int blockCount = 2; @@ -615,7 +730,15 @@ public void respondToGetReceiptsWithinLimits() throws ExecutionException, Interr final int limit = 5; try (final EthProtocolManager ethManager = new EthProtocolManager( - blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1, 1, limit)) { + blockchain, + protocolContext.getWorldStateArchive(), + 1, + true, + 1, + 1, + 1, + limit, + new NoOpMetricsSystem())) { // Setup blocks query final int blockCount = 10; final long startBlock = blockchain.getChainHeadBlockNumber() - blockCount; @@ -658,7 +781,14 @@ public void respondToGetReceiptsPartial() throws ExecutionException, Interrupted final CompletableFuture done = new CompletableFuture<>(); try (final EthProtocolManager ethManager = new EthProtocolManager( - blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1, 1)) { + blockchain, + protocolContext.getWorldStateArchive(), + 1, + true, + 1, + 1, + 1, + new NoOpMetricsSystem())) { // Setup blocks query final long blockNumber = blockchain.getChainHeadBlockNumber() - 5; final BlockHeader header = blockchain.getBlockHeader(blockNumber).get(); @@ -697,7 +827,8 @@ public void respondToGetNodeData() throws Exception { final WorldStateArchive worldStateArchive = protocolContext.getWorldStateArchive(); try (final EthProtocolManager ethManager = - new EthProtocolManager(blockchain, worldStateArchive, 1, true, 1, 1, 1)) { + new EthProtocolManager( + blockchain, worldStateArchive, 1, true, 1, 1, 1, new NoOpMetricsSystem())) { // Setup node data query final List expectedResults = new ArrayList<>(); @@ -740,7 +871,14 @@ public void respondToGetNodeData() throws Exception { public void newBlockMinedSendsNewBlockMessageToAllPeers() { final EthProtocolManager ethManager = new EthProtocolManager( - blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1, 1); + blockchain, + protocolContext.getWorldStateArchive(), + 1, + true, + 1, + 1, + 1, + new NoOpMetricsSystem()); // Define handler to validate response final PeerSendHandler onSend = mock(PeerSendHandler.class); @@ -804,7 +942,14 @@ public void shouldSuccessfullyRespondToGetHeadersRequestLessThanZero() final CompletableFuture done = new CompletableFuture<>(); try (final EthProtocolManager ethManager = new EthProtocolManager( - blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1, 1)) { + blockchain, + protocolContext.getWorldStateArchive(), + 1, + true, + 1, + 1, + 1, + new NoOpMetricsSystem())) { final long startBlock = 1L; final int requestedBlockCount = 13; final int receivedBlockCount = 2; diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncChainDownloaderTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncChainDownloaderTest.java index 1168ca7d6e..4df7085d06 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncChainDownloaderTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncChainDownloaderTest.java @@ -69,7 +69,7 @@ public void setup() { localBlockchain, localBlockchainSetup.getWorldArchive(), DeterministicEthScheduler.TimeoutPolicy.NEVER, - new EthScheduler(1, 1, 1)); + new EthScheduler(1, 1, 1, new NoOpMetricsSystem())); ethContext = ethProtocolManager.ethContext(); syncState = new SyncState(protocolContext.getBlockchain(), ethContext.getEthPeers()); } diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncDownloaderTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncDownloaderTest.java index a7392cf851..175407cc9e 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncDownloaderTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncDownloaderTest.java @@ -88,7 +88,7 @@ public void setupTest() { localBlockchain, localBlockchainSetup.getWorldArchive(), DeterministicEthScheduler.TimeoutPolicy.NEVER, - new EthScheduler(1, 1, 1)); + new EthScheduler(1, 1, 1, new NoOpMetricsSystem())); ethContext = ethProtocolManager.ethContext(); syncState = new SyncState(protocolContext.getBlockchain(), ethContext.getEthPeers()); diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncTargetManagerTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncTargetManagerTest.java index 880ae6949a..55b1d4539e 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncTargetManagerTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncTargetManagerTest.java @@ -66,7 +66,7 @@ public void setup() { localBlockchain, localWorldState, DeterministicEthScheduler.TimeoutPolicy.NEVER, - new EthScheduler(1, 1, 1)); + new EthScheduler(1, 1, 1, new NoOpMetricsSystem())); final EthContext ethContext = ethProtocolManager.ethContext(); final SyncState syncState = new SyncState(protocolContext.getBlockchain(), ethContext.getEthPeers()); diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TestNode.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TestNode.java index 00cab49578..8cf7433a3e 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TestNode.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TestNode.java @@ -104,7 +104,8 @@ public TestNode( final ProtocolContext protocolContext = new ProtocolContext<>(blockchain, worldStateArchive, null); final EthProtocolManager ethProtocolManager = - new EthProtocolManager(blockchain, worldStateArchive, 1, false, 1, 1, 1); + new EthProtocolManager( + blockchain, worldStateArchive, 1, false, 1, 1, 1, new NoOpMetricsSystem()); final NetworkRunner networkRunner = NetworkRunner.builder() diff --git a/metrics/src/main/java/tech/pegasys/pantheon/metrics/MetricCategory.java b/metrics/src/main/java/tech/pegasys/pantheon/metrics/MetricCategory.java index 110f25a88d..41a32b3889 100644 --- a/metrics/src/main/java/tech/pegasys/pantheon/metrics/MetricCategory.java +++ b/metrics/src/main/java/tech/pegasys/pantheon/metrics/MetricCategory.java @@ -15,6 +15,7 @@ public enum MetricCategory { BIG_QUEUE("big_queue"), BLOCKCHAIN("blockchain"), + EXECUTORS("executors"), JVM("jvm", false), NETWORK("network"), PEERS("peers"), diff --git a/metrics/src/main/java/tech/pegasys/pantheon/metrics/MetricsSystem.java b/metrics/src/main/java/tech/pegasys/pantheon/metrics/MetricsSystem.java index 2e95b93416..329c639d67 100644 --- a/metrics/src/main/java/tech/pegasys/pantheon/metrics/MetricsSystem.java +++ b/metrics/src/main/java/tech/pegasys/pantheon/metrics/MetricsSystem.java @@ -36,6 +36,22 @@ LabelledMetric createLabelledTimer( void createGauge( MetricCategory category, String name, String help, Supplier valueSupplier); + default void createIntegerGauge( + final MetricCategory category, + final String name, + final String help, + final Supplier valueSupplier) { + createGauge(category, name, help, () -> (double) valueSupplier.get()); + } + + default void createLongGauge( + final MetricCategory category, + final String name, + final String help, + final Supplier valueSupplier) { + createGauge(category, name, help, () -> (double) valueSupplier.get()); + } + Stream getMetrics(MetricCategory category); default Stream getMetrics() { diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/controller/CliquePantheonController.java b/pantheon/src/main/java/tech/pegasys/pantheon/controller/CliquePantheonController.java index 2b74d50075..111d1cf852 100644 --- a/pantheon/src/main/java/tech/pegasys/pantheon/controller/CliquePantheonController.java +++ b/pantheon/src/main/java/tech/pegasys/pantheon/controller/CliquePantheonController.java @@ -143,7 +143,8 @@ public static PantheonController init( fastSyncEnabled, syncConfig.downloaderParallelism(), syncConfig.transactionsParallelism(), - syncConfig.computationParallelism()); + syncConfig.computationParallelism(), + metricsSystem); final SyncState syncState = new SyncState(blockchain, ethProtocolManager.ethContext().getEthPeers()); final Synchronizer synchronizer = diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftLegacyPantheonController.java b/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftLegacyPantheonController.java index 2c9703c0f7..7c92cd82b0 100644 --- a/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftLegacyPantheonController.java +++ b/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftLegacyPantheonController.java @@ -135,7 +135,8 @@ public static PantheonController init( fastSyncEnabled, syncConfig.downloaderParallelism(), syncConfig.transactionsParallelism(), - syncConfig.computationParallelism()); + syncConfig.computationParallelism(), + metricsSystem); } else { ethSubProtocol = EthProtocol.get(); ethProtocolManager = @@ -146,7 +147,8 @@ public static PantheonController init( fastSyncEnabled, syncConfig.downloaderParallelism(), syncConfig.transactionsParallelism(), - syncConfig.computationParallelism()); + syncConfig.computationParallelism(), + metricsSystem); } final SyncState syncState = diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java b/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java index 533e215a96..9aac539eb5 100644 --- a/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java +++ b/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java @@ -161,7 +161,8 @@ public static PantheonController init( fastSyncEnabled, syncConfig.downloaderParallelism(), syncConfig.transactionsParallelism(), - syncConfig.computationParallelism()); + syncConfig.computationParallelism(), + metricsSystem); final SubProtocol ethSubProtocol = EthProtocol.get(); final SyncState syncState = diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/controller/MainnetPantheonController.java b/pantheon/src/main/java/tech/pegasys/pantheon/controller/MainnetPantheonController.java index ee4edd0ec7..add89d5ad2 100644 --- a/pantheon/src/main/java/tech/pegasys/pantheon/controller/MainnetPantheonController.java +++ b/pantheon/src/main/java/tech/pegasys/pantheon/controller/MainnetPantheonController.java @@ -115,7 +115,8 @@ public static PantheonController init( fastSyncEnabled, syncConfig.downloaderParallelism(), syncConfig.transactionsParallelism(), - syncConfig.computationParallelism()); + syncConfig.computationParallelism(), + metricsSystem); final SyncState syncState = new SyncState(blockchain, ethProtocolManager.ethContext().getEthPeers()); final Synchronizer synchronizer = diff --git a/pantheon/src/test/java/tech/pegasys/pantheon/RunnerTest.java b/pantheon/src/test/java/tech/pegasys/pantheon/RunnerTest.java index aff095fe12..7f03c4926c 100644 --- a/pantheon/src/test/java/tech/pegasys/pantheon/RunnerTest.java +++ b/pantheon/src/test/java/tech/pegasys/pantheon/RunnerTest.java @@ -156,7 +156,7 @@ private void syncFromGenesis(final SyncMode mode) throws Exception { SynchronizerConfiguration.builder() .syncMode(mode) .fastSyncPivotDistance(5) - .fastSyncMaximumPeerWaitTime(Duration.ofSeconds(5)) + .fastSyncMaximumPeerWaitTime(Duration.ofSeconds(1)) .build(); final Path dataDirBehind = temp.newFolder().toPath(); final JsonRpcConfiguration behindJsonRpcConfiguration = jsonRpcConfiguration();