Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Keep track of blobs that are part of multiple transactions #7723

Merged
merged 10 commits into from
Oct 9, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import java.util.IntSummaryStatistics;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
Expand Down Expand Up @@ -107,8 +108,8 @@ public class TransactionPool implements BlockAddedObserver {
private final SaveRestoreManager saveRestoreManager = new SaveRestoreManager();
private final Set<Address> localSenders = ConcurrentHashMap.newKeySet();
private final EthScheduler.OrderedProcessor<BlockAddedEvent> blockAddedEventOrderedProcessor;
private final Map<VersionedHash, BlobsWithCommitments.BlobQuad> mapOfBlobsInTransactionPool =
new HashMap<>();
private final Map<VersionedHash, List<BlobsWithCommitments.BlobQuad>>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This need to support concurrency, since notifications are sent async by potentially multiple threads

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The access to the map was synchronized, but using a map implementation that does the synchronization for me is probably a better idea!

mapOfBlobsInTransactionPool = new HashMap<>();

public TransactionPool(
final Supplier<PendingTransactions> pendingTransactionsSupplier,
Expand Down Expand Up @@ -660,7 +661,14 @@ private void mapBlobsOnTransactionAdded(
}
final List<BlobsWithCommitments.BlobQuad> blobQuads =
maybeBlobsWithCommitments.get().getBlobQuads();
blobQuads.forEach(bq -> mapOfBlobsInTransactionPool.put(bq.versionedHash(), bq));

synchronized (mapOfBlobsInTransactionPool) {
blobQuads.forEach(
bq ->
mapOfBlobsInTransactionPool
.computeIfAbsent(bq.versionedHash(), k -> new ArrayList<>())
.add(bq));
}
}

private void unmapBlobsOnTransactionDropped(
Expand All @@ -672,15 +680,31 @@ private void unmapBlobsOnTransactionDropped(
}
final List<BlobsWithCommitments.BlobQuad> blobQuads =
maybeBlobsWithCommitments.get().getBlobQuads();
blobQuads.forEach(bq -> mapOfBlobsInTransactionPool.remove(bq.versionedHash()));

synchronized (mapOfBlobsInTransactionPool) {
blobQuads.forEach(
bq -> {
final List<BlobsWithCommitments.BlobQuad> blobQuadList =
mapOfBlobsInTransactionPool.get(bq.versionedHash());
blobQuadList.remove(bq);
if (blobQuadList.isEmpty()) {
mapOfBlobsInTransactionPool.remove(bq.versionedHash());
}
});
}
}

public BlobsWithCommitments.BlobQuad getBlobQuad(final VersionedHash vh) {
BlobsWithCommitments.BlobQuad blobQuad = mapOfBlobsInTransactionPool.get(vh);
if (blobQuad == null) {
blobQuad = cacheForBlobsOfTransactionsAddedToABlock.get(vh);
final List<BlobsWithCommitments.BlobQuad> blobQuadList;
blobQuadList = mapOfBlobsInTransactionPool.get(vh);
if (blobQuadList != null) {
try {
return blobQuadList.getFirst();
} catch (NoSuchElementException e) {
return null;
}
}
return blobQuad;
return cacheForBlobsOfTransactionsAddedToABlock.get(vh);
}

public boolean isEnabled() {
Expand Down
Loading