From c0bf672a28595307e045a7878bf326c501d54e97 Mon Sep 17 00:00:00 2001 From: "S. Matthew English" Date: Thu, 25 Apr 2019 10:07:11 -0400 Subject: [PATCH] [PAN-1062] Evict old transactions (1 of 2) (#1299) --- .../blockcreation/CliqueBlockCreatorTest.java | 14 +++-- .../CliqueMinerExecutorTest.java | 8 ++- .../ibft/support/TestContextBuilder.java | 8 ++- .../blockcreation/IbftBlockCreatorTest.java | 8 ++- .../blockcreation/IbftBlockCreatorTest.java | 5 +- .../BlockTransactionSelectorTest.java | 30 ++++++++--- .../EthHashBlockCreatorTest.java | 9 +++- .../EthHashMinerExecutorTest.java | 14 ++++- .../eth/transactions/PendingTransactions.java | 22 +++++++- .../transactions/TransactionPoolFactory.java | 7 ++- .../transactions/PendingTransactionsTest.java | 51 ++++++++++++++++++- .../eth/transactions/TransactionPoolTest.java | 6 ++- .../EthGetFilterChangesIntegrationTest.java | 6 ++- .../java/tech/pegasys/pantheon/Runner.java | 7 +++ 14 files changed, 170 insertions(+), 25 deletions(-) diff --git a/consensus/clique/src/test/java/tech/pegasys/pantheon/consensus/clique/blockcreation/CliqueBlockCreatorTest.java b/consensus/clique/src/test/java/tech/pegasys/pantheon/consensus/clique/blockcreation/CliqueBlockCreatorTest.java index ac256612e1..1408422e09 100644 --- a/consensus/clique/src/test/java/tech/pegasys/pantheon/consensus/clique/blockcreation/CliqueBlockCreatorTest.java +++ b/consensus/clique/src/test/java/tech/pegasys/pantheon/consensus/clique/blockcreation/CliqueBlockCreatorTest.java @@ -50,12 +50,14 @@ import tech.pegasys.pantheon.util.bytes.BytesValue; import java.util.List; +import java.util.concurrent.TimeUnit; import com.google.common.collect.Lists; import org.junit.Before; import org.junit.Test; public class CliqueBlockCreatorTest { + private static final long TRANSACTION_EVICTION_INTERVAL_MS = TimeUnit.MINUTES.toMillis(1); private final KeyPair proposerKeyPair = KeyPair.generate(); private final Address proposerAddress = Util.publicKeyToAddress(proposerKeyPair.getPublicKey()); @@ -113,7 +115,8 @@ public void proposerAddressCanBeExtractFromAConstructedBlock() { new CliqueBlockCreator( coinbase, parent -> extraData.encode(), - new PendingTransactions(5, TestClock.fixed(), metricsSystem), + new PendingTransactions( + TRANSACTION_EVICTION_INTERVAL_MS, 5, TestClock.fixed(), metricsSystem), protocolContext, protocolSchedule, gasLimit -> gasLimit, @@ -140,7 +143,8 @@ public void insertsValidVoteIntoConstructedBlock() { new CliqueBlockCreator( coinbase, parent -> extraData.encode(), - new PendingTransactions(5, TestClock.fixed(), metricsSystem), + new PendingTransactions( + TRANSACTION_EVICTION_INTERVAL_MS, 5, TestClock.fixed(), metricsSystem), protocolContext, protocolSchedule, gasLimit -> gasLimit, @@ -166,7 +170,8 @@ public void insertsNoVoteWhenAuthInValidators() { new CliqueBlockCreator( coinbase, parent -> extraData.encode(), - new PendingTransactions(5, TestClock.fixed(), metricsSystem), + new PendingTransactions( + TRANSACTION_EVICTION_INTERVAL_MS, 5, TestClock.fixed(), metricsSystem), protocolContext, protocolSchedule, gasLimit -> gasLimit, @@ -195,7 +200,8 @@ public void insertsNoVoteWhenAtEpoch() { new CliqueBlockCreator( coinbase, parent -> extraData.encode(), - new PendingTransactions(5, TestClock.fixed(), metricsSystem), + new PendingTransactions( + TRANSACTION_EVICTION_INTERVAL_MS, 5, TestClock.fixed(), metricsSystem), protocolContext, protocolSchedule, gasLimit -> gasLimit, diff --git a/consensus/clique/src/test/java/tech/pegasys/pantheon/consensus/clique/blockcreation/CliqueMinerExecutorTest.java b/consensus/clique/src/test/java/tech/pegasys/pantheon/consensus/clique/blockcreation/CliqueMinerExecutorTest.java index 12c22c3c2e..4417eadde6 100644 --- a/consensus/clique/src/test/java/tech/pegasys/pantheon/consensus/clique/blockcreation/CliqueMinerExecutorTest.java +++ b/consensus/clique/src/test/java/tech/pegasys/pantheon/consensus/clique/blockcreation/CliqueMinerExecutorTest.java @@ -44,6 +44,7 @@ import java.util.List; import java.util.Random; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import com.google.common.collect.Lists; import io.vertx.core.json.JsonObject; @@ -54,6 +55,7 @@ public class CliqueMinerExecutorTest { private static final GenesisConfigOptions GENESIS_CONFIG_OPTIONS = GenesisConfigFile.fromConfig(new JsonObject()).getConfigOptions(); + private static final long TRANSACTION_EVICTION_INTERVAL_MS = TimeUnit.MINUTES.toMillis(1); private final KeyPair proposerKeyPair = KeyPair.generate(); private Address localAddress; private final List
validatorList = Lists.newArrayList(); @@ -90,7 +92,8 @@ public void extraDataCreatedOnEpochBlocksContainsValidators() { cliqueProtocolContext, Executors.newSingleThreadExecutor(), CliqueProtocolSchedule.create(GENESIS_CONFIG_OPTIONS, proposerKeyPair), - new PendingTransactions(1, TestClock.fixed(), metricsSystem), + new PendingTransactions( + TRANSACTION_EVICTION_INTERVAL_MS, 1, TestClock.fixed(), metricsSystem), proposerKeyPair, new MiningParameters(AddressHelpers.ofValue(1), Wei.ZERO, wrappedVanityData, false), mock(CliqueBlockScheduler.class), @@ -120,7 +123,8 @@ public void extraDataForNonEpochBlocksDoesNotContainValidaors() { cliqueProtocolContext, Executors.newSingleThreadExecutor(), CliqueProtocolSchedule.create(GENESIS_CONFIG_OPTIONS, proposerKeyPair), - new PendingTransactions(1, TestClock.fixed(), metricsSystem), + new PendingTransactions( + TRANSACTION_EVICTION_INTERVAL_MS, 1, TestClock.fixed(), metricsSystem), proposerKeyPair, new MiningParameters(AddressHelpers.ofValue(1), Wei.ZERO, wrappedVanityData, false), mock(CliqueBlockScheduler.class), diff --git a/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/support/TestContextBuilder.java b/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/support/TestContextBuilder.java index 27ee6b6ff4..646df6675d 100644 --- a/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/support/TestContextBuilder.java +++ b/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/support/TestContextBuilder.java @@ -80,11 +80,14 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import com.google.common.collect.Iterables; public class TestContextBuilder { + + private static final long TRANSACTION_EVICTION_INTERVAL_MS = TimeUnit.MINUTES.toMillis(1); private static MetricsSystem metricsSystem = new NoOpMetricsSystem(); private static class ControllerAndState { @@ -282,10 +285,13 @@ private static ControllerAndState createControllerAndFinalState( new ProtocolContext<>( blockChain, worldStateArchive, new IbftContext(voteTallyCache, voteProposer)); + final PendingTransactions pendingTransactions = + new PendingTransactions(TRANSACTION_EVICTION_INTERVAL_MS, 1, clock, metricsSystem); + final IbftBlockCreatorFactory blockCreatorFactory = new IbftBlockCreatorFactory( (gasLimit) -> gasLimit, - new PendingTransactions(1, clock, metricsSystem), // changed from IbftPantheonController + pendingTransactions, // changed from IbftPantheonController protocolContext, protocolSchedule, miningParams, diff --git a/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/blockcreation/IbftBlockCreatorTest.java b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/blockcreation/IbftBlockCreatorTest.java index bc26d5574c..7f1e209333 100644 --- a/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/blockcreation/IbftBlockCreatorTest.java +++ b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/blockcreation/IbftBlockCreatorTest.java @@ -46,11 +46,13 @@ import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.concurrent.TimeUnit; import com.google.common.collect.Lists; import org.junit.Test; public class IbftBlockCreatorTest { + private static final long TRANSACTION_EVICTION_INTERVAL_MS = TimeUnit.MINUTES.toMillis(1); private final MetricsSystem metricsSystem = new NoOpMetricsSystem(); @Test @@ -81,6 +83,10 @@ public void createdBlockPassesValidationRulesAndHasAppropriateHashAndMixHash() { createInMemoryWorldStateArchive(), setupContextWithValidators(initialValidatorList)); + final PendingTransactions pendingTransactions = + new PendingTransactions( + TRANSACTION_EVICTION_INTERVAL_MS, 1, TestClock.fixed(), metricsSystem); + final IbftBlockCreator blockCreator = new IbftBlockCreator( initialValidatorList.get(0), @@ -92,7 +98,7 @@ public void createdBlockPassesValidationRulesAndHasAppropriateHashAndMixHash() { 0, initialValidatorList) .encode(), - new PendingTransactions(1, TestClock.fixed(), metricsSystem), + pendingTransactions, protContext, protocolSchedule, parentGasLimit -> parentGasLimit, diff --git a/consensus/ibftlegacy/src/test/java/tech/pegasys/pantheon/consensus/ibftlegacy/blockcreation/IbftBlockCreatorTest.java b/consensus/ibftlegacy/src/test/java/tech/pegasys/pantheon/consensus/ibftlegacy/blockcreation/IbftBlockCreatorTest.java index edefe48fdd..4112a9ef96 100644 --- a/consensus/ibftlegacy/src/test/java/tech/pegasys/pantheon/consensus/ibftlegacy/blockcreation/IbftBlockCreatorTest.java +++ b/consensus/ibftlegacy/src/test/java/tech/pegasys/pantheon/consensus/ibftlegacy/blockcreation/IbftBlockCreatorTest.java @@ -46,11 +46,13 @@ import java.util.Arrays; import java.util.List; import java.util.Optional; +import java.util.concurrent.TimeUnit; import com.google.common.collect.Lists; import org.junit.Test; public class IbftBlockCreatorTest { + private static final long TRANSACTION_EVICTION_INTERVAL_MS = TimeUnit.MINUTES.toMillis(1); private final MetricsSystem metricsSystem = new NoOpMetricsSystem(); @Test @@ -97,7 +99,8 @@ public void headerProducedPassesValidationRules() { null, initialValidatorList) .encode(), - new PendingTransactions(1, TestClock.fixed(), metricsSystem), + new PendingTransactions( + TRANSACTION_EVICTION_INTERVAL_MS, 1, TestClock.fixed(), metricsSystem), protContext, protocolSchedule, parentGasLimit -> parentGasLimit, diff --git a/ethereum/blockcreation/src/test/java/tech/pegasys/pantheon/ethereum/blockcreation/BlockTransactionSelectorTest.java b/ethereum/blockcreation/src/test/java/tech/pegasys/pantheon/ethereum/blockcreation/BlockTransactionSelectorTest.java index b32f2046ed..125ce24730 100644 --- a/ethereum/blockcreation/src/test/java/tech/pegasys/pantheon/ethereum/blockcreation/BlockTransactionSelectorTest.java +++ b/ethereum/blockcreation/src/test/java/tech/pegasys/pantheon/ethereum/blockcreation/BlockTransactionSelectorTest.java @@ -55,6 +55,7 @@ import java.math.BigInteger; import java.time.Instant; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import com.google.common.collect.Lists; @@ -63,6 +64,7 @@ public class BlockTransactionSelectorTest { private static final KeyPair keyPair = KeyPair.generate(); + private static final long TRANSACTION_EVICTION_INTERVAL_MS = TimeUnit.MINUTES.toMillis(1); private final MetricsSystem metricsSystem = new NoOpMetricsSystem(); @Test @@ -75,7 +77,9 @@ public void emptyPendingTransactionsResultsInEmptyVettingResult() { final DefaultMutableWorldState worldState = inMemoryWorldState(); final PendingTransactions pendingTransactions = - new PendingTransactions(5, TestClock.fixed(), metricsSystem); + new PendingTransactions( + TRANSACTION_EVICTION_INTERVAL_MS, 5, TestClock.fixed(), metricsSystem); + final Supplier isCancelled = () -> false; final ProcessableBlockHeader blockHeader = @@ -113,7 +117,8 @@ public void emptyPendingTransactionsResultsInEmptyVettingResult() { @Test public void failedTransactionsAreIncludedInTheBlock() { final PendingTransactions pendingTransactions = - new PendingTransactions(5, TestClock.fixed(), metricsSystem); + new PendingTransactions( + TRANSACTION_EVICTION_INTERVAL_MS, 5, TestClock.fixed(), metricsSystem); final Transaction transaction = createTransaction(1); pendingTransactions.addRemoteTransaction(transaction); @@ -165,7 +170,8 @@ public void failedTransactionsAreIncludedInTheBlock() { @Test public void invalidTransactionsTransactionProcessingAreSkippedButBlockStillFills() { final PendingTransactions pendingTransactions = - new PendingTransactions(5, TestClock.fixed(), metricsSystem); + new PendingTransactions( + TRANSACTION_EVICTION_INTERVAL_MS, 5, TestClock.fixed(), metricsSystem); final List transactionsToInject = Lists.newArrayList(); for (int i = 0; i < 5; i++) { @@ -228,7 +234,8 @@ public void invalidTransactionsTransactionProcessingAreSkippedButBlockStillFills @Test public void subsetOfPendingTransactionsIncludedWhenBlockGasLimitHit() { final PendingTransactions pendingTransactions = - new PendingTransactions(5, TestClock.fixed(), metricsSystem); + new PendingTransactions( + TRANSACTION_EVICTION_INTERVAL_MS, 5, TestClock.fixed(), metricsSystem); final List transactionsToInject = Lists.newArrayList(); // Transactions are reported in reverse order. @@ -294,7 +301,8 @@ public void subsetOfPendingTransactionsIncludedWhenBlockGasLimitHit() { @Test public void transactionOfferingGasPriceLessThanMinimumIsIdentifiedAndRemovedFromPending() { final PendingTransactions pendingTransactions = - new PendingTransactions(5, TestClock.fixed(), metricsSystem); + new PendingTransactions( + TRANSACTION_EVICTION_INTERVAL_MS, 5, TestClock.fixed(), metricsSystem); final Blockchain blockchain = new TestBlockchain(); @@ -339,7 +347,9 @@ public void transactionOfferingGasPriceLessThanMinimumIsIdentifiedAndRemovedFrom @Test public void transactionTooLargeForBlockDoesNotPreventMoreBeingAddedIfBlockOccupancyNotReached() { final PendingTransactions pendingTransactions = - new PendingTransactions(5, TestClock.fixed(), metricsSystem); + new PendingTransactions( + TRANSACTION_EVICTION_INTERVAL_MS, 5, TestClock.fixed(), metricsSystem); + final Blockchain blockchain = new TestBlockchain(); final DefaultMutableWorldState worldState = inMemoryWorldState(); final Supplier isCancelled = () -> false; @@ -410,7 +420,9 @@ public void transactionTooLargeForBlockDoesNotPreventMoreBeingAddedIfBlockOccupa @Test public void transactionSelectionStopsWhenSufficientBlockOccupancyIsReached() { final PendingTransactions pendingTransactions = - new PendingTransactions(10, TestClock.fixed(), metricsSystem); + new PendingTransactions( + TRANSACTION_EVICTION_INTERVAL_MS, 5, TestClock.fixed(), metricsSystem); + final Blockchain blockchain = new TestBlockchain(); final DefaultMutableWorldState worldState = inMemoryWorldState(); final Supplier isCancelled = () -> false; @@ -492,7 +504,9 @@ public void transactionSelectionStopsWhenSufficientBlockOccupancyIsReached() { @Test public void shouldDiscardTransactionsThatFailValidation() { final PendingTransactions pendingTransactions = - new PendingTransactions(10, TestClock.fixed(), metricsSystem); + new PendingTransactions( + TRANSACTION_EVICTION_INTERVAL_MS, 5, TestClock.fixed(), metricsSystem); + final TransactionProcessor transactionProcessor = mock(TransactionProcessor.class); final Blockchain blockchain = new TestBlockchain(); final DefaultMutableWorldState worldState = inMemoryWorldState(); diff --git a/ethereum/blockcreation/src/test/java/tech/pegasys/pantheon/ethereum/blockcreation/EthHashBlockCreatorTest.java b/ethereum/blockcreation/src/test/java/tech/pegasys/pantheon/ethereum/blockcreation/EthHashBlockCreatorTest.java index 447cf23e61..91e67a7960 100644 --- a/ethereum/blockcreation/src/test/java/tech/pegasys/pantheon/ethereum/blockcreation/EthHashBlockCreatorTest.java +++ b/ethereum/blockcreation/src/test/java/tech/pegasys/pantheon/ethereum/blockcreation/EthHashBlockCreatorTest.java @@ -30,6 +30,7 @@ import java.io.IOException; import java.math.BigInteger; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import com.google.common.collect.Lists; @@ -47,6 +48,7 @@ public class EthHashBlockCreatorTest { private static final BytesValue BLOCK_1_EXTRA_DATA = BytesValue.fromHexString("0x476574682f76312e302e302f6c696e75782f676f312e342e32"); + private static final long TRANSACTION_EVICTION_INTERVAL_MS = TimeUnit.MINUTES.toMillis(1); private final MetricsSystem metricsSystem = new NoOpMetricsSystem(); private final ExecutionContextTestFixture executionContextTestFixture = @@ -63,11 +65,16 @@ public class EthHashBlockCreatorTest { @Test public void createMainnetBlock1() throws IOException { final EthHashSolver solver = new EthHashSolver(Lists.newArrayList(BLOCK_1_NONCE), new Light()); + + final PendingTransactions pendingTransactions = + new PendingTransactions( + TRANSACTION_EVICTION_INTERVAL_MS, 1, TestClock.fixed(), metricsSystem); + final EthHashBlockCreator blockCreator = new EthHashBlockCreator( BLOCK_1_COINBASE, parent -> BLOCK_1_EXTRA_DATA, - new PendingTransactions(1, TestClock.fixed(), metricsSystem), + pendingTransactions, executionContextTestFixture.getProtocolContext(), executionContextTestFixture.getProtocolSchedule(), gasLimit -> gasLimit, diff --git a/ethereum/blockcreation/src/test/java/tech/pegasys/pantheon/ethereum/blockcreation/EthHashMinerExecutorTest.java b/ethereum/blockcreation/src/test/java/tech/pegasys/pantheon/ethereum/blockcreation/EthHashMinerExecutorTest.java index e1be9fca44..e78f6a6974 100644 --- a/ethereum/blockcreation/src/test/java/tech/pegasys/pantheon/ethereum/blockcreation/EthHashMinerExecutorTest.java +++ b/ethereum/blockcreation/src/test/java/tech/pegasys/pantheon/ethereum/blockcreation/EthHashMinerExecutorTest.java @@ -23,10 +23,12 @@ import tech.pegasys.pantheon.util.Subscribers; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.junit.Test; public class EthHashMinerExecutorTest { + private static final long TRANSACTION_EVICTION_INTERVAL_MS = TimeUnit.MINUTES.toMillis(1); private final MetricsSystem metricsSystem = new NoOpMetricsSystem(); @Test @@ -34,12 +36,16 @@ public void startingMiningWithoutCoinbaseThrowsException() { final MiningParameters miningParameters = new MiningParametersTestBuilder().coinbase(null).build(); + final PendingTransactions pendingTransactions = + new PendingTransactions( + TRANSACTION_EVICTION_INTERVAL_MS, 1, TestClock.fixed(), metricsSystem); + final EthHashMinerExecutor executor = new EthHashMinerExecutor( null, Executors.newCachedThreadPool(), null, - new PendingTransactions(1, TestClock.fixed(), metricsSystem), + pendingTransactions, miningParameters, new DefaultBlockScheduler(1, 10, TestClock.fixed())); @@ -52,12 +58,16 @@ public void startingMiningWithoutCoinbaseThrowsException() { public void settingCoinbaseToNullThrowsException() { final MiningParameters miningParameters = new MiningParametersTestBuilder().build(); + final PendingTransactions pendingTransactions = + new PendingTransactions( + TRANSACTION_EVICTION_INTERVAL_MS, 1, TestClock.fixed(), metricsSystem); + final EthHashMinerExecutor executor = new EthHashMinerExecutor( null, Executors.newCachedThreadPool(), null, - new PendingTransactions(1, TestClock.fixed(), metricsSystem), + pendingTransactions, miningParameters, new DefaultBlockScheduler(1, 10, TestClock.fixed())); diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/PendingTransactions.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/PendingTransactions.java index 90c1fe85b4..37565064e4 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/PendingTransactions.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/PendingTransactions.java @@ -13,6 +13,7 @@ package tech.pegasys.pantheon.ethereum.eth.transactions; import static java.util.Comparator.comparing; +import static java.util.stream.Collectors.toList; import tech.pegasys.pantheon.ethereum.core.AccountTransactionOrder; import tech.pegasys.pantheon.ethereum.core.Address; @@ -70,8 +71,14 @@ public class PendingTransactions { private final Counter localTransactionAddedCounter; private final Counter remoteTransactionAddedCounter; + private final long transactionEvictionIntervalMs; + public PendingTransactions( - final int maxPendingTransactions, final Clock clock, final MetricsSystem metricsSystem) { + final long transactionEvictionIntervalMs, + final int maxPendingTransactions, + final Clock clock, + final MetricsSystem metricsSystem) { + this.transactionEvictionIntervalMs = transactionEvictionIntervalMs; this.maxPendingTransactions = maxPendingTransactions; this.clock = clock; final LabelledMetric transactionAddedCounter = @@ -92,6 +99,19 @@ public PendingTransactions( "operation"); } + public void evictOldTransactions() { + synchronized (pendingTransactions) { + final Instant removeTransactionsBefore = + clock.instant().minusMillis(transactionEvictionIntervalMs); + final List transactionsToRemove = + prioritizedTransactions.stream() + .filter( + transaction -> transaction.getAddedToPoolAt().isBefore(removeTransactionsBefore)) + .collect(toList()); + transactionsToRemove.forEach(transaction -> removeTransaction(transaction.getTransaction())); + } + } + List getLocalTransactions() { synchronized (pendingTransactions) { List localTransactions = new ArrayList<>(); diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPoolFactory.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPoolFactory.java index aa86b63432..25acce2541 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPoolFactory.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPoolFactory.java @@ -20,9 +20,12 @@ import tech.pegasys.pantheon.metrics.MetricsSystem; import java.time.Clock; +import java.util.concurrent.TimeUnit; public class TransactionPoolFactory { + private static final long TRANSACTION_EVICTION_INTERVAL_MS = TimeUnit.HOURS.toMillis(12); + public static TransactionPool createTransactionPool( final ProtocolSchedule protocolSchedule, final ProtocolContext protocolContext, @@ -31,8 +34,10 @@ public static TransactionPool createTransactionPool( final int maxPendingTransactions, final MetricsSystem metricsSystem, final SyncState syncState) { + final PendingTransactions pendingTransactions = - new PendingTransactions(maxPendingTransactions, clock, metricsSystem); + new PendingTransactions( + TRANSACTION_EVICTION_INTERVAL_MS, maxPendingTransactions, clock, metricsSystem); final PeerTransactionTracker transactionTracker = new PeerTransactionTracker(); final TransactionsMessageSender transactionsMessageSender = diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/PendingTransactionsTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/PendingTransactionsTest.java index 014b2930c7..7ba4cf3b7d 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/PendingTransactionsTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/PendingTransactionsTest.java @@ -32,19 +32,23 @@ import java.util.ArrayList; import java.util.List; import java.util.OptionalLong; +import java.util.concurrent.TimeUnit; import com.google.common.collect.Lists; import org.junit.Test; public class PendingTransactionsTest { + private static final long TRANSACTION_EVICTION_INTERVAL_MS = TimeUnit.MINUTES.toMillis(1); private static final int MAX_TRANSACTIONS = 5; private static final KeyPair KEYS1 = KeyPair.generate(); private static final KeyPair KEYS2 = KeyPair.generate(); + private final TestClock clock = new TestClock(); private final MetricsSystem metricsSystem = new NoOpMetricsSystem(); private final PendingTransactions transactions = - new PendingTransactions(MAX_TRANSACTIONS, TestClock.fixed(), metricsSystem); + new PendingTransactions( + TRANSACTION_EVICTION_INTERVAL_MS, MAX_TRANSACTIONS, TestClock.fixed(), metricsSystem); private final Transaction transaction1 = createTransaction(2); private final Transaction transaction2 = createTransaction(1); @@ -409,4 +413,49 @@ private Transaction createTransaction(final int transactionNumber) { .nonce(transactionNumber) .createTransaction(KEYS1); } + + @Test + public void shouldEvictMultipleOldTransactions() { + final long transactionEvictionIntervalMs = 1000L; + final PendingTransactions transactions = + new PendingTransactions( + transactionEvictionIntervalMs, MAX_TRANSACTIONS, clock, metricsSystem); + + transactions.addRemoteTransaction(transaction1); + assertThat(transactions.size()).isEqualTo(1); + transactions.addRemoteTransaction(transaction2); + assertThat(transactions.size()).isEqualTo(2); + + clock.stepMillis(2000); + transactions.evictOldTransactions(); + assertThat(transactions.size()).isEqualTo(0); + } + + @Test + public void shouldEvictSingleOldTransaction() { + final long transactionEvictionIntervalMs = 1000L; + final PendingTransactions transactions = + new PendingTransactions( + transactionEvictionIntervalMs, MAX_TRANSACTIONS, clock, metricsSystem); + transactions.addRemoteTransaction(transaction1); + assertThat(transactions.size()).isEqualTo(1); + clock.stepMillis(2000); + transactions.evictOldTransactions(); + assertThat(transactions.size()).isEqualTo(0); + } + + @Test + public void shouldEvictExclusivelyOldTransactions() { + final long transactionEvictionIntervalMs = 2L; + final PendingTransactions transactions = + new PendingTransactions( + transactionEvictionIntervalMs, MAX_TRANSACTIONS, clock, metricsSystem); + transactions.addRemoteTransaction(transaction1); + assertThat(transactions.size()).isEqualTo(1); + clock.stepMillis(2001); + transactions.addRemoteTransaction(transaction2); + assertThat(transactions.size()).isEqualTo(2); + transactions.evictOldTransactions(); + assertThat(transactions.size()).isEqualTo(1); + } } diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPoolTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPoolTest.java index e33a4a1ca3..6f98a99a92 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPoolTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPoolTest.java @@ -67,12 +67,14 @@ import java.util.Collections; import java.util.List; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.junit.Before; import org.junit.Test; public class TransactionPoolTest { + private static final long TRANSACTION_EVICTION_INTERVAL_MS = TimeUnit.MINUTES.toMillis(1); private static final int MAX_TRANSACTIONS = 5; private static final KeyPair KEY_PAIR1 = KeyPair.generate(); @@ -89,8 +91,10 @@ public class TransactionPoolTest { private final TransactionValidator transactionValidator = mock(TransactionValidator.class); private MutableBlockchain blockchain; + private final PendingTransactions transactions = - new PendingTransactions(MAX_TRANSACTIONS, TestClock.fixed(), metricsSystem); + new PendingTransactions( + TRANSACTION_EVICTION_INTERVAL_MS, MAX_TRANSACTIONS, TestClock.fixed(), metricsSystem); private final Transaction transaction1 = createTransaction(1); private final Transaction transaction2 = createTransaction(2); private final ExecutionContextTestFixture executionContext = ExecutionContextTestFixture.create(); diff --git a/ethereum/jsonrpc/src/integration-test/java/tech/pegasys/pantheon/ethereum/jsonrpc/methods/EthGetFilterChangesIntegrationTest.java b/ethereum/jsonrpc/src/integration-test/java/tech/pegasys/pantheon/ethereum/jsonrpc/methods/EthGetFilterChangesIntegrationTest.java index e4b8734b08..26ff351756 100644 --- a/ethereum/jsonrpc/src/integration-test/java/tech/pegasys/pantheon/ethereum/jsonrpc/methods/EthGetFilterChangesIntegrationTest.java +++ b/ethereum/jsonrpc/src/integration-test/java/tech/pegasys/pantheon/ethereum/jsonrpc/methods/EthGetFilterChangesIntegrationTest.java @@ -57,6 +57,7 @@ import java.math.BigInteger; import java.util.List; +import java.util.concurrent.TimeUnit; import org.assertj.core.util.Lists; import org.junit.Before; @@ -68,6 +69,7 @@ @RunWith(MockitoJUnitRunner.class) public class EthGetFilterChangesIntegrationTest { + private static final long TRANSACTION_EVICTION_INTERVAL_MS = TimeUnit.MINUTES.toMillis(1); @Mock private TransactionBatchAddedListener batchAddedListener; private MutableBlockchain blockchain; private final String ETH_METHOD = "eth_getFilterChanges"; @@ -76,7 +78,9 @@ public class EthGetFilterChangesIntegrationTest { private final MetricsSystem metricsSystem = new NoOpMetricsSystem(); private final PendingTransactions transactions = - new PendingTransactions(MAX_TRANSACTIONS, TestClock.fixed(), metricsSystem); + new PendingTransactions( + TRANSACTION_EVICTION_INTERVAL_MS, MAX_TRANSACTIONS, TestClock.fixed(), metricsSystem); + private static final int MAX_TRANSACTIONS = 5; private static final KeyPair keyPair = KeyPair.generate(); private final Transaction transaction = createTransaction(1); diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/Runner.java b/pantheon/src/main/java/tech/pegasys/pantheon/Runner.java index adf0d53299..041648cc49 100644 --- a/pantheon/src/main/java/tech/pegasys/pantheon/Runner.java +++ b/pantheon/src/main/java/tech/pegasys/pantheon/Runner.java @@ -73,6 +73,13 @@ public void start() { if (networkRunner.getNetwork().isP2pEnabled()) { pantheonController.getSynchronizer().start(); } + vertx.setPeriodic( + TimeUnit.MINUTES.toMillis(1), + time -> + pantheonController + .getTransactionPool() + .getPendingTransactions() + .evictOldTransactions()); jsonRpc.ifPresent(service -> waitForServiceToStart("jsonRpc", service.start())); websocketRpc.ifPresent(service -> waitForServiceToStop("websocketRpc", service.start())); metrics.ifPresent(service -> waitForServiceToStart("metrics", service.start()));