Skip to content

Commit

Permalink
Improve imported block log line
Browse files Browse the repository at this point in the history
Improve the import log line to include relevant metrics like number
of transacitons, ommers, and gas usage.
Also, this fixes a bug in that we get task times when metrics are not
turned on.

Sample line:
2019-03-11 21:03:50.245+00:00 | EthScheduler-Workers-3 | INFO  | BlockPropagationManager | Imported PegaSysEng#36 / 20 tx / 0 om / 420,000 (4.0%) gas / (0xecb366469d88d9d75b20721581797c1eb1118557b93db9a460dc0e000b9ec7fe) in 0.143s.
  • Loading branch information
shemnon committed Mar 12, 2019
1 parent cd547a8 commit 4fd0670
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import tech.pegasys.pantheon.util.Subscribers;

import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeUnit;

import com.google.common.base.Stopwatch;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -89,9 +91,10 @@ protected boolean mineBlock() throws InterruptedException {

final long newBlockTimestamp = scheduler.waitUntilNextBlockCanBeMined(parentHeader);

final Stopwatch stopwatch = Stopwatch.createStarted();
LOG.trace("Mining a new block with timestamp {}", newBlockTimestamp);
final Block block = blockCreator.createBlock(newBlockTimestamp);
LOG.info(
LOG.trace(
"Block created, importing to local chain, block includes {} transactions",
block.getBody().getTransactions().size());

Expand All @@ -101,10 +104,21 @@ protected boolean mineBlock() throws InterruptedException {
importer.importBlock(protocolContext, block, HeaderValidationMode.FULL);
if (blockImported) {
notifyNewBlockListeners(block);
LOG.trace("Block {} imported to block chain.", block.getHeader().getNumber());
final double taskTimeInSec = stopwatch.elapsed(TimeUnit.MILLISECONDS) / 1000.0;
LOG.info(
String.format(
"Produced and imported block #%,d / %d tx / %d om / %,d (%01.1f%%) gas / (%s) in %01.3fs",
block.getHeader().getNumber(),
block.getBody().getTransactions().size(),
block.getBody().getOmmers().size(),
block.getHeader().getGasUsed(),
(block.getHeader().getGasUsed() * 100.0) / block.getHeader().getGasLimit(),
block.getHash(),
taskTimeInSec));
} else {
LOG.error("Illegal block mined, could not be imported to local chain.");
}

return blockImported;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,41 +19,52 @@
import tech.pegasys.pantheon.metrics.MetricCategory;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.metrics.OperationTimer;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;

import java.util.Collection;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import com.google.common.base.Stopwatch;

public abstract class AbstractEthTask<T> implements EthTask<T> {

private double taskTimeInSec = -1.0D;
private final LabelledMetric<OperationTimer> ethTasksTimer;
private final OperationTimer taskTimer;
protected final AtomicReference<CompletableFuture<T>> result = new AtomicReference<>();
private volatile Collection<CompletableFuture<?>> subTaskFutures = new ConcurrentLinkedDeque<>();
private final Collection<CompletableFuture<?>> subTaskFutures = new ConcurrentLinkedDeque<>();

protected AbstractEthTask(final MetricsSystem metricsSystem) {
ethTasksTimer =
final LabelledMetric<OperationTimer> ethTasksTimer =
metricsSystem.createLabelledTimer(
MetricCategory.SYNCHRONIZER, "task", "Internal processing tasks", "taskName");
taskTimer = ethTasksTimer.labels(getClass().getSimpleName());
if (ethTasksTimer == NoOpMetricsSystem.NO_OP_LABELLED_TIMER) {
taskTimer =
() ->
new OperationTimer.TimingContext() {
final Stopwatch stopwatch = Stopwatch.createStarted();

@Override
public double stopTimer() {
return stopwatch.elapsed(TimeUnit.MILLISECONDS) / 1000.0;
}
};
} else {
taskTimer = ethTasksTimer.labels(getClass().getSimpleName());
}
}

@Override
public final CompletableFuture<T> run() {
if (result.compareAndSet(null, new CompletableFuture<>())) {
executeTaskTimed();
result
.get()
.whenComplete(
(r, t) -> {
cleanup();
});
result.get().whenComplete((r, t) -> cleanup());
}
return result.get();
}
Expand All @@ -62,12 +73,7 @@ public final CompletableFuture<T> run() {
public final CompletableFuture<T> runAsync(final ExecutorService executor) {
if (result.compareAndSet(null, new CompletableFuture<>())) {
executor.submit(this::executeTaskTimed);
result
.get()
.whenComplete(
(r, t) -> {
cleanup();
});
result.get().whenComplete((r, t) -> cleanup());
}
return result.get();
}
Expand Down Expand Up @@ -109,10 +115,7 @@ protected final <S> CompletableFuture<S> executeSubTask(
if (!isCancelled()) {
final CompletableFuture<S> subTaskFuture = subTask.get();
subTaskFutures.add(subTaskFuture);
subTaskFuture.whenComplete(
(r, t) -> {
subTaskFutures.remove(subTaskFuture);
});
subTaskFuture.whenComplete((r, t) -> subTaskFutures.remove(subTaskFuture));
return subTaskFuture;
} else {
return completedExceptionally(new CancellationException());
Expand All @@ -123,22 +126,16 @@ protected final <S> CompletableFuture<S> executeSubTask(
/**
* Utility for registring completable futures for cleanup if this EthTask is cancelled.
*
* @param subTaskFuture the future to be reigstered.
* @param <S> the type of data returned from the CompletableFuture
* @return The completableFuture that was executed
* @param subTaskFuture the future to be registered.
*/
protected final <S> CompletableFuture<S> registerSubTask(
final CompletableFuture<S> subTaskFuture) {
protected final <S> void registerSubTask(final CompletableFuture<S> subTaskFuture) {
synchronized (result) {
if (!isCancelled()) {
subTaskFutures.add(subTaskFuture);
subTaskFuture.whenComplete(
(r, t) -> {
subTaskFutures.remove(subTaskFuture);
});
return subTaskFuture;
subTaskFuture.whenComplete((r, t) -> subTaskFutures.remove(subTaskFuture));
} else {
return completedExceptionally(new CancellationException());
completedExceptionally(new CancellationException());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ private void onBlockAdded(final BlockAddedEvent blockAddedEvent, final Blockchai
}
}

void handleNewBlockFromNetwork(final EthMessage message) {
private void handleNewBlockFromNetwork(final EthMessage message) {
final Blockchain blockchain = protocolContext.getBlockchain();
final NewBlockMessage newBlockMessage = NewBlockMessage.readFrom(message.getData());
try {
Expand Down Expand Up @@ -330,11 +330,17 @@ private CompletableFuture<Block> runImportTask(final Block block) {
block.getHeader().getNumber(),
block.getHash());
} else {
final double timeInMs = importTask.getTaskTimeInSec() * 1000;
final double timeInS = importTask.getTaskTimeInSec();
LOG.info(
String.format(
"Successfully imported announced block %d (%s) in %01.3fms.",
block.getHeader().getNumber(), block.getHash(), timeInMs));
"Imported #%,d / %d tx / %d om / %,d (%01.1f%%) gas / (%s) in %01.3fs.",
block.getHeader().getNumber(),
block.getBody().getTransactions().size(),
block.getBody().getOmmers().size(),
block.getHeader().getGasUsed(),
(block.getHeader().getGasUsed() * 100.0) / block.getHeader().getGasLimit(),
block.getHash(),
timeInS));
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,14 @@
import static org.assertj.core.api.Assertions.assertThat;

import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.metrics.prometheus.MetricsConfiguration;
import tech.pegasys.pantheon.metrics.prometheus.PrometheusMetricsSystem;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

import com.google.common.collect.Lists;
import org.junit.Test;
Expand All @@ -30,7 +34,7 @@ public void shouldCancelAllIncompleteSubtasksWhenMultipleIncomplete() {
final CompletableFuture<Void> subtask1 = new CompletableFuture<>();
final CompletableFuture<Void> subtask2 = new CompletableFuture<>();
final EthTaskWithMultipleSubtasks task =
new EthTaskWithMultipleSubtasks(Lists.newArrayList(subtask1, subtask2));
new EthTaskWithMultipleSubtasks(Arrays.asList(subtask1, subtask2));

task.run();
task.cancel();
Expand All @@ -45,7 +49,7 @@ public void shouldAnyCancelIncompleteSubtasksWhenMultiple() {
final CompletableFuture<Void> subtask2 = new CompletableFuture<>();
final CompletableFuture<Void> subtask3 = new CompletableFuture<>();
final EthTaskWithMultipleSubtasks task =
new EthTaskWithMultipleSubtasks(Lists.newArrayList(subtask1, subtask2, subtask3));
new EthTaskWithMultipleSubtasks(Arrays.asList(subtask1, subtask2, subtask3));

task.run();
subtask1.complete(null);
Expand All @@ -63,7 +67,7 @@ public void shouldCompleteWhenCancelNotCalled() {
final CompletableFuture<Void> subtask1 = new CompletableFuture<>();
final CompletableFuture<Void> subtask2 = new CompletableFuture<>();
final EthTaskWithMultipleSubtasks task =
new EthTaskWithMultipleSubtasks(Lists.newArrayList(subtask1, subtask2));
new EthTaskWithMultipleSubtasks(Arrays.asList(subtask1, subtask2));

final CompletableFuture<Void> future = task.run();
subtask1.complete(null);
Expand All @@ -78,6 +82,38 @@ public void shouldCompleteWhenCancelNotCalled() {
assertThat(done).isTrue();
}

@Test
public void shouldTakeTimeToExecuteNoOpMetrics() {
final AbstractEthTask<Void> waitTask =
new AbstractEthTask<Void>(new NoOpMetricsSystem()) {
@Override
protected void executeTask() {
LockSupport.parkNanos(1_000_000);
}
};

waitTask.run();

assertThat(waitTask.getTaskTimeInSec()).isGreaterThan(0.0);
}

@Test
public void shouldTakeTimeToExecutePrometheusMetrics() {
final MetricsConfiguration metricsConfiguration = MetricsConfiguration.createDefault();
metricsConfiguration.setEnabled(true);
final AbstractEthTask<Void> waitTask =
new AbstractEthTask<Void>(PrometheusMetricsSystem.init(metricsConfiguration)) {
@Override
protected void executeTask() {
LockSupport.parkNanos(1_000_000);
}
};

waitTask.run();

assertThat(waitTask.getTaskTimeInSec()).isGreaterThan(0.0);
}

private class EthTaskWithMultipleSubtasks extends AbstractEthTask<Void> {

private final List<CompletableFuture<?>> subtasks;
Expand Down

0 comments on commit 4fd0670

Please sign in to comment.