Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Reduce synchronization in PendingTransactions #1447

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
package tech.pegasys.pantheon.ethereum.eth.transactions;

import static java.util.Comparator.comparing;
import static java.util.stream.Collectors.toList;

import tech.pegasys.pantheon.ethereum.core.AccountTransactionOrder;
import tech.pegasys.pantheon.ethereum.core.Address;
Expand All @@ -40,7 +39,9 @@
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

/**
* Holds the current set of pending transactions with the ability to iterate them based on priority
Expand All @@ -56,7 +57,7 @@ public class PendingTransactions {
private final int maxTransactionRetentionHours;
private final Clock clock;

private final Map<Hash, TransactionInfo> pendingTransactions = new HashMap<>();
private final Map<Hash, TransactionInfo> pendingTransactions = new ConcurrentHashMap<>();
private final SortedSet<TransactionInfo> prioritizedTransactions =
new TreeSet<>(
comparing(TransactionInfo::isReceivedFromLocalSource)
Expand Down Expand Up @@ -103,28 +104,19 @@ public PendingTransactions(
}

public void evictOldTransactions() {
synchronized (pendingTransactions) {
final Instant removeTransactionsBefore =
clock.instant().minus(maxTransactionRetentionHours, ChronoUnit.HOURS);
final List<TransactionInfo> transactionsToRemove =
prioritizedTransactions.stream()
.filter(
transaction -> transaction.getAddedToPoolAt().isBefore(removeTransactionsBefore))
.collect(toList());
transactionsToRemove.forEach(transaction -> removeTransaction(transaction.getTransaction()));
}
final Instant removeTransactionsBefore =
clock.instant().minus(maxTransactionRetentionHours, ChronoUnit.HOURS);

pendingTransactions.values().stream()
.filter(transaction -> transaction.getAddedToPoolAt().isBefore(removeTransactionsBefore))
.forEach(transaction -> removeTransaction(transaction.getTransaction()));
Copy link
Contributor

@shemnon shemnon May 15, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice if there was some way to assert that the underlying spliterator had the CONCURRENT characteristic before performing. While this is correct (because it is from a ConcurrentHashMap) I am concerned that future alterations may miss this nuance. At least a comment would help.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the map ever gets changed back to a non-concurrent map, all the places it's accessed from will be problematic. This is actually the place that kind of mistake will be most obvious because the unit tests will fail with ConcurrentModificationException. The others will just be subtle bugs in the face of multiple threads.

}

List<Transaction> getLocalTransactions() {
synchronized (pendingTransactions) {
List<Transaction> localTransactions = new ArrayList<>();
for (Map.Entry<Hash, TransactionInfo> transaction : pendingTransactions.entrySet()) {
if (transaction.getValue().isReceivedFromLocalSource()) {
localTransactions.add(transaction.getValue().getTransaction());
}
}
return localTransactions;
}
return pendingTransactions.values().stream()
.filter(TransactionInfo::isReceivedFromLocalSource)
.map(TransactionInfo::getTransaction)
.collect(Collectors.toList());
}

public boolean addRemoteTransaction(final Transaction transaction) {
Expand Down Expand Up @@ -220,6 +212,7 @@ private AccountTransactionOrder createSenderTransactionOrder(final Address addre
}

private boolean addTransaction(final TransactionInfo transactionInfo) {
Optional<Transaction> droppedTransaction = Optional.empty();
synchronized (pendingTransactions) {
if (pendingTransactions.containsKey(transactionInfo.getHash())) {
return false;
Expand All @@ -231,13 +224,15 @@ private boolean addTransaction(final TransactionInfo transactionInfo) {
prioritizedTransactions.add(transactionInfo);
pendingTransactions.put(transactionInfo.getHash(), transactionInfo);

notifyTransactionAdded(transactionInfo.getTransaction());
if (pendingTransactions.size() > maxPendingTransactions) {
final TransactionInfo toRemove = prioritizedTransactions.last();
removeTransaction(toRemove.getTransaction());
doRemoveTransaction(toRemove.getTransaction(), false);
droppedTransaction = Optional.of(toRemove.getTransaction());
}
return true;
}
notifyTransactionAdded(transactionInfo.getTransaction());
droppedTransaction.ifPresent(this::notifyTransactionDropped);
return true;
}

private boolean addTransactionForSenderAndNonce(final TransactionInfo transactionInfo) {
Expand Down Expand Up @@ -277,22 +272,16 @@ public long maxSize() {
}

public int size() {
synchronized (pendingTransactions) {
return pendingTransactions.size();
}
return pendingTransactions.size();
}

public Optional<Transaction> getTransactionByHash(final Hash transactionHash) {
synchronized (pendingTransactions) {
return Optional.ofNullable(pendingTransactions.get(transactionHash))
.map(TransactionInfo::getTransaction);
}
return Optional.ofNullable(pendingTransactions.get(transactionHash))
.map(TransactionInfo::getTransaction);
}

public Set<TransactionInfo> getTransactionInfo() {
synchronized (pendingTransactions) {
return new HashSet<>(pendingTransactions.values());
}
return new HashSet<>(pendingTransactions.values());
}

void addTransactionListener(final PendingTransactionListener listener) {
Expand Down