diff --git a/ethereum/blockcreation/src/main/java/tech/pegasys/pantheon/ethereum/blockcreation/BlockMiner.java b/ethereum/blockcreation/src/main/java/tech/pegasys/pantheon/ethereum/blockcreation/BlockMiner.java index 7b46ea613a..c16861f9fc 100644 --- a/ethereum/blockcreation/src/main/java/tech/pegasys/pantheon/ethereum/blockcreation/BlockMiner.java +++ b/ethereum/blockcreation/src/main/java/tech/pegasys/pantheon/ethereum/blockcreation/BlockMiner.java @@ -22,7 +22,9 @@ import tech.pegasys.pantheon.util.Subscribers; import java.util.concurrent.CancellationException; +import java.util.concurrent.TimeUnit; +import com.google.common.base.Stopwatch; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -89,9 +91,10 @@ protected boolean mineBlock() throws InterruptedException { final long newBlockTimestamp = scheduler.waitUntilNextBlockCanBeMined(parentHeader); + final Stopwatch stopwatch = Stopwatch.createStarted(); LOG.trace("Mining a new block with timestamp {}", newBlockTimestamp); final Block block = blockCreator.createBlock(newBlockTimestamp); - LOG.info( + LOG.trace( "Block created, importing to local chain, block includes {} transactions", block.getBody().getTransactions().size()); @@ -101,10 +104,21 @@ protected boolean mineBlock() throws InterruptedException { importer.importBlock(protocolContext, block, HeaderValidationMode.FULL); if (blockImported) { notifyNewBlockListeners(block); - LOG.trace("Block {} imported to block chain.", block.getHeader().getNumber()); + final double taskTimeInSec = stopwatch.elapsed(TimeUnit.MILLISECONDS) / 1000.0; + LOG.info( + String.format( + "Produced and imported block #%,d / %d tx / %d om / %,d (%01.1f%%) gas / (%s) in %01.3fs", + block.getHeader().getNumber(), + block.getBody().getTransactions().size(), + block.getBody().getOmmers().size(), + block.getHeader().getGasUsed(), + (block.getHeader().getGasUsed() * 100.0) / block.getHeader().getGasLimit(), + block.getHash(), + taskTimeInSec)); } else { LOG.error("Illegal block mined, could not be imported to local chain."); } + return blockImported; } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/AbstractEthTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/AbstractEthTask.java index 5b5f591d00..9bf0136ce6 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/AbstractEthTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/AbstractEthTask.java @@ -19,6 +19,7 @@ import tech.pegasys.pantheon.metrics.MetricCategory; import tech.pegasys.pantheon.metrics.MetricsSystem; import tech.pegasys.pantheon.metrics.OperationTimer; +import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; import java.util.Collection; import java.util.concurrent.CancellationException; @@ -26,34 +27,44 @@ import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; +import com.google.common.base.Stopwatch; + public abstract class AbstractEthTask implements EthTask { private double taskTimeInSec = -1.0D; - private final LabelledMetric ethTasksTimer; private final OperationTimer taskTimer; protected final AtomicReference> result = new AtomicReference<>(); - private volatile Collection> subTaskFutures = new ConcurrentLinkedDeque<>(); + private final Collection> subTaskFutures = new ConcurrentLinkedDeque<>(); protected AbstractEthTask(final MetricsSystem metricsSystem) { - ethTasksTimer = + final LabelledMetric ethTasksTimer = metricsSystem.createLabelledTimer( MetricCategory.SYNCHRONIZER, "task", "Internal processing tasks", "taskName"); - taskTimer = ethTasksTimer.labels(getClass().getSimpleName()); + if (ethTasksTimer == NoOpMetricsSystem.NO_OP_LABELLED_TIMER) { + taskTimer = + () -> + new OperationTimer.TimingContext() { + final Stopwatch stopwatch = Stopwatch.createStarted(); + + @Override + public double stopTimer() { + return stopwatch.elapsed(TimeUnit.MILLISECONDS) / 1000.0; + } + }; + } else { + taskTimer = ethTasksTimer.labels(getClass().getSimpleName()); + } } @Override public final CompletableFuture run() { if (result.compareAndSet(null, new CompletableFuture<>())) { executeTaskTimed(); - result - .get() - .whenComplete( - (r, t) -> { - cleanup(); - }); + result.get().whenComplete((r, t) -> cleanup()); } return result.get(); } @@ -62,12 +73,7 @@ public final CompletableFuture run() { public final CompletableFuture runAsync(final ExecutorService executor) { if (result.compareAndSet(null, new CompletableFuture<>())) { executor.submit(this::executeTaskTimed); - result - .get() - .whenComplete( - (r, t) -> { - cleanup(); - }); + result.get().whenComplete((r, t) -> cleanup()); } return result.get(); } @@ -109,10 +115,7 @@ protected final CompletableFuture executeSubTask( if (!isCancelled()) { final CompletableFuture subTaskFuture = subTask.get(); subTaskFutures.add(subTaskFuture); - subTaskFuture.whenComplete( - (r, t) -> { - subTaskFutures.remove(subTaskFuture); - }); + subTaskFuture.whenComplete((r, t) -> subTaskFutures.remove(subTaskFuture)); return subTaskFuture; } else { return completedExceptionally(new CancellationException()); @@ -123,22 +126,16 @@ protected final CompletableFuture executeSubTask( /** * Utility for registring completable futures for cleanup if this EthTask is cancelled. * - * @param subTaskFuture the future to be reigstered. * @param the type of data returned from the CompletableFuture - * @return The completableFuture that was executed + * @param subTaskFuture the future to be registered. */ - protected final CompletableFuture registerSubTask( - final CompletableFuture subTaskFuture) { + protected final void registerSubTask(final CompletableFuture subTaskFuture) { synchronized (result) { if (!isCancelled()) { subTaskFutures.add(subTaskFuture); - subTaskFuture.whenComplete( - (r, t) -> { - subTaskFutures.remove(subTaskFuture); - }); - return subTaskFuture; + subTaskFuture.whenComplete((r, t) -> subTaskFutures.remove(subTaskFuture)); } else { - return completedExceptionally(new CancellationException()); + completedExceptionally(new CancellationException()); } } } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java index 2bb0e0752a..8307fc44d4 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java @@ -148,7 +148,7 @@ private void onBlockAdded(final BlockAddedEvent blockAddedEvent, final Blockchai } } - void handleNewBlockFromNetwork(final EthMessage message) { + private void handleNewBlockFromNetwork(final EthMessage message) { final Blockchain blockchain = protocolContext.getBlockchain(); final NewBlockMessage newBlockMessage = NewBlockMessage.readFrom(message.getData()); try { @@ -330,11 +330,17 @@ private CompletableFuture runImportTask(final Block block) { block.getHeader().getNumber(), block.getHash()); } else { - final double timeInMs = importTask.getTaskTimeInSec() * 1000; + final double timeInS = importTask.getTaskTimeInSec(); LOG.info( String.format( - "Successfully imported announced block %d (%s) in %01.3fms.", - block.getHeader().getNumber(), block.getHash(), timeInMs)); + "Imported #%,d / %d tx / %d om / %,d (%01.1f%%) gas / (%s) in %01.3fs.", + block.getHeader().getNumber(), + block.getBody().getTransactions().size(), + block.getBody().getOmmers().size(), + block.getHeader().getGasUsed(), + (block.getHeader().getGasUsed() * 100.0) / block.getHeader().getGasLimit(), + block.getHash(), + timeInS)); } }); } diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/task/AbstractEthTaskTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/task/AbstractEthTaskTest.java index b29847c15c..4b92764d7f 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/task/AbstractEthTaskTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/task/AbstractEthTaskTest.java @@ -15,10 +15,14 @@ import static org.assertj.core.api.Assertions.assertThat; import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; +import tech.pegasys.pantheon.metrics.prometheus.MetricsConfiguration; +import tech.pegasys.pantheon.metrics.prometheus.PrometheusMetricsSystem; +import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.LockSupport; import com.google.common.collect.Lists; import org.junit.Test; @@ -30,7 +34,7 @@ public void shouldCancelAllIncompleteSubtasksWhenMultipleIncomplete() { final CompletableFuture subtask1 = new CompletableFuture<>(); final CompletableFuture subtask2 = new CompletableFuture<>(); final EthTaskWithMultipleSubtasks task = - new EthTaskWithMultipleSubtasks(Lists.newArrayList(subtask1, subtask2)); + new EthTaskWithMultipleSubtasks(Arrays.asList(subtask1, subtask2)); task.run(); task.cancel(); @@ -45,7 +49,7 @@ public void shouldAnyCancelIncompleteSubtasksWhenMultiple() { final CompletableFuture subtask2 = new CompletableFuture<>(); final CompletableFuture subtask3 = new CompletableFuture<>(); final EthTaskWithMultipleSubtasks task = - new EthTaskWithMultipleSubtasks(Lists.newArrayList(subtask1, subtask2, subtask3)); + new EthTaskWithMultipleSubtasks(Arrays.asList(subtask1, subtask2, subtask3)); task.run(); subtask1.complete(null); @@ -63,7 +67,7 @@ public void shouldCompleteWhenCancelNotCalled() { final CompletableFuture subtask1 = new CompletableFuture<>(); final CompletableFuture subtask2 = new CompletableFuture<>(); final EthTaskWithMultipleSubtasks task = - new EthTaskWithMultipleSubtasks(Lists.newArrayList(subtask1, subtask2)); + new EthTaskWithMultipleSubtasks(Arrays.asList(subtask1, subtask2)); final CompletableFuture future = task.run(); subtask1.complete(null); @@ -78,6 +82,38 @@ public void shouldCompleteWhenCancelNotCalled() { assertThat(done).isTrue(); } + @Test + public void shouldTakeTimeToExecuteNoOpMetrics() { + final AbstractEthTask waitTask = + new AbstractEthTask(new NoOpMetricsSystem()) { + @Override + protected void executeTask() { + LockSupport.parkNanos(1_000_000); + } + }; + + waitTask.run(); + + assertThat(waitTask.getTaskTimeInSec()).isGreaterThan(0.0); + } + + @Test + public void shouldTakeTimeToExecutePrometheusMetrics() { + final MetricsConfiguration metricsConfiguration = MetricsConfiguration.createDefault(); + metricsConfiguration.setEnabled(true); + final AbstractEthTask waitTask = + new AbstractEthTask(PrometheusMetricsSystem.init(metricsConfiguration)) { + @Override + protected void executeTask() { + LockSupport.parkNanos(1_000_000); + } + }; + + waitTask.run(); + + assertThat(waitTask.getTaskTimeInSec()).isGreaterThan(0.0); + } + private class EthTaskWithMultipleSubtasks extends AbstractEthTask { private final List> subtasks;