Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
NC-1880 High TX volume swamps block processing (#337)
Browse files Browse the repository at this point in the history
* NC-1880 High TX volume swamps block processing

Move transaction processing into its own thread(s).

Size of txWorkerExecutor thread pool can be independently configured.
  • Loading branch information
shemnon authored Dec 3, 2018
1 parent b74f88a commit abaaef7
Show file tree
Hide file tree
Showing 16 changed files with 158 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ public Istanbul64ProtocolManager(
final Blockchain blockchain,
final int networkId,
final boolean fastSyncEnabled,
final int workers) {
super(blockchain, networkId, fastSyncEnabled, workers);
final int syncWorkers,
final int txWorkers) {
super(blockchain, networkId, fastSyncEnabled, syncWorkers, txWorkers);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ protected final <S> CompletableFuture<S> executeSubTask(
*/
protected final <S> CompletableFuture<S> executeWorkerSubTask(
final EthScheduler scheduler, final Supplier<CompletableFuture<S>> subTask) {
return executeSubTask(() -> scheduler.scheduleWorkerTask(subTask));
return executeSubTask(() -> scheduler.scheduleSyncWorkerTask(subTask));
}

public final T result() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,17 +90,24 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
final Blockchain blockchain,
final int networkId,
final boolean fastSyncEnabled,
final int workers,
final int syncWorkers,
final int txWorkers,
final int requestLimit) {
this(blockchain, networkId, fastSyncEnabled, requestLimit, new EthScheduler(workers));
this(
blockchain,
networkId,
fastSyncEnabled,
requestLimit,
new EthScheduler(syncWorkers, txWorkers));
}

public EthProtocolManager(
final Blockchain blockchain,
final int networkId,
final boolean fastSyncEnabled,
final int workers) {
this(blockchain, networkId, fastSyncEnabled, workers, DEFAULT_REQUEST_LIMIT);
final int syncWorkers,
final int txWorkers) {
this(blockchain, networkId, fastSyncEnabled, syncWorkers, txWorkers, DEFAULT_REQUEST_LIMIT);
}

public EthContext ethContext() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,14 @@ public class EthScheduler {
private final AtomicBoolean stopped = new AtomicBoolean(false);
private final CountDownLatch shutdown = new CountDownLatch(1);

protected final ExecutorService workerExecutor;
protected final ExecutorService syncWorkerExecutor;
protected final ScheduledExecutorService scheduler;
protected final ExecutorService txWorkerExecutor;

EthScheduler(final int workerCount) {
EthScheduler(final int syncWorkerCount, final int txWorkerCount) {
this(
Executors.newFixedThreadPool(
workerCount,
syncWorkerCount,
new ThreadFactoryBuilder()
.setNameFormat(EthScheduler.class.getSimpleName() + "-Workers")
.build()),
Expand All @@ -56,19 +57,28 @@ public class EthScheduler {
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat(EthScheduler.class.getSimpleName() + "Timer")
.build()),
Executors.newFixedThreadPool(
txWorkerCount,
new ThreadFactoryBuilder()
.setNameFormat(EthScheduler.class.getSimpleName() + "-Transactions")
.build()));
}

protected EthScheduler(
final ExecutorService workerExecutor, final ScheduledExecutorService scheduler) {
this.workerExecutor = workerExecutor;
final ExecutorService syncWorkerExecutor,
final ScheduledExecutorService scheduler,
final ExecutorService txWorkerExecutor) {
this.syncWorkerExecutor = syncWorkerExecutor;
this.scheduler = scheduler;
this.txWorkerExecutor = txWorkerExecutor;
}

public <T> CompletableFuture<T> scheduleWorkerTask(final Supplier<CompletableFuture<T>> future) {
public <T> CompletableFuture<T> scheduleSyncWorkerTask(
final Supplier<CompletableFuture<T>> future) {
final CompletableFuture<T> promise = new CompletableFuture<>();
final Future<?> workerFuture =
workerExecutor.submit(
syncWorkerExecutor.submit(
() -> {
future
.get()
Expand All @@ -91,8 +101,12 @@ public <T> CompletableFuture<T> scheduleWorkerTask(final Supplier<CompletableFut
return promise;
}

public Future<?> scheduleWorkerTask(final Runnable command) {
return workerExecutor.submit(command);
public Future<?> scheduleSyncWorkerTask(final Runnable command) {
return syncWorkerExecutor.submit(command);
}

public Future<?> scheduleTxWorkerTask(final Runnable command) {
return txWorkerExecutor.submit(command);
}

public CompletableFuture<Void> scheduleFutureTask(
Expand Down Expand Up @@ -179,7 +193,7 @@ private <T> CompletableFuture<T> timeout(
public void stop() {
if (stopped.compareAndSet(false, true)) {
LOG.trace("Stopping " + getClass().getSimpleName());
workerExecutor.shutdown();
syncWorkerExecutor.shutdown();
scheduler.shutdown();
shutdown.countDown();
} else {
Expand All @@ -189,10 +203,10 @@ public void stop() {

public void awaitStop() throws InterruptedException {
shutdown.await();
if (!workerExecutor.awaitTermination(2L, TimeUnit.MINUTES)) {
if (!syncWorkerExecutor.awaitTermination(2L, TimeUnit.MINUTES)) {
LOG.error("{} worker executor did not shutdown cleanly.", this.getClass().getSimpleName());
workerExecutor.shutdownNow();
workerExecutor.awaitTermination(2L, TimeUnit.MINUTES);
syncWorkerExecutor.shutdownNow();
syncWorkerExecutor.awaitTermination(2L, TimeUnit.MINUTES);
}
if (!scheduler.awaitTermination(2L, TimeUnit.MINUTES)) {
LOG.error("{} scheduler did not shutdown cleanly.", this.getClass().getSimpleName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ private void onBlockAdded(final BlockAddedEvent blockAddedEvent, final Blockchai
protocolSchedule, protocolContext, readyForImport, HeaderValidationMode.FULL);
ethContext
.getScheduler()
.scheduleWorkerTask(importBlocksTask)
.scheduleSyncWorkerTask(importBlocksTask)
.whenComplete(
(r, t) -> {
if (r != null) {
Expand Down Expand Up @@ -255,7 +255,7 @@ CompletableFuture<Block> importOrSavePendingBlock(final Block block) {
final OperationTimer.TimingContext blockTimer = announcedBlockIngestTimer.startTimer();
return ethContext
.getScheduler()
.scheduleWorkerTask(importTask::run)
.scheduleSyncWorkerTask(importTask::run)
.whenComplete(
(r, t) -> {
if (t != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class SynchronizerConfiguration {
private final long trailingPeerBlocksBehindThreshold;
private final int maxTrailingPeers;
private final int downloaderParallelism;
private final int transactionsParallelism;

private SynchronizerConfiguration(
final SyncMode requestedSyncMode,
Expand All @@ -67,7 +68,8 @@ private SynchronizerConfiguration(
final int downloaderChainSegmentSize,
final long trailingPeerBlocksBehindThreshold,
final int maxTrailingPeers,
final int downloaderParallelism) {
final int downloaderParallelism,
final int transactionsParallelism) {
this.requestedSyncMode = requestedSyncMode;
this.fastSyncPivotDistance = fastSyncPivotDistance;
this.fastSyncFullValidationRate = fastSyncFullValidationRate;
Expand All @@ -82,6 +84,7 @@ private SynchronizerConfiguration(
this.trailingPeerBlocksBehindThreshold = trailingPeerBlocksBehindThreshold;
this.maxTrailingPeers = maxTrailingPeers;
this.downloaderParallelism = downloaderParallelism;
this.transactionsParallelism = transactionsParallelism;
}

/**
Expand Down Expand Up @@ -122,7 +125,8 @@ public SynchronizerConfiguration validated(final Blockchain blockchain) {
downloaderChainSegmentSize,
trailingPeerBlocksBehindThreshold,
maxTrailingPeers,
downloaderParallelism);
downloaderParallelism,
transactionsParallelism);
}

public static Builder builder() {
Expand Down Expand Up @@ -203,6 +207,10 @@ public int downloaderParallelism() {
return downloaderParallelism;
}

public int transactionsParallelism() {
return transactionsParallelism;
}

/**
* The rate at which blocks should be fully validated during fast sync. At a rate of 1f, all
* blocks are fully validated. At rates less than 1f, a subset of blocks will undergo light-weight
Expand All @@ -228,6 +236,7 @@ public static class Builder {
private long trailingPeerBlocksBehindThreshold;
private int maxTrailingPeers = Integer.MAX_VALUE;
private int downloaderParallelism = 2;
private int transactionsParallelism = 2;

public Builder fastSyncPivotDistance(final int distance) {
fastSyncPivotDistance = distance;
Expand Down Expand Up @@ -299,6 +308,11 @@ public Builder downloaderParallelisim(final int downloaderParallelism) {
return this;
}

public Builder transactionsParallelism(final int transactionsParallelism) {
this.transactionsParallelism = transactionsParallelism;
return this;
}

public SynchronizerConfiguration build() {
return new SynchronizerConfiguration(
syncMode,
Expand All @@ -314,7 +328,8 @@ public SynchronizerConfiguration build() {
downloaderChainSegmentSize,
trailingPeerBlocksBehindThreshold,
maxTrailingPeers,
downloaderParallelism);
downloaderParallelism,
transactionsParallelism);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,6 @@ public void onTransactionsAdded(final Iterable<Transaction> transactions) {
transaction -> transactionTracker.addToPeerSendQueue(peer, transaction)));
ethContext
.getScheduler()
.scheduleWorkerTask(transactionsMessageSender::sendTransactionsToPeers);
.scheduleSyncWorkerTask(transactionsMessageSender::sendTransactionsToPeers);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public TransactionsMessageHandler(
@Override
public void exec(final EthMessage message) {
final TransactionsMessage transactionsMessage = TransactionsMessage.readFrom(message.getData());
scheduler.scheduleWorkerTask(
scheduler.scheduleTxWorkerTask(
() ->
transactionsMessageProcessor.processTransactionsMessage(
message.getPeer(), transactionsMessage));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,22 @@ public class DeterministicEthScheduler extends EthScheduler {
}

DeterministicEthScheduler(final TimeoutPolicy timeoutPolicy) {
super(new MockExecutorService(), new MockScheduledExecutor());
super(new MockExecutorService(), new MockScheduledExecutor(), new MockExecutorService());
this.timeoutPolicy = timeoutPolicy;
}

MockExecutorService mockWorkerExecutor() {
return (MockExecutorService) workerExecutor;
MockExecutorService mockSyncWorkerExecutor() {
return (MockExecutorService) syncWorkerExecutor;
}

MockScheduledExecutor mockScheduledExecutor() {
return (MockScheduledExecutor) scheduler;
}

MockScheduledExecutor mockTransactionsExecutor() {
return (MockScheduledExecutor) txWorkerExecutor;
}

@Override
public <T> void failAfterTimeout(final CompletableFuture<T> promise, final Duration timeout) {
if (timeoutPolicy.shouldTimeout()) {
Expand Down
Loading

0 comments on commit abaaef7

Please sign in to comment.