diff --git a/Builds/VisualStudio/stellar-core.vcxproj b/Builds/VisualStudio/stellar-core.vcxproj
index b53342f8ce..5ed8369b8c 100644
--- a/Builds/VisualStudio/stellar-core.vcxproj
+++ b/Builds/VisualStudio/stellar-core.vcxproj
@@ -539,6 +539,7 @@ exit /b 0
+
@@ -684,6 +685,8 @@ exit /b 0
+
+
@@ -996,6 +999,7 @@ exit /b 0
+
@@ -1104,6 +1108,8 @@ exit /b 0
+
+
diff --git a/Builds/VisualStudio/stellar-core.vcxproj.filters b/Builds/VisualStudio/stellar-core.vcxproj.filters
index 6bb795c159..fd2f163e59 100644
--- a/Builds/VisualStudio/stellar-core.vcxproj.filters
+++ b/Builds/VisualStudio/stellar-core.vcxproj.filters
@@ -1377,6 +1377,15 @@
ledger
+
+ herder
+
+
+ simulation
+
+
+ simulation
+
@@ -2402,6 +2411,15 @@
main
+
+ herder
+
+
+ simulation
+
+
+ simulation
+
ledger
diff --git a/configure.ac b/configure.ac
index 1ec808f239..5f183ceca9 100644
--- a/configure.ac
+++ b/configure.ac
@@ -32,7 +32,7 @@ if test -z "${WFLAGS+set}"; then
WFLAGS="$WFLAGS -Werror=unused-result"
fi
-test "${CFLAGS+set}" || CFLAGS="-g -O2 -fno-omit-frame-pointer"
+test "${CFLAGS+set}" || CFLAGS="-g -fno-omit-frame-pointer"
test "${CXXFLAGS+set}" || CXXFLAGS="$CFLAGS"
AC_PROG_CC([clang gcc cc])
@@ -115,6 +115,16 @@ AS_IF([test "x$enable_codecoverage" = "xyes"], [
CFLAGS="$CFLAGS -fprofile-instr-generate -fcoverage-mapping"
])
+AC_ARG_ENABLE([debugmode],
+ AS_HELP_STRING([--enable-debugmode],
+ [build in debug mode]))
+
+AS_IF([test "x$enable_debugmode" != "xyes"], [
+ AC_MSG_NOTICE([ adding -O2 optimization flags ])
+ CXXFLAGS="$CXXFLAGS -O2"
+ CFLAGS="$CFLAGS -O2"
+])
+
AC_ARG_ENABLE([threadsanitizer],
AS_HELP_STRING([--enable-threadsanitizer],
[build with thread-sanitizer (TSan) instrumentation]))
diff --git a/src/herder/HerderImpl.cpp b/src/herder/HerderImpl.cpp
index e3e1eeb601..6e8c137e86 100644
--- a/src/herder/HerderImpl.cpp
+++ b/src/herder/HerderImpl.cpp
@@ -1350,7 +1350,7 @@ HerderImpl::triggerNextLedger(uint32_t ledgerSeqToTrigger,
// our first choice for this round's set is all the tx we have collected
// during last few ledger closes
auto const& lcl = mLedgerManager.getLastClosedLedgerHeader();
- TxSetPhaseTransactions txPhases;
+ PerPhaseTransactionList txPhases;
txPhases.emplace_back(mTransactionQueue.getTransactions(lcl.header));
if (protocolVersionStartsFrom(lcl.header.ledgerVersion,
@@ -1415,7 +1415,7 @@ HerderImpl::triggerNextLedger(uint32_t ledgerSeqToTrigger,
upperBoundCloseTimeOffset = nextCloseTime - lcl.header.scpValue.closeTime;
lowerBoundCloseTimeOffset = upperBoundCloseTimeOffset;
- TxSetPhaseTransactions invalidTxPhases;
+ PerPhaseTransactionList invalidTxPhases;
invalidTxPhases.resize(txPhases.size());
auto [proposedSet, applicableProposedSet] =
diff --git a/src/herder/ParallelTxSetBuilder.cpp b/src/herder/ParallelTxSetBuilder.cpp
new file mode 100644
index 0000000000..ba418ae9b4
--- /dev/null
+++ b/src/herder/ParallelTxSetBuilder.cpp
@@ -0,0 +1,379 @@
+// Copyright 2024 Stellar Development Foundation and contributors. Licensed
+// under the Apache License, Version 2.0. See the COPYING file at the root
+// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0
+
+#include "herder/ParallelTxSetBuilder.h"
+#include "herder/SurgePricingUtils.h"
+#include "herder/TxSetFrame.h"
+#include "transactions/TransactionFrameBase.h"
+#include "util/BitSet.h"
+
+#include
+
+namespace stellar
+{
+namespace
+{
+
+struct ParallelPartitionConfig
+{
+ ParallelPartitionConfig(Config const& cfg,
+ SorobanNetworkConfig const& sorobanCfg)
+ : mStageCount(
+ std::max(cfg.SOROBAN_PHASE_STAGE_COUNT, static_cast(1)))
+ , mThreadsPerStage(sorobanCfg.ledgerMaxParallelThreads())
+ , mInstructionsPerThread(sorobanCfg.ledgerMaxInstructions() /
+ mStageCount)
+ {
+ }
+
+ uint64_t
+ instructionsPerStage() const
+ {
+ return mInstructionsPerThread * mThreadsPerStage;
+ }
+
+ uint32_t mStageCount = 0;
+ uint32_t mThreadsPerStage = 0;
+ uint64_t mInstructionsPerThread = 0;
+};
+
+struct BuilderTx
+{
+ size_t mId = 0;
+ uint32_t mInstructions = 0;
+ BitSet mReadOnlyFootprint;
+ BitSet mReadWriteFootprint;
+
+ BuilderTx(size_t txId, TransactionFrameBase const& tx,
+ UnorderedMap const& entryIdMap)
+ : mId(txId), mInstructions(tx.sorobanResources().instructions)
+ {
+ auto const& footprint = tx.sorobanResources().footprint;
+ for (auto const& key : footprint.readOnly)
+ {
+ mReadOnlyFootprint.set(entryIdMap.at(key));
+ }
+ for (auto const& key : footprint.readWrite)
+ {
+ mReadWriteFootprint.set(entryIdMap.at(key));
+ }
+ }
+};
+
+struct Cluster
+{
+ uint64_t mInstructions = 0;
+ BitSet mReadOnlyEntries;
+ BitSet mReadWriteEntries;
+ BitSet mTxIds;
+ size_t mBinId = 0;
+
+ explicit Cluster(BuilderTx const& tx) : mInstructions(tx.mInstructions)
+ {
+ mReadOnlyEntries.inplaceUnion(tx.mReadOnlyFootprint);
+ mReadWriteEntries.inplaceUnion(tx.mReadWriteFootprint);
+ mTxIds.set(tx.mId);
+ }
+
+ void
+ merge(Cluster const& other)
+ {
+ mInstructions += other.mInstructions;
+ mReadOnlyEntries.inplaceUnion(other.mReadOnlyEntries);
+ mReadWriteEntries.inplaceUnion(other.mReadWriteEntries);
+ mTxIds.inplaceUnion(other.mTxIds);
+ }
+};
+
+class Stage
+{
+ public:
+ Stage(ParallelPartitionConfig cfg) : mConfig(cfg)
+ {
+ mBinPacking.resize(mConfig.mThreadsPerStage);
+ mBinInstructions.resize(mConfig.mThreadsPerStage);
+ }
+
+ bool
+ tryAdd(BuilderTx const& tx)
+ {
+ ZoneScoped;
+ if (mInstructions + tx.mInstructions > mConfig.instructionsPerStage())
+ {
+ return false;
+ }
+
+ auto conflictingClusters = getConflictingClusters(tx);
+
+ bool packed = false;
+ auto newClusters = createNewClusters(tx, conflictingClusters, packed);
+ releaseAssert(!newClusters.empty());
+ if (newClusters.back().mInstructions > mConfig.mInstructionsPerThread)
+ {
+ return false;
+ }
+ if (packed)
+ {
+ mClusters = newClusters;
+ mInstructions += tx.mInstructions;
+ return true;
+ }
+
+ std::vector newBinInstructions;
+ auto newPacking = binPacking(newClusters, newBinInstructions);
+ if (newPacking.empty())
+ {
+ return false;
+ }
+ mClusters = newClusters;
+ mBinPacking = newPacking;
+ mInstructions += tx.mInstructions;
+ mBinInstructions = newBinInstructions;
+ return true;
+ }
+
+ void
+ visitAllTransactions(std::function visitor) const
+ {
+ for (auto const& cluster : mClusters)
+ {
+ size_t txId = 0;
+ while (cluster.mTxIds.nextSet(txId))
+ {
+ visitor(cluster.mBinId, txId);
+ ++txId;
+ }
+ }
+ }
+
+ private:
+ std::unordered_set
+ getConflictingClusters(BuilderTx const& tx) const
+ {
+ std::unordered_set conflictingClusters;
+ for (Cluster const& cluster : mClusters)
+ {
+ bool isConflicting = tx.mReadOnlyFootprint.intersectionCount(
+ cluster.mReadWriteEntries) > 0 ||
+ tx.mReadWriteFootprint.intersectionCount(
+ cluster.mReadOnlyEntries) > 0 ||
+ tx.mReadWriteFootprint.intersectionCount(
+ cluster.mReadWriteEntries) > 0;
+ if (isConflicting)
+ {
+ conflictingClusters.insert(&cluster);
+ }
+ }
+ return conflictingClusters;
+ }
+
+ std::vector
+ createNewClusters(BuilderTx const& tx,
+ std::unordered_set const& txConflicts,
+ bool& packed)
+ {
+ std::vector newClusters;
+ newClusters.reserve(mClusters.size());
+ for (auto const& cluster : mClusters)
+ {
+ if (txConflicts.find(&cluster) == txConflicts.end())
+ {
+ newClusters.push_back(cluster);
+ }
+ }
+
+ newClusters.emplace_back(tx);
+ for (auto const* cluster : txConflicts)
+ {
+ newClusters.back().merge(*cluster);
+ }
+
+ if (newClusters.back().mInstructions > mConfig.mInstructionsPerThread)
+ {
+ return newClusters;
+ }
+
+ for (auto const& cluster : txConflicts)
+ {
+ mBinInstructions[cluster->mBinId] -= cluster->mInstructions;
+ mBinPacking[cluster->mBinId].inplaceDifference(cluster->mTxIds);
+ }
+
+ packed = false;
+
+ for (size_t binId = 0; binId < mConfig.mThreadsPerStage; ++binId)
+ {
+ if (mBinInstructions[binId] + newClusters.back().mInstructions <=
+ mConfig.mInstructionsPerThread)
+ {
+ mBinInstructions[binId] += newClusters.back().mInstructions;
+ mBinPacking[binId].inplaceUnion(newClusters.back().mTxIds);
+ newClusters.back().mBinId = binId;
+ packed = true;
+ break;
+ }
+ }
+ if (!packed)
+ {
+ for (auto const& cluster : txConflicts)
+ {
+ mBinInstructions[cluster->mBinId] += cluster->mInstructions;
+ mBinPacking[cluster->mBinId].inplaceUnion(cluster->mTxIds);
+ }
+ }
+ return newClusters;
+ }
+
+ std::vector
+ binPacking(std::vector& clusters,
+ std::vector& binInsns) const
+ {
+ std::sort(clusters.begin(), clusters.end(),
+ [](auto const& a, auto const& b) {
+ return a.mInstructions > b.mInstructions;
+ });
+ size_t const binCount = mConfig.mThreadsPerStage;
+ std::vector bins(binCount);
+ binInsns.resize(binCount);
+ for (auto& cluster : clusters)
+ {
+ bool packed = false;
+ for (size_t i = 0; i < binCount; ++i)
+ {
+ if (binInsns[i] + cluster.mInstructions <=
+ mConfig.mInstructionsPerThread)
+ {
+ binInsns[i] += cluster.mInstructions;
+ bins[i].inplaceUnion(cluster.mTxIds);
+ cluster.mBinId = i;
+ packed = true;
+ break;
+ }
+ }
+ if (!packed)
+ {
+ return std::vector();
+ }
+ }
+ return bins;
+ }
+
+ std::vector mClusters;
+ std::vector mBinPacking;
+ std::vector mBinInstructions;
+ int64_t mInstructions = 0;
+ ParallelPartitionConfig mConfig;
+};
+
+} // namespace
+
+TxStageFrameList
+buildSurgePricedParallelSorobanPhase(
+ TxFrameList const& txFrames, Config const& cfg,
+ SorobanNetworkConfig const& sorobanCfg,
+ std::shared_ptr laneConfig,
+ std::vector& hadTxNotFittingLane)
+{
+ ZoneScoped;
+ UnorderedMap entryIdMap;
+
+ auto addToMap = [&entryIdMap](LedgerKey const& key) {
+ auto sz = entryIdMap.size();
+ entryIdMap.emplace(key, sz);
+ };
+ for (auto const& txFrame : txFrames)
+ {
+ auto const& footprint = txFrame->sorobanResources().footprint;
+ for (auto const& key : footprint.readOnly)
+ {
+ addToMap(key);
+ }
+ for (auto const& key : footprint.readWrite)
+ {
+ addToMap(key);
+ }
+ }
+
+ std::unordered_map builderTxForTx;
+ for (size_t i = 0; i < txFrames.size(); ++i)
+ {
+ auto const& txFrame = txFrames[i];
+ builderTxForTx.emplace(txFrame, BuilderTx(i, *txFrame, entryIdMap));
+ }
+
+ SurgePricingPriorityQueue queue(
+ /* isHighestPriority */ true, laneConfig,
+ stellar::rand_uniform(0, std::numeric_limits::max()));
+ for (auto const& tx : txFrames)
+ {
+ queue.add(tx);
+ }
+
+ ParallelPartitionConfig partitionCfg(cfg, sorobanCfg);
+ std::vector stages(partitionCfg.mStageCount, partitionCfg);
+
+ auto visitor = [&stages,
+ &builderTxForTx](TransactionFrameBaseConstPtr const& tx) {
+ bool added = false;
+ auto builderTxIt = builderTxForTx.find(tx);
+ releaseAssert(builderTxIt != builderTxForTx.end());
+ for (auto& stage : stages)
+ {
+ if (stage.tryAdd(builderTxIt->second))
+ {
+ added = true;
+ break;
+ }
+ }
+ if (added)
+ {
+ return SurgePricingPriorityQueue::VisitTxResult::PROCESSED;
+ }
+ return SurgePricingPriorityQueue::VisitTxResult::REJECTED;
+ };
+
+ std::vector laneLeftUntilLimit;
+ queue.popTopTxs(/* allowGaps */ true, visitor, laneLeftUntilLimit,
+ hadTxNotFittingLane);
+ releaseAssert(hadTxNotFittingLane.size() == 1);
+
+ TxStageFrameList resStages;
+ resStages.reserve(stages.size());
+ for (auto const& stage : stages)
+ {
+ auto& resStage = resStages.emplace_back();
+ resStage.reserve(partitionCfg.mThreadsPerStage);
+
+ std::unordered_map threadIdToStageThread;
+
+ stage.visitAllTransactions([&resStage, &txFrames,
+ &threadIdToStageThread](size_t threadId,
+ size_t txId) {
+ auto it = threadIdToStageThread.find(threadId);
+ if (it == threadIdToStageThread.end())
+ {
+ it = threadIdToStageThread.emplace(threadId, resStage.size())
+ .first;
+ resStage.emplace_back();
+ }
+ resStage[it->second].push_back(txFrames[txId]);
+ });
+ for (auto const& thread : resStage)
+ {
+ releaseAssert(!thread.empty());
+ }
+ }
+ while (!resStages.empty() && resStages.back().empty())
+ {
+ resStages.pop_back();
+ }
+ for (auto const& stage : resStages)
+ {
+ releaseAssert(!stage.empty());
+ }
+
+ return resStages;
+}
+
+} // namespace stellar
diff --git a/src/herder/ParallelTxSetBuilder.h b/src/herder/ParallelTxSetBuilder.h
new file mode 100644
index 0000000000..e97d58767e
--- /dev/null
+++ b/src/herder/ParallelTxSetBuilder.h
@@ -0,0 +1,21 @@
+#pragma once
+
+// Copyright 2024 Stellar Development Foundation and contributors. Licensed
+// under the Apache License, Version 2.0. See the COPYING file at the root
+// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0
+
+#include "herder/SurgePricingUtils.h"
+#include "herder/TxSetFrame.h"
+#include "ledger/NetworkConfig.h"
+#include "main/Config.h"
+
+namespace stellar
+{
+
+TxStageFrameList buildSurgePricedParallelSorobanPhase(
+ TxFrameList const& txFrames, Config const& cfg,
+ SorobanNetworkConfig const& sorobanCfg,
+ std::shared_ptr laneConfig,
+ std::vector& hadTxNotFittingLane);
+
+} // namespace stellar
diff --git a/src/herder/SurgePricingUtils.cpp b/src/herder/SurgePricingUtils.cpp
index cc12e6e403..68c315bb29 100644
--- a/src/herder/SurgePricingUtils.cpp
+++ b/src/herder/SurgePricingUtils.cpp
@@ -319,6 +319,13 @@ SurgePricingPriorityQueue::popTopTxs(
laneLeftUntilLimit[lane] -= res;
}
}
+ else if (visitRes == VisitTxResult::REJECTED)
+ {
+ // If a transaction hasn't been processed, then it is considered to
+ // be not fitting the lane.
+ hadTxNotFittingLane[GENERIC_LANE] = true;
+ hadTxNotFittingLane[lane] = true;
+ }
erase(currIt);
}
}
diff --git a/src/herder/SurgePricingUtils.h b/src/herder/SurgePricingUtils.h
index 08473e43a8..c8c96d659c 100644
--- a/src/herder/SurgePricingUtils.h
+++ b/src/herder/SurgePricingUtils.h
@@ -133,6 +133,9 @@ class SurgePricingPriorityQueue
// Transaction should be skipped and not counted towards the lane
// limits.
SKIPPED,
+ // Like `SKIPPED`, but marks the fact that the transaction didn't fit
+ // into the lane due to reasons beyond the lane's resource limit.
+ REJECTED,
// Transaction has been processed and should be counted towards the
// lane limits.
PROCESSED
@@ -184,6 +187,17 @@ class SurgePricingPriorityQueue
std::vector>& txsToEvict)
const;
+ // Generalized method for visiting and popping the top transactions in the
+ // queue until the lane limits are reached.
+ // This is a destructive method that removes all or most of the queue
+ // elements and thus should be used with care.
+ void popTopTxs(
+ bool allowGaps,
+ std::function const&
+ visitor,
+ std::vector& laneResourcesLeftUntilLimit,
+ std::vector& hadTxNotFittingLane);
+
private:
class TxComparator
{
@@ -236,17 +250,6 @@ class SurgePricingPriorityQueue
std::vector mutable mIters;
};
- // Generalized method for visiting and popping the top transactions in the
- // queue until the lane limits are reached.
- // This is a destructive method that removes all or most of the queue
- // elements and thus should be used with care.
- void popTopTxs(
- bool allowGaps,
- std::function const&
- visitor,
- std::vector& laneResourcesLeftUntilLimit,
- std::vector& hadTxNotFittingLane);
-
void erase(Iterator const& it);
void erase(size_t lane,
SurgePricingPriorityQueue::TxSortedSet::iterator iter);
diff --git a/src/herder/TransactionQueue.cpp b/src/herder/TransactionQueue.cpp
index d763262e3a..394de13485 100644
--- a/src/herder/TransactionQueue.cpp
+++ b/src/herder/TransactionQueue.cpp
@@ -937,11 +937,11 @@ TransactionQueue::isBanned(Hash const& hash) const
});
}
-TxSetTransactions
+TxFrameList
TransactionQueue::getTransactions(LedgerHeader const& lcl) const
{
ZoneScoped;
- TxSetTransactions txs;
+ TxFrameList txs;
uint32_t const nextLedgerSeq = lcl.ledgerSeq + 1;
int64_t const startingSeq = getStartingSequenceNumber(nextLedgerSeq);
diff --git a/src/herder/TransactionQueue.h b/src/herder/TransactionQueue.h
index 86cd2c97fd..81b2409853 100644
--- a/src/herder/TransactionQueue.h
+++ b/src/herder/TransactionQueue.h
@@ -143,7 +143,7 @@ class TransactionQueue
bool isBanned(Hash const& hash) const;
TransactionFrameBaseConstPtr getTx(Hash const& hash) const;
- TxSetTransactions getTransactions(LedgerHeader const& lcl) const;
+ TxFrameList getTransactions(LedgerHeader const& lcl) const;
bool sourceAccountPending(AccountID const& accountID) const;
virtual size_t getMaxQueueSizeOps() const = 0;
diff --git a/src/herder/TxSetFrame.cpp b/src/herder/TxSetFrame.cpp
index 73e705ff93..834eb90e8d 100644
--- a/src/herder/TxSetFrame.cpp
+++ b/src/herder/TxSetFrame.cpp
@@ -9,6 +9,7 @@
#include "crypto/Random.h"
#include "crypto/SHA.h"
#include "database/Database.h"
+#include "herder/ParallelTxSetBuilder.h"
#include "herder/SurgePricingUtils.h"
#include "ledger/LedgerManager.h"
#include "ledger/LedgerTxn.h"
@@ -37,17 +38,111 @@ namespace stellar
namespace
{
+std::string
+getTxSetPhaseName(TxSetPhase phase)
+{
+ switch (phase)
+ {
+ case TxSetPhase::CLASSIC:
+ return "classic";
+ case TxSetPhase::SOROBAN:
+ return "soroban";
+ default:
+ throw std::runtime_error("Unknown phase");
+ }
+}
+
+bool
+validateSequentialPhaseXDRStructure(TransactionPhase const& phase)
+{
+ bool componentsNormalized =
+ std::is_sorted(phase.v0Components().begin(), phase.v0Components().end(),
+ [](auto const& c1, auto const& c2) {
+ if (!c1.txsMaybeDiscountedFee().baseFee ||
+ !c2.txsMaybeDiscountedFee().baseFee)
+ {
+ return !c1.txsMaybeDiscountedFee().baseFee &&
+ c2.txsMaybeDiscountedFee().baseFee;
+ }
+ return *c1.txsMaybeDiscountedFee().baseFee <
+ *c2.txsMaybeDiscountedFee().baseFee;
+ });
+ if (!componentsNormalized)
+ {
+ CLOG_DEBUG(Herder, "Got bad txSet: incorrect component order");
+ return false;
+ }
+
+ bool componentBaseFeesUnique =
+ std::adjacent_find(phase.v0Components().begin(),
+ phase.v0Components().end(),
+ [](auto const& c1, auto const& c2) {
+ if (!c1.txsMaybeDiscountedFee().baseFee ||
+ !c2.txsMaybeDiscountedFee().baseFee)
+ {
+ return !c1.txsMaybeDiscountedFee().baseFee &&
+ !c2.txsMaybeDiscountedFee().baseFee;
+ }
+ return *c1.txsMaybeDiscountedFee().baseFee ==
+ *c2.txsMaybeDiscountedFee().baseFee;
+ }) == phase.v0Components().end();
+ if (!componentBaseFeesUnique)
+ {
+ CLOG_DEBUG(Herder, "Got bad txSet: duplicate component base fees");
+ return false;
+ }
+ for (auto const& component : phase.v0Components())
+ {
+ if (component.txsMaybeDiscountedFee().txs.empty())
+ {
+ CLOG_DEBUG(Herder, "Got bad txSet: empty component");
+ return false;
+ }
+ }
+ return true;
+}
+
+#ifdef ENABLE_NEXT_PROTOCOL_VERSION_UNSAFE_FOR_PRODUCTION
+bool
+validateParallelComponent(ParallelTxsComponent const& component)
+{
+ for (auto const& stage : component.executionStages)
+ {
+ if (stage.empty())
+ {
+ CLOG_DEBUG(Herder, "Got bad txSet: empty stage");
+ return false;
+ }
+ for (auto const& thread : stage)
+ {
+ if (thread.empty())
+ {
+ CLOG_DEBUG(Herder, "Got bad txSet: empty thread");
+ return false;
+ }
+ }
+ }
+ return true;
+}
+#endif
+
bool
validateTxSetXDRStructure(GeneralizedTransactionSet const& txSet)
{
+#ifdef ENABLE_NEXT_PROTOCOL_VERSION_UNSAFE_FOR_PRODUCTION
+ int const MAX_PHASE = 1;
+#else
+ int const MAX_PHASE = 0;
+#endif
if (txSet.v() != 1)
{
CLOG_DEBUG(Herder, "Got bad txSet: unsupported version {}", txSet.v());
return false;
}
+ auto phaseCount = static_cast(TxSetPhase::PHASE_COUNT);
auto const& txSetV1 = txSet.v1TxSet();
// There was no protocol with 1 phase, so checking for 2 phases only
- if (txSetV1.phases.size() != static_cast(TxSetPhase::PHASE_COUNT))
+ if (txSetV1.phases.size() != phaseCount)
{
CLOG_DEBUG(Herder,
"Got bad txSet: exactly 2 phases are expected, got {}",
@@ -55,62 +150,42 @@ validateTxSetXDRStructure(GeneralizedTransactionSet const& txSet)
return false;
}
- for (auto const& phase : txSetV1.phases)
+ for (size_t phaseId = 0; phaseId < phaseCount; ++phaseId)
{
- if (phase.v() != 0)
+ auto const& phase = txSetV1.phases[phaseId];
+ if (phase.v() > MAX_PHASE)
{
CLOG_DEBUG(Herder, "Got bad txSet: unsupported phase version {}",
phase.v());
return false;
}
-
- bool componentsNormalized = std::is_sorted(
- phase.v0Components().begin(), phase.v0Components().end(),
- [](auto const& c1, auto const& c2) {
- if (!c1.txsMaybeDiscountedFee().baseFee ||
- !c2.txsMaybeDiscountedFee().baseFee)
- {
- return !c1.txsMaybeDiscountedFee().baseFee &&
- c2.txsMaybeDiscountedFee().baseFee;
- }
- return *c1.txsMaybeDiscountedFee().baseFee <
- *c2.txsMaybeDiscountedFee().baseFee;
- });
- if (!componentsNormalized)
- {
- CLOG_DEBUG(Herder, "Got bad txSet: incorrect component order");
- return false;
- }
-
- bool componentBaseFeesUnique =
- std::adjacent_find(
- phase.v0Components().begin(), phase.v0Components().end(),
- [](auto const& c1, auto const& c2) {
- if (!c1.txsMaybeDiscountedFee().baseFee ||
- !c2.txsMaybeDiscountedFee().baseFee)
- {
- return !c1.txsMaybeDiscountedFee().baseFee &&
- !c2.txsMaybeDiscountedFee().baseFee;
- }
- return *c1.txsMaybeDiscountedFee().baseFee ==
- *c2.txsMaybeDiscountedFee().baseFee;
- }) == phase.v0Components().end();
- if (!componentBaseFeesUnique)
+#ifdef ENABLE_NEXT_PROTOCOL_VERSION_UNSAFE_FOR_PRODUCTION
+ if (phase.v() == 1)
{
- CLOG_DEBUG(Herder, "Got bad txSet: duplicate component base fees");
- return false;
+ if (phaseId != static_cast(TxSetPhase::SOROBAN))
+ {
+ CLOG_DEBUG(Herder,
+ "Got bad txSet: non-Soroban parallel phase {}",
+ phase.v());
+ return false;
+ }
+ if (!validateParallelComponent(phase.parallelTxsComponent()))
+ {
+ return false;
+ }
}
- for (auto const& component : phase.v0Components())
+ else
+#endif
{
- if (component.txsMaybeDiscountedFee().txs.empty())
+ if (!validateSequentialPhaseXDRStructure(phase))
{
- CLOG_DEBUG(Herder, "Got bad txSet: empty component");
return false;
}
}
}
return true;
}
+
// We want to XOR the tx hash with the set hash.
// This way people can't predict the order that txs will be applied in
struct ApplyTxSorter
@@ -124,14 +199,14 @@ struct ApplyTxSorter
operator()(TransactionFrameBasePtr const& tx1,
TransactionFrameBasePtr const& tx2) const
{
- // need to use the hash of whole tx here since multiple txs could have
- // the same Contents
+ // need to use the hash of whole tx here since multiple txs could
+ // have the same Contents
return lessThanXored(tx1->getFullHash(), tx2->getFullHash(), mSetHash);
}
};
Hash
-computeNonGenericTxSetContentsHash(TransactionSet const& xdrTxSet)
+computeNonGeneralizedTxSetContentsHash(TransactionSet const& xdrTxSet)
{
ZoneScoped;
SHA256 hasher;
@@ -143,8 +218,8 @@ computeNonGenericTxSetContentsHash(TransactionSet const& xdrTxSet)
return hasher.finish();
}
-// Note: Soroban txs also use this functionality for simplicity, as it's a no-op
-// (all Soroban txs have 1 op max)
+// Note: Soroban txs also use this functionality for simplicity, as it's a
+// no-op (all Soroban txs have 1 op max)
int64_t
computePerOpFee(TransactionFrameBase const& tx, uint32_t ledgerVersion)
{
@@ -158,7 +233,7 @@ computePerOpFee(TransactionFrameBase const& tx, uint32_t ledgerVersion)
}
void
-transactionsToTransactionSetXDR(TxSetTransactions const& txs,
+transactionsToTransactionSetXDR(TxFrameList const& txs,
Hash const& previousLedgerHash,
TransactionSet& txSet)
{
@@ -172,159 +247,536 @@ transactionsToTransactionSetXDR(TxSetTransactions const& txs,
txSet.previousLedgerHash = previousLedgerHash;
}
+void
+sequentialPhaseToXdr(TxFrameList const& txs,
+ InclusionFeeMap const& inclusionFeeMap,
+ TransactionPhase& xdrPhase)
+{
+ xdrPhase.v(0);
+
+ std::map, size_t> feeTxCount;
+ for (auto const& [_, fee] : inclusionFeeMap)
+ {
+ ++feeTxCount[fee];
+ }
+ auto& components = xdrPhase.v0Components();
+ // Reserve a component per unique base fee in order to have the correct
+ // pointers in componentPerBid map.
+ components.reserve(feeTxCount.size());
+
+ std::map, xdr::xvector*>
+ componentPerBid;
+ for (auto const& [fee, txCount] : feeTxCount)
+ {
+ components.emplace_back(TXSET_COMP_TXS_MAYBE_DISCOUNTED_FEE);
+ auto& discountedFeeComponent =
+ components.back().txsMaybeDiscountedFee();
+ if (fee)
+ {
+ discountedFeeComponent.baseFee.activate() = *fee;
+ }
+ componentPerBid[fee] = &discountedFeeComponent.txs;
+ componentPerBid[fee]->reserve(txCount);
+ }
+ auto sortedTxs = TxSetUtils::sortTxsInHashOrder(txs);
+ for (auto const& tx : sortedTxs)
+ {
+ componentPerBid[inclusionFeeMap.find(tx)->second]->push_back(
+ tx->getEnvelope());
+ }
+}
+
+#ifdef ENABLE_NEXT_PROTOCOL_VERSION_UNSAFE_FOR_PRODUCTION
+void
+parallelPhaseToXdr(TxStageFrameList const& txs,
+ InclusionFeeMap const& inclusionFeeMap,
+ TransactionPhase& xdrPhase)
+{
+ xdrPhase.v(1);
+
+ std::optional baseFee;
+ if (!inclusionFeeMap.empty())
+ {
+ baseFee = inclusionFeeMap.begin()->second;
+ }
+ // We currently don't support multi-component parallel perPhaseTxs, so make
+ // sure all txs have the same base fee.
+ for (auto const& [_, fee] : inclusionFeeMap)
+ {
+ releaseAssert(fee == baseFee);
+ }
+ auto& component = xdrPhase.parallelTxsComponent();
+ if (baseFee)
+ {
+ component.baseFee.activate() = *baseFee;
+ }
+ component.executionStages.reserve(txs.size());
+ auto sortedTxs = TxSetUtils::sortParallelTxsInHashOrder(txs);
+ for (auto const& stage : sortedTxs)
+ {
+ auto& xdrStage = component.executionStages.emplace_back();
+ xdrStage.reserve(stage.size());
+ for (auto const& thread : stage)
+ {
+ auto& xdrThread = xdrStage.emplace_back();
+ xdrThread.reserve(thread.size());
+ for (auto const& tx : thread)
+ {
+ xdrThread.push_back(tx->getEnvelope());
+ }
+ }
+ }
+}
+
+#endif
+
void
transactionsToGeneralizedTransactionSetXDR(
- TxSetPhaseTransactions const& phaseTxs,
- std::vector>> const&
- phaseInclusionFeeMap,
- Hash const& previousLedgerHash, GeneralizedTransactionSet& generalizedTxSet)
+ std::vector const& phases, Hash const& previousLedgerHash,
+ GeneralizedTransactionSet& generalizedTxSet)
{
ZoneScoped;
- releaseAssert(phaseTxs.size() == phaseInclusionFeeMap.size());
generalizedTxSet.v(1);
generalizedTxSet.v1TxSet().previousLedgerHash = previousLedgerHash;
-
- for (int i = 0; i < phaseTxs.size(); ++i)
+ generalizedTxSet.v1TxSet().phases.resize(phases.size());
+ for (int i = 0; i < phases.size(); ++i)
{
- auto const& txPhase = phaseTxs[i];
- auto& phase =
- generalizedTxSet.v1TxSet().phases.emplace_back().v0Components();
+ auto const& txPhase = phases[i];
+ txPhase.toXDR(generalizedTxSet.v1TxSet().phases[i]);
+ }
+}
- auto const& feeMap = phaseInclusionFeeMap[i];
- std::map, size_t> feeTxCount;
- for (auto const& [tx, fee] : feeMap)
- {
- ++feeTxCount[fee];
- }
- // Reserve a component per unique base fee in order to have the correct
- // pointers in componentPerBid map.
- phase.reserve(feeTxCount.size());
+TxFrameList
+sortedForApplySequential(TxFrameList const& txs, Hash const& txSetHash)
+{
+ TxFrameList retList;
+ retList.reserve(txs.size());
- std::map, xdr::xvector*>
- componentPerBid;
- for (auto const& [fee, txCount] : feeTxCount)
+ auto txQueues = TxSetUtils::buildAccountTxQueues(txs);
+
+ // build txBatches
+ // txBatches i-th element contains each i-th transaction for
+ // accounts with a transaction in the transaction set
+ std::vector> txBatches;
+
+ while (!txQueues.empty())
+ {
+ txBatches.emplace_back();
+ auto& curBatch = txBatches.back();
+ // go over all users that still have transactions
+ for (auto it = txQueues.begin(); it != txQueues.end();)
{
- phase.emplace_back(TXSET_COMP_TXS_MAYBE_DISCOUNTED_FEE);
- auto& discountedFeeComponent = phase.back().txsMaybeDiscountedFee();
- if (fee)
+ auto& txQueue = *it;
+ curBatch.emplace_back(txQueue->getTopTx());
+ txQueue->popTopTx();
+ if (txQueue->empty())
{
- discountedFeeComponent.baseFee.activate() = *fee;
+ // done with that user
+ it = txQueues.erase(it);
+ }
+ else
+ {
+ ++it;
}
- componentPerBid[fee] = &discountedFeeComponent.txs;
- componentPerBid[fee]->reserve(txCount);
}
- auto sortedTxs = TxSetUtils::sortTxsInHashOrder(txPhase);
- for (auto const& tx : sortedTxs)
+ }
+
+ for (auto& batch : txBatches)
+ {
+ // randomize each batch using the hash of the transaction set
+ // as a way to randomize even more
+ ApplyTxSorter s(txSetHash);
+ std::sort(batch.begin(), batch.end(), s);
+ for (auto const& tx : batch)
{
- componentPerBid[feeMap.find(tx)->second]->push_back(
- tx->getEnvelope());
+ retList.push_back(tx);
}
}
+
+ return retList;
}
-// This assumes that the phase validation has already been done,
-// specifically that there are no transactions that belong to the same
-// source account, and that the ledger sequence corresponds to the
-bool
-phaseTxsAreValid(TxSetTransactions const& phase, Application& app,
- uint64_t lowerBoundCloseTimeOffset,
- uint64_t upperBoundCloseTimeOffset)
+TxStageFrameList
+sortedForApplyParallel(TxStageFrameList const& stages, Hash const& txSetHash)
{
ZoneScoped;
- // This is done so minSeqLedgerGap is validated against the next
- // ledgerSeq, which is what will be used at apply time
-
- // Grab read-only latest ledger state; This is only used to validate tx sets
- // for LCL+1
- LedgerSnapshot ls(app);
- ls.getLedgerHeader().currentToModify().ledgerSeq =
- app.getLedgerManager().getLastClosedLedgerNum() + 1;
- for (auto const& tx : phase)
+ TxStageFrameList sortedStages = stages;
+ ApplyTxSorter sorter(txSetHash);
+ for (auto& stage : sortedStages)
{
- auto txResult = tx->checkValid(app, ls, 0, lowerBoundCloseTimeOffset,
- upperBoundCloseTimeOffset);
- if (!txResult->isSuccess())
+ for (auto& thread : stage)
{
+ std::sort(thread.begin(), thread.end(), sorter);
+ }
+ // There is no need to shuffle threads in the stage, as they are
+ // independent, so the apply order doesn't matter even if the threads
+ // are being applied sequentially.
+ }
+ std::sort(sortedStages.begin(), sortedStages.end(),
+ [&sorter](auto const& a, auto const& b) {
+ releaseAssert(!a.empty() && !b.empty());
+ releaseAssert(!a.front().empty() && !b.front().empty());
+ return sorter(a.front().front(), b.front().front());
+ });
+ return stages;
+}
- CLOG_DEBUG(
- Herder, "Got bad txSet: tx invalid tx: {} result: {}",
- xdrToCerealString(tx->getEnvelope(), "TransactionEnvelope"),
- txResult->getResultCode());
+bool
+addWireTxsToList(Hash const& networkID,
+ xdr::xvector const& xdrTxs,
+ TxFrameList& txList)
+{
+ auto prevSize = txList.size();
+ txList.reserve(prevSize + xdrTxs.size());
+ for (auto const& env : xdrTxs)
+ {
+ auto tx = TransactionFrameBase::makeTransactionFromWire(networkID, env);
+ if (!tx->XDRProvidesValidFee())
+ {
return false;
}
+ txList.push_back(tx);
+ }
+ if (!std::is_sorted(txList.begin() + prevSize, txList.end(),
+ &TxSetUtils::hashTxSorter))
+ {
+ return false;
}
return true;
}
-} // namespace
-TxSetXDRFrame::TxSetXDRFrame(TransactionSet const& xdrTxSet)
- : mXDRTxSet(xdrTxSet)
- , mEncodedSize(xdr::xdr_argpack_size(xdrTxSet))
- , mHash(computeNonGenericTxSetContentsHash(xdrTxSet))
+std::vector
+computeLaneBaseFee(TxSetPhase phase, LedgerHeader const& ledgerHeader,
+ SurgePricingLaneConfig const& surgePricingConfig,
+ std::vector const& lowestLaneFee,
+ std::vector const& hadTxNotFittingLane)
{
+ std::vector laneBaseFee(lowestLaneFee.size(),
+ ledgerHeader.baseFee);
+ auto minBaseFee =
+ *std::min_element(lowestLaneFee.begin(), lowestLaneFee.end());
+ for (size_t lane = 0; lane < laneBaseFee.size(); ++lane)
+ {
+ // If generic lane is full, then any transaction had to compete with not
+ // included transactions and independently of the lane they need to have
+ // at least the minimum fee in the tx set applied.
+ if (hadTxNotFittingLane[SurgePricingPriorityQueue::GENERIC_LANE])
+ {
+ laneBaseFee[lane] = minBaseFee;
+ }
+ // If limited lane is full, then the transactions in this lane also had
+ // to compete with each other and have a base fee associated with this
+ // lane only.
+ if (lane != SurgePricingPriorityQueue::GENERIC_LANE &&
+ hadTxNotFittingLane[lane])
+ {
+ laneBaseFee[lane] = lowestLaneFee[lane];
+ }
+ if (laneBaseFee[lane] > ledgerHeader.baseFee)
+ {
+ CLOG_WARNING(
+ Herder,
+ "{} phase: surge pricing for '{}' lane is in effect with base "
+ "fee={}, baseFee={}",
+ getTxSetPhaseName(phase),
+ lane == SurgePricingPriorityQueue::GENERIC_LANE ? "generic"
+ : "DEX",
+ laneBaseFee[lane], ledgerHeader.baseFee);
+ }
+ }
+ return laneBaseFee;
}
-TxSetXDRFrame::TxSetXDRFrame(GeneralizedTransactionSet const& xdrTxSet)
- : mXDRTxSet(xdrTxSet)
- , mEncodedSize(xdr::xdr_argpack_size(xdrTxSet))
- , mHash(xdrSha256(xdrTxSet))
+std::shared_ptr
+createSurgePricingLangeConfig(TxSetPhase phase, Application& app)
{
-}
+ std::shared_ptr surgePricingLaneConfig;
+ if (phase == TxSetPhase::CLASSIC)
+ {
+ auto maxOps =
+ Resource({static_cast(
+ app.getLedgerManager().getLastMaxTxSetSizeOps()),
+ MAX_CLASSIC_BYTE_ALLOWANCE});
+ std::optional dexOpsLimit;
+ if (app.getConfig().MAX_DEX_TX_OPERATIONS_IN_TX_SET)
+ {
+ // DEX operations limit implies that DEX transactions should
+ // compete with each other in in a separate fee lane, which
+ // is only possible with generalized tx set.
+ dexOpsLimit =
+ Resource({*app.getConfig().MAX_DEX_TX_OPERATIONS_IN_TX_SET,
+ MAX_CLASSIC_BYTE_ALLOWANCE});
+ }
-TxSetXDRFrameConstPtr
-TxSetXDRFrame::makeFromWire(TransactionSet const& xdrTxSet)
-{
- ZoneScoped;
- std::shared_ptr txSet(new TxSetXDRFrame(xdrTxSet));
- return txSet;
+ surgePricingLaneConfig =
+ std::make_shared(maxOps, dexOpsLimit);
+ }
+ else
+ {
+ releaseAssert(phase == TxSetPhase::SOROBAN);
+
+ auto limits = app.getLedgerManager().maxLedgerResources(
+ /* isSoroban */ true);
+ // When building Soroban tx sets with parallel execution support,
+ // instructions are accounted for by the build logic, not by the surge
+ // pricing config, so we need to relax the instruction limit in surge
+ // pricing logic.
+ if (protocolVersionStartsFrom(app.getLedgerManager()
+ .getLastClosedLedgerHeader()
+ .header.ledgerVersion,
+ PARALLEL_SOROBAN_PHASE_PROTOCOL_VERSION))
+ {
+ limits.setVal(Resource::Type::INSTRUCTIONS,
+ std::numeric_limits::max());
+ }
+
+ auto byteLimit =
+ std::min(static_cast(MAX_SOROBAN_BYTE_ALLOWANCE),
+ limits.getVal(Resource::Type::TX_BYTE_SIZE));
+ limits.setVal(Resource::Type::TX_BYTE_SIZE, byteLimit);
+
+ surgePricingLaneConfig =
+ std::make_shared(limits);
+ }
+ return surgePricingLaneConfig;
}
-TxSetXDRFrameConstPtr
-TxSetXDRFrame::makeFromWire(GeneralizedTransactionSet const& xdrTxSet)
+TxFrameList
+buildSurgePricedSequentialPhase(
+ TxFrameList const& txs,
+ std::shared_ptr surgePricingLaneConfig,
+ std::vector& hadTxNotFittingLane)
{
ZoneScoped;
- std::shared_ptr txSet(new TxSetXDRFrame(xdrTxSet));
- return txSet;
+ return SurgePricingPriorityQueue::getMostTopTxsWithinLimits(
+ txs, surgePricingLaneConfig, hadTxNotFittingLane);
}
-TxSetXDRFrameConstPtr
-TxSetXDRFrame::makeFromStoredTxSet(StoredTransactionSet const& storedSet)
+std::pair,
+ std::shared_ptr>
+applySurgePricing(TxSetPhase phase, TxFrameList const& txs, Application& app)
{
- if (storedSet.v() == 0)
+ ZoneScoped;
+ auto surgePricingLaneConfig = createSurgePricingLangeConfig(phase, app);
+ std::vector hadTxNotFittingLane;
+ bool isParallelSoroban =
+ phase == TxSetPhase::SOROBAN &&
+ protocolVersionStartsFrom(app.getLedgerManager()
+ .getLastClosedLedgerHeader()
+ .header.ledgerVersion,
+ PARALLEL_SOROBAN_PHASE_PROTOCOL_VERSION);
+ std::variant includedTxs;
+ if (isParallelSoroban)
{
- return TxSetXDRFrame::makeFromWire(storedSet.txSet());
+ includedTxs = buildSurgePricedParallelSorobanPhase(
+ txs, app.getConfig(),
+ app.getLedgerManager().getSorobanNetworkConfig(),
+ surgePricingLaneConfig, hadTxNotFittingLane);
}
- return TxSetXDRFrame::makeFromWire(storedSet.generalizedTxSet());
+ else
+ {
+ includedTxs = buildSurgePricedSequentialPhase(
+ txs, surgePricingLaneConfig, hadTxNotFittingLane);
+ }
+
+ auto visitIncludedTxs =
+ [&includedTxs](
+ std::function visitor) {
+ std::visit(
+ [&visitor](auto const& txs) {
+ using T = std::decay_t;
+ if constexpr (std::is_same_v)
+ {
+ for (auto const& tx : txs)
+ {
+ visitor(tx);
+ }
+ }
+ else if constexpr (std::is_same_v)
+ {
+ for (auto const& stage : txs)
+ {
+ for (auto const& thread : stage)
+ {
+ for (auto const& tx : thread)
+ {
+ visitor(tx);
+ }
+ }
+ }
+ }
+ else
+ {
+ releaseAssert(false);
+ }
+ },
+ includedTxs);
+ };
+
+ std::vector lowestLaneFee;
+ auto const& lclHeader =
+ app.getLedgerManager().getLastClosedLedgerHeader().header;
+
+ size_t laneCount = surgePricingLaneConfig->getLaneLimits().size();
+ lowestLaneFee.resize(laneCount, std::numeric_limits::max());
+ visitIncludedTxs(
+ [&lowestLaneFee, &surgePricingLaneConfig, &lclHeader](auto const& tx) {
+ size_t lane = surgePricingLaneConfig->getLane(*tx);
+ auto perOpFee = computePerOpFee(*tx, lclHeader.ledgerVersion);
+ lowestLaneFee[lane] = std::min(lowestLaneFee[lane], perOpFee);
+ });
+ auto laneBaseFee =
+ computeLaneBaseFee(phase, lclHeader, *surgePricingLaneConfig,
+ lowestLaneFee, hadTxNotFittingLane);
+ auto inclusionFeeMapPtr = std::make_shared();
+ auto& inclusionFeeMap = *inclusionFeeMapPtr;
+ visitIncludedTxs([&inclusionFeeMap, &laneBaseFee,
+ &surgePricingLaneConfig](auto const& tx) {
+ inclusionFeeMap[tx] = laneBaseFee[surgePricingLaneConfig->getLane(*tx)];
+ });
+
+ return std::make_pair(includedTxs, inclusionFeeMapPtr);
}
-std::pair
-makeTxSetFromTransactions(TxSetPhaseTransactions const& txPhases,
- Application& app, uint64_t lowerBoundCloseTimeOffset,
- uint64_t upperBoundCloseTimeOffset
-#ifdef BUILD_TESTS
- ,
- bool skipValidation
-#endif
-)
+size_t
+countOps(TxFrameList const& txs)
{
- TxSetPhaseTransactions invalidTxs;
- invalidTxs.resize(txPhases.size());
- return makeTxSetFromTransactions(txPhases, app, lowerBoundCloseTimeOffset,
- upperBoundCloseTimeOffset, invalidTxs
-#ifdef BUILD_TESTS
- ,
- skipValidation
-#endif
- );
+ return std::accumulate(txs.begin(), txs.end(), size_t(0),
+ [&](size_t a, TransactionFrameBasePtr const& tx) {
+ return a + tx->getNumOperations();
+ });
+}
+
+int64_t
+computeBaseFeeForLegacyTxSet(LedgerHeader const& lclHeader,
+ TxFrameList const& txs)
+{
+ ZoneScoped;
+ auto ledgerVersion = lclHeader.ledgerVersion;
+ int64_t lowestBaseFee = std::numeric_limits::max();
+ for (auto const& tx : txs)
+ {
+ int64_t txBaseFee = computePerOpFee(*tx, ledgerVersion);
+ lowestBaseFee = std::min(lowestBaseFee, txBaseFee);
+ }
+ int64_t baseFee = lclHeader.baseFee;
+
+ if (protocolVersionStartsFrom(ledgerVersion, ProtocolVersion::V_11))
+ {
+ size_t surgeOpsCutoff = 0;
+ if (lclHeader.maxTxSetSize >= MAX_OPS_PER_TX)
+ {
+ surgeOpsCutoff = lclHeader.maxTxSetSize - MAX_OPS_PER_TX;
+ }
+ if (countOps(txs) > surgeOpsCutoff)
+ {
+ baseFee = lowestBaseFee;
+ }
+ }
+ return baseFee;
+}
+
+bool
+checkFeeMap(InclusionFeeMap const& feeMap, LedgerHeader const& lclHeader)
+{
+ for (auto const& [tx, fee] : feeMap)
+ {
+ if (!fee)
+ {
+ continue;
+ }
+ if (*fee < lclHeader.baseFee)
+ {
+
+ CLOG_DEBUG(Herder,
+ "Got bad txSet: {} has too low component "
+ "base fee {}",
+ hexAbbrev(lclHeader.previousLedgerHash), *fee);
+ return false;
+ }
+ if (tx->getInclusionFee() < getMinInclusionFee(*tx, lclHeader, fee))
+ {
+ CLOG_DEBUG(Herder,
+ "Got bad txSet: {} has tx with fee bid ({}) lower "
+ "than base fee ({})",
+ hexAbbrev(lclHeader.previousLedgerHash),
+ tx->getInclusionFee(),
+ getMinInclusionFee(*tx, lclHeader, fee));
+ return false;
+ }
+ }
+ return true;
+}
+
+} // namespace
+
+TxSetXDRFrame::TxSetXDRFrame(TransactionSet const& xdrTxSet)
+ : mXDRTxSet(xdrTxSet)
+ , mEncodedSize(xdr::xdr_argpack_size(xdrTxSet))
+ , mHash(computeNonGeneralizedTxSetContentsHash(xdrTxSet))
+{
+}
+
+TxSetXDRFrame::TxSetXDRFrame(GeneralizedTransactionSet const& xdrTxSet)
+ : mXDRTxSet(xdrTxSet)
+ , mEncodedSize(xdr::xdr_argpack_size(xdrTxSet))
+ , mHash(xdrSha256(xdrTxSet))
+{
+}
+
+TxSetXDRFrameConstPtr
+TxSetXDRFrame::makeFromWire(TransactionSet const& xdrTxSet)
+{
+ ZoneScoped;
+ std::shared_ptr txSet(new TxSetXDRFrame(xdrTxSet));
+ return txSet;
+}
+
+TxSetXDRFrameConstPtr
+TxSetXDRFrame::makeFromWire(GeneralizedTransactionSet const& xdrTxSet)
+{
+ ZoneScoped;
+ std::shared_ptr txSet(new TxSetXDRFrame(xdrTxSet));
+ return txSet;
+}
+
+TxSetXDRFrameConstPtr
+TxSetXDRFrame::makeFromStoredTxSet(StoredTransactionSet const& storedSet)
+{
+ if (storedSet.v() == 0)
+ {
+ return TxSetXDRFrame::makeFromWire(storedSet.txSet());
+ }
+ return TxSetXDRFrame::makeFromWire(storedSet.generalizedTxSet());
+}
+
+std::pair
+makeTxSetFromTransactions(PerPhaseTransactionList const& txPhases,
+ Application& app, uint64_t lowerBoundCloseTimeOffset,
+ uint64_t upperBoundCloseTimeOffset
+#ifdef BUILD_TESTS
+ ,
+ bool skipValidation
+#endif
+)
+{
+ PerPhaseTransactionList invalidTxs;
+ invalidTxs.resize(txPhases.size());
+ return makeTxSetFromTransactions(txPhases, app, lowerBoundCloseTimeOffset,
+ upperBoundCloseTimeOffset, invalidTxs
+#ifdef BUILD_TESTS
+ ,
+ skipValidation
+#endif
+ );
}
std::pair
-makeTxSetFromTransactions(TxSetPhaseTransactions const& txPhases,
+makeTxSetFromTransactions(PerPhaseTransactionList const& txPhases,
Application& app, uint64_t lowerBoundCloseTimeOffset,
uint64_t upperBoundCloseTimeOffset,
- TxSetPhaseTransactions& invalidTxs
+ PerPhaseTransactionList& invalidTxs
#ifdef BUILD_TESTS
,
bool skipValidation
@@ -335,12 +787,12 @@ makeTxSetFromTransactions(TxSetPhaseTransactions const& txPhases,
releaseAssert(txPhases.size() <=
static_cast(TxSetPhase::PHASE_COUNT));
- TxSetPhaseTransactions validatedPhases;
- for (int i = 0; i < txPhases.size(); ++i)
+ std::vector validatedPhases;
+ for (size_t i = 0; i < txPhases.size(); ++i)
{
- auto& txs = txPhases[i];
+ auto const& phaseTxs = txPhases[i];
bool expectSoroban = static_cast(i) == TxSetPhase::SOROBAN;
- if (!std::all_of(txs.begin(), txs.end(), [&](auto const& tx) {
+ if (!std::all_of(phaseTxs.begin(), phaseTxs.end(), [&](auto const& tx) {
return tx->isSoroban() == expectSoroban;
}))
{
@@ -349,20 +801,44 @@ makeTxSetFromTransactions(TxSetPhaseTransactions const& txPhases,
}
auto& invalid = invalidTxs[i];
+ TxFrameList validatedTxs;
#ifdef BUILD_TESTS
if (skipValidation)
{
- validatedPhases.emplace_back(txs);
+ validatedTxs = phaseTxs;
}
else
{
#endif
- validatedPhases.emplace_back(
- TxSetUtils::trimInvalid(txs, app, lowerBoundCloseTimeOffset,
- upperBoundCloseTimeOffset, invalid));
+ validatedTxs = TxSetUtils::trimInvalid(
+ phaseTxs, app, lowerBoundCloseTimeOffset,
+ upperBoundCloseTimeOffset, invalid);
#ifdef BUILD_TESTS
}
#endif
+ auto phaseType = static_cast(i);
+ auto [includedTxs, inclusionFeeMap] =
+ applySurgePricing(phaseType, validatedTxs, app);
+
+ std::visit(
+ [&validatedPhases, phaseType, inclusionFeeMap](auto&& txs) {
+ using T = std::decay_t;
+ if constexpr (std::is_same_v)
+ {
+ validatedPhases.emplace_back(
+ TxSetPhaseFrame(phaseType, txs, inclusionFeeMap));
+ }
+ else if constexpr (std::is_same_v)
+ {
+ validatedPhases.emplace_back(TxSetPhaseFrame(
+ phaseType, std::move(txs), inclusionFeeMap));
+ }
+ else
+ {
+ releaseAssert(false);
+ }
+ },
+ includedTxs);
}
auto const& lclHeader = app.getLedgerManager().getLastClosedLedgerHeader();
@@ -371,7 +847,7 @@ makeTxSetFromTransactions(TxSetPhaseTransactions const& txPhases,
std::unique_ptr preliminaryApplicableTxSet(
new ApplicableTxSetFrame(app, lclHeader, validatedPhases,
std::nullopt));
- preliminaryApplicableTxSet->applySurgePricing(app);
+
// Do the roundtrip through XDR to ensure we never build an incorrect tx set
// for nomination.
auto outputTxSet = preliminaryApplicableTxSet->toWireTxSetFrame();
@@ -402,7 +878,7 @@ makeTxSetFromTransactions(TxSetPhaseTransactions const& txPhases,
outputApplicableTxSet->numPhases();
if (valid)
{
- for (int i = 0; i < preliminaryApplicableTxSet->numPhases(); ++i)
+ for (size_t i = 0; i < preliminaryApplicableTxSet->numPhases(); ++i)
{
valid = valid && preliminaryApplicableTxSet->sizeTx(
static_cast(i)) ==
@@ -416,6 +892,8 @@ makeTxSetFromTransactions(TxSetPhaseTransactions const& txPhases,
upperBoundCloseTimeOffset);
if (!valid)
{
+ outputApplicableTxSet->checkValid(app, lowerBoundCloseTimeOffset,
+ upperBoundCloseTimeOffset);
throw std::runtime_error("Created invalid tx set frame");
}
@@ -428,14 +906,19 @@ TxSetXDRFrame::makeEmpty(LedgerHeaderHistoryEntry const& lclHeader)
if (protocolVersionStartsFrom(lclHeader.header.ledgerVersion,
SOROBAN_PROTOCOL_VERSION))
{
- TxSetPhaseTransactions emptyPhases(
- static_cast(TxSetPhase::PHASE_COUNT));
- std::vector>>
- emptyFeeMap(static_cast(TxSetPhase::PHASE_COUNT));
+ bool isParallelSoroban = false;
+#ifdef ENABLE_NEXT_PROTOCOL_VERSION_UNSAFE_FOR_PRODUCTION
+ isParallelSoroban =
+ protocolVersionStartsFrom(lclHeader.header.ledgerVersion,
+ PARALLEL_SOROBAN_PHASE_PROTOCOL_VERSION);
+#endif
+ std::vector emptyPhases = {
+ TxSetPhaseFrame::makeEmpty(TxSetPhase::CLASSIC, false),
+ TxSetPhaseFrame::makeEmpty(TxSetPhase::SOROBAN, isParallelSoroban)};
+
GeneralizedTransactionSet txSet;
- transactionsToGeneralizedTransactionSetXDR(emptyPhases, emptyFeeMap,
- lclHeader.hash, txSet);
+ transactionsToGeneralizedTransactionSetXDR(emptyPhases, lclHeader.hash,
+ txSet);
return TxSetXDRFrame::makeFromWire(txSet);
}
TransactionSet txSet;
@@ -445,7 +928,7 @@ TxSetXDRFrame::makeEmpty(LedgerHeaderHistoryEntry const& lclHeader)
TxSetXDRFrameConstPtr
TxSetXDRFrame::makeFromHistoryTransactions(Hash const& previousLedgerHash,
- TxSetTransactions const& txs)
+ TxFrameList const& txs)
{
TransactionSet txSet;
transactionsToTransactionSetXDR(txs, previousLedgerHash, txSet);
@@ -454,49 +937,58 @@ TxSetXDRFrame::makeFromHistoryTransactions(Hash const& previousLedgerHash,
#ifdef BUILD_TESTS
std::pair
-makeTxSetFromTransactions(TxSetTransactions txs, Application& app,
+makeTxSetFromTransactions(TxFrameList txs, Application& app,
uint64_t lowerBoundCloseTimeOffset,
uint64_t upperBoundCloseTimeOffset,
bool enforceTxsApplyOrder)
{
- TxSetTransactions invalid;
+ TxFrameList invalid;
return makeTxSetFromTransactions(txs, app, lowerBoundCloseTimeOffset,
upperBoundCloseTimeOffset, invalid,
enforceTxsApplyOrder);
}
std::pair
-makeTxSetFromTransactions(TxSetTransactions txs, Application& app,
+makeTxSetFromTransactions(TxFrameList txs, Application& app,
uint64_t lowerBoundCloseTimeOffset,
uint64_t upperBoundCloseTimeOffset,
- TxSetTransactions& invalidTxs,
- bool enforceTxsApplyOrder)
+ TxFrameList& invalidTxs, bool enforceTxsApplyOrder)
{
auto lclHeader = app.getLedgerManager().getLastClosedLedgerHeader();
- TxSetPhaseTransactions phases;
- phases.resize(protocolVersionStartsFrom(lclHeader.header.ledgerVersion,
- SOROBAN_PROTOCOL_VERSION)
- ? 2
- : 1);
+ PerPhaseTransactionList perPhaseTxs;
+ perPhaseTxs.resize(protocolVersionStartsFrom(lclHeader.header.ledgerVersion,
+ SOROBAN_PROTOCOL_VERSION)
+ ? 2
+ : 1);
for (auto& tx : txs)
{
if (tx->isSoroban())
{
- phases[static_cast(TxSetPhase::SOROBAN)].push_back(tx);
+ perPhaseTxs[static_cast(TxSetPhase::SOROBAN)].push_back(tx);
}
else
{
- phases[static_cast(TxSetPhase::CLASSIC)].push_back(tx);
+ perPhaseTxs[static_cast(TxSetPhase::CLASSIC)].push_back(tx);
}
}
- TxSetPhaseTransactions invalid;
- invalid.resize(phases.size());
- auto res = makeTxSetFromTransactions(phases, app, lowerBoundCloseTimeOffset,
- upperBoundCloseTimeOffset, invalid,
- enforceTxsApplyOrder);
+ PerPhaseTransactionList invalid;
+ invalid.resize(perPhaseTxs.size());
+ auto res = makeTxSetFromTransactions(
+ perPhaseTxs, app, lowerBoundCloseTimeOffset, upperBoundCloseTimeOffset,
+ invalid, enforceTxsApplyOrder);
if (enforceTxsApplyOrder)
{
- res.second->mApplyOrderOverride = txs;
+ auto const& resPhases = res.second->getPhases();
+ // This only supports sequential tx sets for now.
+ std::vector overridePhases;
+ for (size_t i = 0; i < resPhases.size(); ++i)
+ {
+ overridePhases.emplace_back(TxSetPhaseFrame(
+ static_cast(i), std::move(perPhaseTxs[i]),
+ std::make_shared(
+ resPhases[i].getInclusionFeeMap())));
+ }
+ res.second->mApplyOrderPhases = overridePhases;
res.first->mApplicableTxSetOverride = std::move(res.second);
}
invalidTxs = invalid[0];
@@ -533,7 +1025,7 @@ TxSetXDRFrame::prepareForApply(Application& app) const
}
#endif
ZoneScoped;
- std::unique_ptr txSet{};
+ std::vector phaseFrames;
if (isGeneralizedTxSet())
{
auto const& xdrTxSet = std::get(mXDRTxSet);
@@ -543,62 +1035,34 @@ TxSetXDRFrame::prepareForApply(Application& app) const
"Got bad generalized txSet with invalid XDR structure");
return nullptr;
}
- auto const& phases = xdrTxSet.v1TxSet().phases;
- TxSetPhaseTransactions defaultPhases;
- defaultPhases.resize(phases.size());
+ auto const& xdrPhases = xdrTxSet.v1TxSet().phases;
- txSet = std::unique_ptr(new ApplicableTxSetFrame(
- app, true, previousLedgerHash(), defaultPhases, mHash));
-
- releaseAssert(phases.size() <=
- static_cast(TxSetPhase::PHASE_COUNT));
- for (auto phaseId = 0; phaseId < phases.size(); phaseId++)
+ for (size_t phaseId = 0; phaseId < xdrPhases.size(); ++phaseId)
{
- auto const& phase = phases[phaseId];
- auto const& components = phase.v0Components();
- for (auto const& component : components)
+ auto maybePhase = TxSetPhaseFrame::makeFromWire(
+ static_cast(phaseId), app.getNetworkID(),
+ xdrPhases[phaseId]);
+ if (!maybePhase)
{
- switch (component.type())
- {
- case TXSET_COMP_TXS_MAYBE_DISCOUNTED_FEE:
- std::optional baseFee;
- if (component.txsMaybeDiscountedFee().baseFee)
- {
- baseFee = *component.txsMaybeDiscountedFee().baseFee;
- }
- if (!txSet->addTxsFromXdr(
- app.getNetworkID(),
- component.txsMaybeDiscountedFee().txs, true,
- baseFee, static_cast(phaseId)))
- {
- CLOG_DEBUG(Herder,
- "Got bad generalized txSet: transactions "
- "are not ordered correctly or contain "
- "invalid phase transactions");
- return nullptr;
- }
- break;
- }
+ return nullptr;
}
+ phaseFrames.emplace_back(std::move(*maybePhase));
}
}
else
{
auto const& xdrTxSet = std::get(mXDRTxSet);
- txSet = std::unique_ptr(new ApplicableTxSetFrame(
- app, false, previousLedgerHash(), {TxSetTransactions{}}, mHash));
- if (!txSet->addTxsFromXdr(app.getNetworkID(), xdrTxSet.txs, false,
- std::nullopt, TxSetPhase::CLASSIC))
+ auto maybePhase = TxSetPhaseFrame::makeFromWireLegacy(
+ app.getLedgerManager().getLastClosedLedgerHeader().header,
+ app.getNetworkID(), xdrTxSet.txs);
+ if (!maybePhase)
{
- CLOG_DEBUG(Herder,
- "Got bad txSet: transactions are not ordered correctly "
- "or contain invalid phase transactions");
return nullptr;
}
- txSet->computeTxFeesForNonGeneralizedSet(
- app.getLedgerManager().getLastClosedLedgerHeader().header);
+ phaseFrames.emplace_back(std::move(*maybePhase));
}
- return txSet;
+ return std::unique_ptr(new ApplicableTxSetFrame(
+ app, isGeneralizedTxSet(), previousLedgerHash(), phaseFrames, mHash));
}
bool
@@ -635,9 +1099,28 @@ TxSetXDRFrame::sizeTxTotal() const
size_t totalSize = 0;
for (auto const& phase : txSet.phases)
{
- for (auto const& component : phase.v0Components())
+ switch (phase.v())
{
- totalSize += component.txsMaybeDiscountedFee().txs.size();
+ case 0:
+ for (auto const& component : phase.v0Components())
+ {
+ totalSize += component.txsMaybeDiscountedFee().txs.size();
+ }
+ break;
+#ifdef ENABLE_NEXT_PROTOCOL_VERSION_UNSAFE_FOR_PRODUCTION
+ case 1:
+ for (auto const& stage :
+ phase.parallelTxsComponent().executionStages)
+ {
+ for (auto const& thread : stage)
+ {
+ totalSize += thread.size();
+ }
+ }
+ break;
+#endif
+ default:
+ break;
}
}
return totalSize;
@@ -676,12 +1159,33 @@ TxSetXDRFrame::sizeOpTotalForLogging() const
size_t totalSize = 0;
for (auto const& phase : txSet.phases)
{
- for (auto const& component : phase.v0Components())
+ switch (phase.v())
{
- totalSize += std::accumulate(
- component.txsMaybeDiscountedFee().txs.begin(),
- component.txsMaybeDiscountedFee().txs.end(), 0ull,
- accumulateTxsFn);
+ case 0:
+ for (auto const& component : phase.v0Components())
+ {
+ totalSize += std::accumulate(
+ component.txsMaybeDiscountedFee().txs.begin(),
+ component.txsMaybeDiscountedFee().txs.end(), 0ull,
+ accumulateTxsFn);
+ }
+ break;
+#ifdef ENABLE_NEXT_PROTOCOL_VERSION_UNSAFE_FOR_PRODUCTION
+ case 1:
+ for (auto const& stage :
+ phase.parallelTxsComponent().executionStages)
+ {
+ for (auto const& thread : stage)
+ {
+ totalSize +=
+ std::accumulate(thread.begin(), thread.end(), 0ull,
+ accumulateTxsFn);
+ }
+ }
+ break;
+#endif
+ default:
+ break;
}
}
return totalSize;
@@ -693,10 +1197,10 @@ TxSetXDRFrame::sizeOpTotalForLogging() const
}
}
-TxSetPhaseTransactions
+PerPhaseTransactionList
TxSetXDRFrame::createTransactionFrames(Hash const& networkID) const
{
- TxSetPhaseTransactions phaseTxs;
+ PerPhaseTransactionList phaseTxs;
if (isGeneralizedTxSet())
{
auto const& txSet =
@@ -704,14 +1208,38 @@ TxSetXDRFrame::createTransactionFrames(Hash const& networkID) const
for (auto const& phase : txSet.phases)
{
auto& txs = phaseTxs.emplace_back();
- for (auto const& component : phase.v0Components())
+ switch (phase.v())
{
- for (auto const& tx : component.txsMaybeDiscountedFee().txs)
+ case 0:
+ for (auto const& component : phase.v0Components())
{
- txs.emplace_back(
- TransactionFrameBase::makeTransactionFromWire(networkID,
- tx));
+ for (auto const& tx : component.txsMaybeDiscountedFee().txs)
+ {
+ txs.emplace_back(
+ TransactionFrameBase::makeTransactionFromWire(
+ networkID, tx));
+ }
+ }
+ break;
+#ifdef ENABLE_NEXT_PROTOCOL_VERSION_UNSAFE_FOR_PRODUCTION
+ case 1:
+ for (auto const& stage :
+ phase.parallelTxsComponent().executionStages)
+ {
+ for (auto const& thread : stage)
+ {
+ for (auto const& tx : thread)
+ {
+ txs.emplace_back(
+ TransactionFrameBase::makeTransactionFromWire(
+ networkID, tx));
+ }
+ }
}
+ break;
+#endif
+ default:
+ break;
}
}
}
@@ -759,19 +1287,602 @@ TxSetXDRFrame::storeXDR(StoredTransactionSet& txSet) const
}
else
{
- txSet.v(0);
- txSet.txSet() = std::get(mXDRTxSet);
+ txSet.v(0);
+ txSet.txSet() = std::get(mXDRTxSet);
+ }
+}
+
+TxSetPhaseFrame::Iterator::Iterator(TxStageFrameList const& txs,
+ size_t stageIndex)
+ : mStages(txs), mStageIndex(stageIndex)
+{
+}
+
+TransactionFrameBasePtr
+TxSetPhaseFrame::Iterator::operator*() const
+{
+
+ if (mStageIndex >= mStages.size() ||
+ mThreadIndex >= mStages[mStageIndex].size() ||
+ mTxIndex >= mStages[mStageIndex][mThreadIndex].size())
+ {
+ throw std::runtime_error("TxPhase iterator out of bounds");
+ }
+ return mStages[mStageIndex][mThreadIndex][mTxIndex];
+}
+
+TxSetPhaseFrame::Iterator&
+TxSetPhaseFrame::Iterator::operator++()
+{
+ if (mStageIndex >= mStages.size())
+ {
+ throw std::runtime_error("TxPhase iterator out of bounds");
+ }
+ ++mTxIndex;
+ if (mTxIndex >= mStages[mStageIndex][mThreadIndex].size())
+ {
+ mTxIndex = 0;
+ ++mThreadIndex;
+ if (mThreadIndex >= mStages[mStageIndex].size())
+ {
+ mThreadIndex = 0;
+ ++mStageIndex;
+ }
+ }
+ return *this;
+}
+
+TxSetPhaseFrame::Iterator
+TxSetPhaseFrame::Iterator::operator++(int)
+{
+ auto it = *this;
+ ++(*this);
+ return it;
+}
+
+bool
+TxSetPhaseFrame::Iterator::operator==(Iterator const& other) const
+{
+ return mStageIndex == other.mStageIndex &&
+ mThreadIndex == other.mThreadIndex && mTxIndex == other.mTxIndex &&
+ // Make sure to compare the pointers, not the contents, both for
+ // correctness and optimization.
+ &mStages == &other.mStages;
+}
+
+bool
+TxSetPhaseFrame::Iterator::operator!=(Iterator const& other) const
+{
+ return !(*this == other);
+}
+
+std::optional
+TxSetPhaseFrame::makeFromWire(TxSetPhase phase, Hash const& networkID,
+ TransactionPhase const& xdrPhase)
+{
+ auto inclusionFeeMapPtr = std::make_shared();
+ auto& inclusionFeeMap = *inclusionFeeMapPtr;
+ std::optional phaseFrame;
+ switch (xdrPhase.v())
+ {
+ case 0:
+ {
+ TxFrameList txList;
+ auto const& components = xdrPhase.v0Components();
+ for (auto const& component : components)
+ {
+ switch (component.type())
+ {
+ case TXSET_COMP_TXS_MAYBE_DISCOUNTED_FEE:
+ std::optional baseFee;
+ if (component.txsMaybeDiscountedFee().baseFee)
+ {
+ baseFee = *component.txsMaybeDiscountedFee().baseFee;
+ }
+ size_t prevSize = txList.size();
+ if (!addWireTxsToList(networkID,
+ component.txsMaybeDiscountedFee().txs,
+ txList))
+ {
+ CLOG_DEBUG(Herder,
+ "Got bad generalized txSet: transactions "
+ "are not ordered correctly or contain "
+ "invalid transactions");
+ return std::nullopt;
+ }
+ for (auto it = txList.begin() + prevSize; it != txList.end();
+ ++it)
+ {
+ inclusionFeeMap[*it] = baseFee;
+ }
+ break;
+ }
+ }
+ phaseFrame.emplace(
+ TxSetPhaseFrame(phase, std::move(txList), inclusionFeeMapPtr));
+ break;
+ }
+#ifdef ENABLE_NEXT_PROTOCOL_VERSION_UNSAFE_FOR_PRODUCTION
+ case 1:
+ {
+ auto const& xdrStages = xdrPhase.parallelTxsComponent().executionStages;
+ std::optional baseFee;
+ if (xdrPhase.parallelTxsComponent().baseFee)
+ {
+ baseFee = *xdrPhase.parallelTxsComponent().baseFee;
+ }
+ TxStageFrameList stages;
+ stages.reserve(xdrStages.size());
+ for (auto const& xdrStage : xdrStages)
+ {
+ auto& stage = stages.emplace_back();
+ stage.reserve(xdrStage.size());
+ for (auto const& xdrThread : xdrStage)
+ {
+ auto& thread = stage.emplace_back();
+ thread.reserve(xdrThread.size());
+ for (auto const& env : xdrThread)
+ {
+ auto tx = TransactionFrameBase::makeTransactionFromWire(
+ networkID, env);
+ if (!tx->XDRProvidesValidFee())
+ {
+ CLOG_DEBUG(Herder, "Got bad generalized txSet: "
+ "transaction has invalid XDR");
+ return std::nullopt;
+ }
+ thread.push_back(tx);
+ inclusionFeeMap[tx] = baseFee;
+ }
+ if (!std::is_sorted(thread.begin(), thread.end(),
+ &TxSetUtils::hashTxSorter))
+ {
+ CLOG_DEBUG(Herder, "Got bad generalized txSet: "
+ "thread is not sorted");
+ return std::nullopt;
+ }
+ }
+ if (!std::is_sorted(stage.begin(), stage.end(),
+ [](auto const& a, auto const& b) {
+ releaseAssert(!a.empty() && !b.empty());
+ return TxSetUtils::hashTxSorter(a.front(),
+ b.front());
+ }))
+ {
+ CLOG_DEBUG(Herder, "Got bad generalized txSet: "
+ "stage is not sorted");
+ return std::nullopt;
+ }
+ }
+ if (!std::is_sorted(stages.begin(), stages.end(),
+ [](auto const& a, auto const& b) {
+ releaseAssert(!a.empty() && !b.empty());
+ return TxSetUtils::hashTxSorter(
+ a.front().front(), b.front().front());
+ }))
+ {
+ CLOG_DEBUG(Herder, "Got bad generalized txSet: "
+ "stages are not sorted");
+ return std::nullopt;
+ }
+ phaseFrame.emplace(
+ TxSetPhaseFrame(phase, std::move(stages), inclusionFeeMapPtr));
+ break;
+ }
+#endif
+ default:
+ releaseAssert(false);
+ }
+ releaseAssert(phaseFrame);
+ return phaseFrame;
+}
+
+std::optional
+TxSetPhaseFrame::makeFromWireLegacy(
+ LedgerHeader const& lclHeader, Hash const& networkID,
+ xdr::xvector const& xdrTxs)
+{
+ TxFrameList txList;
+ if (!addWireTxsToList(networkID, xdrTxs, txList))
+ {
+ CLOG_DEBUG(
+ Herder,
+ "Got bad legacy txSet: transactions are not ordered correctly "
+ "or contain invalid phase transactions");
+ return std::nullopt;
+ }
+ auto inclusionFeeMapPtr = std::make_shared();
+ auto& inclusionFeeMap = *inclusionFeeMapPtr;
+ int64_t baseFee = computeBaseFeeForLegacyTxSet(lclHeader, txList);
+ for (auto const& tx : txList)
+ {
+ inclusionFeeMap[tx] = baseFee;
+ }
+ return TxSetPhaseFrame(TxSetPhase::CLASSIC, std::move(txList),
+ inclusionFeeMapPtr);
+}
+
+TxSetPhaseFrame
+TxSetPhaseFrame::makeEmpty(TxSetPhase phase, bool isParallel)
+{
+ if (isParallel)
+ {
+ return TxSetPhaseFrame(phase, TxStageFrameList{},
+ std::make_shared());
+ }
+ return TxSetPhaseFrame(phase, TxFrameList{},
+ std::make_shared());
+}
+
+TxSetPhaseFrame::TxSetPhaseFrame(
+ TxSetPhase phase, TxFrameList const& txs,
+ std::shared_ptr inclusionFeeMap)
+ : mPhase(phase), mInclusionFeeMap(inclusionFeeMap), mIsParallel(false)
+{
+ if (!txs.empty())
+ {
+ mStages.emplace_back().push_back(txs);
+ }
+}
+
+TxSetPhaseFrame::TxSetPhaseFrame(
+ TxSetPhase phase, TxStageFrameList&& txs,
+ std::shared_ptr inclusionFeeMap)
+ : mPhase(phase)
+ , mStages(txs)
+ , mInclusionFeeMap(inclusionFeeMap)
+ , mIsParallel(true)
+{
+}
+
+TxSetPhaseFrame::Iterator
+TxSetPhaseFrame::begin() const
+{
+ return TxSetPhaseFrame::Iterator(mStages, 0);
+}
+
+TxSetPhaseFrame::Iterator
+TxSetPhaseFrame::end() const
+{
+ return TxSetPhaseFrame::Iterator(mStages, mStages.size());
+}
+
+size_t
+TxSetPhaseFrame::sizeTx() const
+{
+ ZoneScoped;
+ return std::distance(this->begin(), this->end());
+}
+
+size_t
+TxSetPhaseFrame::sizeOp() const
+{
+ ZoneScoped;
+ return std::accumulate(this->begin(), this->end(), size_t(0),
+ [&](size_t a, TransactionFrameBasePtr const& tx) {
+ return a + tx->getNumOperations();
+ });
+}
+
+size_t
+TxSetPhaseFrame::size(LedgerHeader const& lclHeader) const
+{
+ switch (mPhase)
+ {
+ case TxSetPhase::CLASSIC:
+ return protocolVersionStartsFrom(lclHeader.ledgerVersion,
+ ProtocolVersion::V_11)
+ ? sizeOp()
+ : sizeTx();
+ case TxSetPhase::SOROBAN:
+ return sizeOp();
+ }
+}
+
+bool
+TxSetPhaseFrame::empty() const
+{
+ return sizeTx() == 0;
+}
+
+bool
+TxSetPhaseFrame::isParallel() const
+{
+ return mIsParallel;
+}
+
+TxStageFrameList const&
+TxSetPhaseFrame::getParallelStages() const
+{
+ releaseAssert(isParallel());
+ return mStages;
+}
+
+TxFrameList const&
+TxSetPhaseFrame::getSequentialTxs() const
+{
+ releaseAssert(!isParallel());
+ static TxFrameList empty;
+ if (mStages.empty())
+ {
+ return empty;
+ }
+ return mStages.at(0).at(0);
+}
+
+void
+TxSetPhaseFrame::toXDR(TransactionPhase& xdrPhase) const
+{
+
+ if (isParallel())
+ {
+
+#ifdef ENABLE_NEXT_PROTOCOL_VERSION_UNSAFE_FOR_PRODUCTION
+ parallelPhaseToXdr(mStages, *mInclusionFeeMap, xdrPhase);
+#else
+ releaseAssert(false);
+#endif
+ }
+ else
+ {
+ sequentialPhaseToXdr(getSequentialTxs(), *mInclusionFeeMap, xdrPhase);
+ }
+}
+
+InclusionFeeMap const&
+TxSetPhaseFrame::getInclusionFeeMap() const
+{
+ return *mInclusionFeeMap;
+}
+
+TxSetPhaseFrame
+TxSetPhaseFrame::sortedForApply(Hash const& txSetHash) const
+{
+ if (isParallel())
+ {
+ return TxSetPhaseFrame(mPhase,
+ sortedForApplyParallel(mStages, txSetHash),
+ mInclusionFeeMap);
+ }
+ else
+ {
+ return TxSetPhaseFrame(
+ mPhase, sortedForApplySequential(getSequentialTxs(), txSetHash),
+ mInclusionFeeMap);
+ }
+}
+
+bool
+TxSetPhaseFrame::checkValid(Application& app,
+ uint64_t lowerBoundCloseTimeOffset,
+ uint64_t upperBoundCloseTimeOffset) const
+{
+ auto const& lcl = app.getLedgerManager().getLastClosedLedgerHeader();
+ bool isSoroban = mPhase == TxSetPhase::SOROBAN;
+ bool checkPhaseSpecific =
+ isSoroban
+ ? checkValidSoroban(
+ lcl.header, app.getLedgerManager().getSorobanNetworkConfig())
+ : checkValidClassic(lcl.header);
+ if (!checkPhaseSpecific)
+ {
+ return false;
+ }
+
+ for (auto const& tx : *this)
+ {
+ if (tx->isSoroban() != isSoroban)
+ {
+ CLOG_DEBUG(Herder,
+ "Got bad generalized txSet with invalid "
+ "phase {} transactions",
+ static_cast(mPhase));
+ return false;
+ }
+ }
+
+ if (!checkFeeMap(getInclusionFeeMap(), lcl.header))
+ {
+ return false;
+ }
+
+ return txsAreValid(app, lowerBoundCloseTimeOffset,
+ upperBoundCloseTimeOffset);
+}
+
+bool
+TxSetPhaseFrame::checkValidClassic(LedgerHeader const& lclHeader) const
+{
+ if (isParallel())
+ {
+ CLOG_DEBUG(Herder, "Got bad txSet: classic phase can't be parallel");
+ return false;
+ }
+ if (this->size(lclHeader) > lclHeader.maxTxSetSize)
+ {
+ CLOG_DEBUG(Herder, "Got bad txSet: too many classic txs {} > {}",
+ this->size(lclHeader), lclHeader.maxTxSetSize);
+ return false;
+ }
+ return true;
+}
+
+bool
+TxSetPhaseFrame::checkValidSoroban(
+ LedgerHeader const& lclHeader,
+ SorobanNetworkConfig const& sorobanConfig) const
+{
+ bool needParallelSorobanPhase = protocolVersionStartsFrom(
+ lclHeader.ledgerVersion, PARALLEL_SOROBAN_PHASE_PROTOCOL_VERSION);
+ if (isParallel() != needParallelSorobanPhase)
+ {
+ CLOG_DEBUG(Herder,
+ "Got bad txSet: Soroban phase parallel support "
+ "does not match the current protocol; '{}' was "
+ "expected",
+ needParallelSorobanPhase);
+ return false;
+ }
+
+ if (!isParallel())
+ {
+ return true;
+ }
+ auto const& stages = getParallelStages();
+
+ // Verify that number of threads is not exceeded per stage. There is no
+ // limit for the number of stages or transactions per thread.
+ for (auto const& stage : stages)
+ {
+ if (stage.size() > sorobanConfig.ledgerMaxParallelThreads())
+ {
+ CLOG_DEBUG(Herder,
+ "Got bad txSet: too many threads in Soroban "
+ "stage {} > {}",
+ stage.size(), sorobanConfig.ledgerMaxParallelThreads());
+ return false;
+ }
+ }
+
+ // Verify that 'sequential' instructions don't exceed the ledger-wide
+ // limit.
+ // Every may have multiple thread and its runtime is considered to be
+ // bounded by the slowest thread (i.e. the one with the most instructions).
+ // Stages are meant to be executed sequentially, so the ledger-wide
+ // instructions should be limited by the sum of the stages' instructions.
+ int64_t totalInstructions = 0;
+ for (auto const& stage : stages)
+ {
+ int64_t stageInstructions = 0;
+ for (auto const& thread : stage)
+ {
+ int64_t threadInstructions = 0;
+ for (auto const& tx : thread)
+ {
+ // threadInstructions + tx->sorobanResources().instructions >
+ // std::numeric_limits::max()
+ if (threadInstructions >
+ std::numeric_limits::max() -
+ tx->sorobanResources().instructions)
+ {
+ CLOG_DEBUG(Herder, "Got bad txSet: Soroban per-thread "
+ "instructions overflow");
+ return false;
+ }
+ threadInstructions += tx->sorobanResources().instructions;
+ }
+ stageInstructions = std::max(stageInstructions, threadInstructions);
+ }
+ // totalInstructions + stageInstructions >
+ // std::numeric_limits::max()
+ if (totalInstructions >
+ std::numeric_limits::max() - stageInstructions)
+ {
+ CLOG_DEBUG(Herder,
+ "Got bad txSet: Soroban total instructions overflow");
+ return false;
+ }
+ totalInstructions += stageInstructions;
+ }
+ if (totalInstructions > sorobanConfig.ledgerMaxInstructions())
+ {
+ CLOG_DEBUG(
+ Herder,
+ "Got bad txSet: Soroban total instructions exceed limit: {} > {}",
+ totalInstructions, sorobanConfig.ledgerMaxInstructions());
+ return false;
+ }
+
+ // Verify that there are no read-write conflicts between threads within
+ // every stage.
+ for (auto const& stage : stages)
+ {
+ UnorderedSet stageReadOnlyKeys;
+ UnorderedSet stageReadWriteKeys;
+ for (auto const& thread : stage)
+ {
+ std::vector threadReadOnlyKeys;
+ std::vector threadReadWriteKeys;
+ for (auto const& tx : thread)
+ {
+ auto const& footprint = tx->sorobanResources().footprint;
+
+ for (auto const& key : footprint.readOnly)
+ {
+ if (stageReadWriteKeys.count(key) > 0)
+ {
+ CLOG_DEBUG(
+ Herder,
+ "Got bad generalized txSet: thread footprint "
+ "conflicts with another thread within stage");
+ return false;
+ }
+ threadReadOnlyKeys.push_back(key);
+ }
+ for (auto const& key : footprint.readWrite)
+ {
+ if (stageReadOnlyKeys.count(key) > 0 ||
+ stageReadWriteKeys.count(key) > 0)
+ {
+ CLOG_DEBUG(
+ Herder,
+ "Got bad generalized txSet: thread footprint "
+ "conflicts with another thread within stage");
+ return false;
+ }
+ threadReadWriteKeys.push_back(key);
+ }
+ }
+ stageReadOnlyKeys.insert(threadReadOnlyKeys.begin(),
+ threadReadOnlyKeys.end());
+ stageReadWriteKeys.insert(threadReadWriteKeys.begin(),
+ threadReadWriteKeys.end());
+ }
+ }
+ return true;
+}
+
+// This assumes that the phase validation has already been done,
+// specifically that there are no transactions that belong to the same
+// source account.
+bool
+TxSetPhaseFrame::txsAreValid(Application& app,
+ uint64_t lowerBoundCloseTimeOffset,
+ uint64_t upperBoundCloseTimeOffset) const
+{
+ ZoneScoped;
+ // This is done so minSeqLedgerGap is validated against the next
+ // ledgerSeq, which is what will be used at apply time
+
+ // Grab read-only latest ledger state; This is only used to validate tx sets
+ // for LCL+1
+ LedgerSnapshot ls(app);
+ ls.getLedgerHeader().currentToModify().ledgerSeq =
+ app.getLedgerManager().getLastClosedLedgerNum() + 1;
+ for (auto const& tx : *this)
+ {
+ auto txResult = tx->checkValid(app, ls, 0, lowerBoundCloseTimeOffset,
+ upperBoundCloseTimeOffset);
+ if (!txResult->isSuccess())
+ {
+
+ CLOG_DEBUG(
+ Herder, "Got bad txSet: tx invalid tx: {} result: {}",
+ xdrToCerealString(tx->getEnvelope(), "TransactionEnvelope"),
+ txResult->getResultCode());
+ return false;
+ }
}
+ return true;
}
-ApplicableTxSetFrame::ApplicableTxSetFrame(Application& app, bool isGeneralized,
- Hash const& previousLedgerHash,
- TxSetPhaseTransactions const& txs,
- std::optional contentsHash)
+ApplicableTxSetFrame::ApplicableTxSetFrame(
+ Application& app, bool isGeneralized, Hash const& previousLedgerHash,
+ std::vector const& phases,
+ std::optional contentsHash)
: mIsGeneralized(isGeneralized)
, mPreviousLedgerHash(previousLedgerHash)
- , mTxPhases(txs)
- , mPhaseInclusionFeeMap(mTxPhases.size())
+ , mPhases(phases)
, mContentsHash(contentsHash)
{
releaseAssert(previousLedgerHash ==
@@ -780,12 +1891,13 @@ ApplicableTxSetFrame::ApplicableTxSetFrame(Application& app, bool isGeneralized,
ApplicableTxSetFrame::ApplicableTxSetFrame(
Application& app, LedgerHeaderHistoryEntry const& lclHeader,
- TxSetPhaseTransactions const& txs, std::optional contentsHash)
+ std::vector const& phases,
+ std::optional contentsHash)
: ApplicableTxSetFrame(
app,
protocolVersionStartsFrom(lclHeader.header.ledgerVersion,
SOROBAN_PROTOCOL_VERSION),
- lclHeader.hash, txs, contentsHash)
+ lclHeader.hash, phases, contentsHash)
{
}
@@ -796,73 +1908,33 @@ ApplicableTxSetFrame::getContentsHash() const
return *mContentsHash;
}
-TxSetTransactions const&
-ApplicableTxSetFrame::getTxsForPhase(TxSetPhase phase) const
+TxSetPhaseFrame const&
+ApplicableTxSetFrame::getPhase(TxSetPhase phaseTxs) const
{
- releaseAssert(static_cast(phase) < mTxPhases.size());
- return mTxPhases.at(static_cast(phase));
+ releaseAssert(static_cast(phaseTxs) < mPhases.size());
+ return mPhases.at(static_cast(phaseTxs));
}
-TxSetTransactions
-ApplicableTxSetFrame::getTxsInApplyOrder() const
+std::vector const&
+ApplicableTxSetFrame::getPhases() const
{
-#ifdef BUILD_TESTS
- if (mApplyOrderOverride)
- {
- return *mApplyOrderOverride;
- }
-#endif
- ZoneScoped;
-
- // Use a single vector to order transactions from all phases
- std::vector retList;
- retList.reserve(sizeTxTotal());
+ return mPhases;
+}
- for (auto const& phase : mTxPhases)
+std::vector const&
+ApplicableTxSetFrame::getPhasesInApplyOrder() const
+{
+ ZoneScoped;
+ if (mApplyOrderPhases.empty())
{
- auto txQueues = TxSetUtils::buildAccountTxQueues(phase);
-
- // build txBatches
- // txBatches i-th element contains each i-th transaction for accounts
- // with a transaction in the transaction set
- std::vector> txBatches;
-
- while (!txQueues.empty())
- {
- txBatches.emplace_back();
- auto& curBatch = txBatches.back();
- // go over all users that still have transactions
- for (auto it = txQueues.begin(); it != txQueues.end();)
- {
- auto& txQueue = *it;
- curBatch.emplace_back(txQueue->getTopTx());
- txQueue->popTopTx();
- if (txQueue->empty())
- {
- // done with that user
- it = txQueues.erase(it);
- }
- else
- {
- ++it;
- }
- }
- }
-
- for (auto& batch : txBatches)
+ mApplyOrderPhases.reserve(mPhases.size());
+ for (auto const& phaseTxs : mPhases)
{
- // randomize each batch using the hash of the transaction set
- // as a way to randomize even more
- ApplyTxSorter s(getContentsHash());
- std::sort(batch.begin(), batch.end(), s);
- for (auto const& tx : batch)
- {
- retList.push_back(tx);
- }
+ mApplyOrderPhases.emplace_back(
+ phaseTxs.sortedForApply(getContentsHash()));
}
}
-
- return retList;
+ return mApplyOrderPhases;
}
// need to make sure every account that is submitting a tx has enough to pay
@@ -897,53 +1969,15 @@ ApplicableTxSetFrame::checkValid(Application& app,
if (isGeneralizedTxSet())
{
- auto checkFeeMap = [&](auto const& feeMap) {
- for (auto const& [tx, fee] : feeMap)
- {
- if (!fee)
- {
- continue;
- }
- if (*fee < lcl.header.baseFee)
- {
-
- CLOG_DEBUG(
- Herder,
- "Got bad txSet: {} has too low component base fee {}",
- hexAbbrev(mPreviousLedgerHash), *fee);
- return false;
- }
- if (tx->getInclusionFee() <
- getMinInclusionFee(*tx, lcl.header, fee))
- {
- CLOG_DEBUG(
- Herder,
- "Got bad txSet: {} has tx with fee bid ({}) lower "
- "than base fee ({})",
- hexAbbrev(mPreviousLedgerHash), tx->getInclusionFee(),
- getMinInclusionFee(*tx, lcl.header, fee));
- return false;
- }
- }
- return true;
- };
-
- if (!checkFeeMap(getInclusionFeeMap(TxSetPhase::CLASSIC)))
- {
- return false;
- }
- if (!checkFeeMap(getInclusionFeeMap(TxSetPhase::SOROBAN)))
- {
- return false;
- }
+ // Generalized transaction sets should always have 2 phases by
+ // construction.
+ releaseAssert(mPhases.size() ==
+ static_cast(TxSetPhase::PHASE_COUNT));
}
-
- if (this->size(lcl.header, TxSetPhase::CLASSIC) > lcl.header.maxTxSetSize)
+ else
{
- CLOG_DEBUG(Herder, "Got bad txSet: too many classic txs {} > {}",
- this->size(lcl.header, TxSetPhase::CLASSIC),
- lcl.header.maxTxSetSize);
- return false;
+ // Legacy tx sets should have 1 phase by construction.
+ releaseAssert(mPhases.size() == 1);
}
if (needGeneralizedTxSet)
@@ -951,7 +1985,7 @@ ApplicableTxSetFrame::checkValid(Application& app,
// First, ensure the tx set does not contain multiple txs per source
// account
std::unordered_set seenAccounts;
- for (auto const& phase : mTxPhases)
+ for (auto const& phase : mPhases)
{
for (auto const& tx : phase)
{
@@ -974,54 +2008,54 @@ ApplicableTxSetFrame::checkValid(Application& app,
return false;
}
+ auto limits = app.getLedgerManager().maxLedgerResources(
+ /* isSoroban */ true);
+ // When building Soroban tx sets with parallel execution support,
+ // instructions are accounted for by the build logic, not by the
+ // surge pricing config, so we need to relax the instruction limit
+ // in surge pricing logic.
+ if (protocolVersionStartsFrom(app.getLedgerManager()
+ .getLastClosedLedgerHeader()
+ .header.ledgerVersion,
+ PARALLEL_SOROBAN_PHASE_PROTOCOL_VERSION))
{
- auto limits = app.getLedgerManager().maxLedgerResources(
- /* isSoroban */ true);
- if (anyGreater(*totalTxSetRes, limits))
- {
- CLOG_DEBUG(Herder,
- "Got bad txSet: needed resources exceed ledger "
- "limits {} > {}",
- totalTxSetRes->toString(), limits.toString());
- return false;
- }
+ limits.setVal(Resource::Type::INSTRUCTIONS,
+ std::numeric_limits::max());
+ }
+ if (anyGreater(*totalTxSetRes, limits))
+ {
+ CLOG_DEBUG(Herder,
+ "Got bad txSet: needed resources exceed ledger "
+ "limits {} > {}",
+ totalTxSetRes->toString(), limits.toString());
+ return false;
}
}
-
- bool allValid = true;
- for (auto const& txs : mTxPhases)
+ for (auto const& phase : mPhases)
{
- if (!phaseTxsAreValid(txs, app, lowerBoundCloseTimeOffset,
+ if (!phase.checkValid(app, lowerBoundCloseTimeOffset,
upperBoundCloseTimeOffset))
{
- allValid = false;
- break;
+ return false;
}
}
- return allValid;
+ return true;
}
size_t
ApplicableTxSetFrame::size(LedgerHeader const& lh,
- std::optional phase) const
+ std::optional phaseType) const
{
- size_t sz = 0;
- if (!phase)
- {
- if (numPhases() > static_cast(TxSetPhase::SOROBAN))
- {
- sz += sizeOp(TxSetPhase::SOROBAN);
- }
- }
- else if (phase.value() == TxSetPhase::SOROBAN)
+ ZoneScoped;
+ if (phaseType)
{
- sz += sizeOp(TxSetPhase::SOROBAN);
+ return mPhases.at(static_cast(*phaseType)).size(lh);
}
- if (!phase || phase.value() == TxSetPhase::CLASSIC)
+
+ size_t sz = 0;
+ for (auto const& phase : mPhases)
{
- sz += protocolVersionStartsFrom(lh.ledgerVersion, ProtocolVersion::V_11)
- ? sizeOp(TxSetPhase::CLASSIC)
- : sizeTx(TxSetPhase::CLASSIC);
+ sz += phase.size(lh);
}
return sz;
}
@@ -1029,12 +2063,7 @@ ApplicableTxSetFrame::size(LedgerHeader const& lh,
size_t
ApplicableTxSetFrame::sizeOp(TxSetPhase phase) const
{
- ZoneScoped;
- auto const& txs = mTxPhases.at(static_cast(phase));
- return std::accumulate(txs.begin(), txs.end(), size_t(0),
- [&](size_t a, TransactionFrameBasePtr const& tx) {
- return a + tx->getNumOperations();
- });
+ return mPhases.at(static_cast(phase)).sizeOp();
}
size_t
@@ -1042,150 +2071,51 @@ ApplicableTxSetFrame::sizeOpTotal() const
{
ZoneScoped;
size_t total = 0;
- for (int i = 0; i < mTxPhases.size(); i++)
+ for (auto const& phase : mPhases)
{
- total += sizeOp(static_cast(i));
+ total += phase.sizeOp();
}
return total;
}
size_t
-ApplicableTxSetFrame::sizeTxTotal() const
-{
- ZoneScoped;
- size_t total = 0;
- for (int i = 0; i < mTxPhases.size(); i++)
- {
- total += sizeTx(static_cast(i));
- }
- return total;
-}
-
-void
-ApplicableTxSetFrame::computeTxFeesForNonGeneralizedSet(
- LedgerHeader const& lclHeader)
+ApplicableTxSetFrame::sizeTx(TxSetPhase phase) const
{
- ZoneScoped;
- auto ledgerVersion = lclHeader.ledgerVersion;
- int64_t lowBaseFee = std::numeric_limits::max();
- releaseAssert(mTxPhases.size() == 1);
- for (auto const& txPtr : mTxPhases[0])
- {
- int64_t txBaseFee = computePerOpFee(*txPtr, ledgerVersion);
- lowBaseFee = std::min(lowBaseFee, txBaseFee);
- }
- computeTxFeesForNonGeneralizedSet(lclHeader, lowBaseFee,
- /* enableLogging */ false);
+ return mPhases.at(static_cast(phase)).sizeTx();
}
-void
-ApplicableTxSetFrame::computeTxFeesForNonGeneralizedSet(
- LedgerHeader const& lclHeader, int64_t lowestBaseFee, bool enableLogging)
+size_t
+ApplicableTxSetFrame::sizeTxTotal() const
{
ZoneScoped;
- int64_t baseFee = lclHeader.baseFee;
-
- if (protocolVersionStartsFrom(lclHeader.ledgerVersion,
- ProtocolVersion::V_11))
- {
- size_t surgeOpsCutoff = 0;
- if (lclHeader.maxTxSetSize >= MAX_OPS_PER_TX)
- {
- surgeOpsCutoff = lclHeader.maxTxSetSize - MAX_OPS_PER_TX;
- }
- if (sizeOp(TxSetPhase::CLASSIC) > surgeOpsCutoff)
- {
- baseFee = lowestBaseFee;
- if (enableLogging)
- {
- CLOG_WARNING(Herder, "surge pricing in effect! {} > {}",
- sizeOp(TxSetPhase::CLASSIC), surgeOpsCutoff);
- }
- }
- }
-
- releaseAssert(mTxPhases.size() == 1);
- releaseAssert(mPhaseInclusionFeeMap.size() == 1);
- auto const& phase = mTxPhases[static_cast(TxSetPhase::CLASSIC)];
- auto& feeMap = getInclusionFeeMapMut(TxSetPhase::CLASSIC);
- for (auto const& tx : phase)
- {
- feeMap[tx] = baseFee;
- }
-}
-
-void
-ApplicableTxSetFrame::computeTxFees(
- TxSetPhase phase, LedgerHeader const& ledgerHeader,
- SurgePricingLaneConfig const& surgePricingConfig,
- std::vector const& lowestLaneFee,
- std::vector const& hadTxNotFittingLane)
-{
- releaseAssert(isGeneralizedTxSet());
- releaseAssert(lowestLaneFee.size() == hadTxNotFittingLane.size());
- std::vector laneBaseFee(lowestLaneFee.size(),
- ledgerHeader.baseFee);
- auto minBaseFee =
- *std::min_element(lowestLaneFee.begin(), lowestLaneFee.end());
- for (size_t lane = 0; lane < laneBaseFee.size(); ++lane)
- {
- // If generic lane is full, then any transaction had to compete with not
- // included transactions and independently of the lane they need to have
- // at least the minimum fee in the tx set applied.
- if (hadTxNotFittingLane[SurgePricingPriorityQueue::GENERIC_LANE])
- {
- laneBaseFee[lane] = minBaseFee;
- }
- // If limited lane is full, then the transactions in this lane also had
- // to compete with each other and have a base fee associated with this
- // lane only.
- if (lane != SurgePricingPriorityQueue::GENERIC_LANE &&
- hadTxNotFittingLane[lane])
- {
- laneBaseFee[lane] = lowestLaneFee[lane];
- }
- if (laneBaseFee[lane] > ledgerHeader.baseFee)
- {
- CLOG_WARNING(
- Herder,
- "{} phase: surge pricing for '{}' lane is in effect with base "
- "fee={}, baseFee={}",
- getTxSetPhaseName(phase),
- lane == SurgePricingPriorityQueue::GENERIC_LANE ? "generic"
- : "DEX",
- laneBaseFee[lane], ledgerHeader.baseFee);
- }
- }
-
- auto const& txs = mTxPhases.at(static_cast(phase));
- auto& feeMap = getInclusionFeeMapMut(phase);
- for (auto const& tx : txs)
+ size_t total = 0;
+ for (auto const& phase : mPhases)
{
- feeMap[tx] = laneBaseFee[surgePricingConfig.getLane(*tx)];
+ total += phase.sizeTx();
}
+ return total;
}
std::optional
-ApplicableTxSetFrame::getTxBaseFee(TransactionFrameBaseConstPtr const& tx,
- LedgerHeader const& lclHeader) const
+ApplicableTxSetFrame::getTxBaseFee(TransactionFrameBaseConstPtr const& tx) const
{
- for (auto const& phaseMap : mPhaseInclusionFeeMap)
+ for (auto const& phase : mPhases)
{
+ auto const& phaseMap = phase.getInclusionFeeMap();
if (auto it = phaseMap.find(tx); it != phaseMap.end())
{
return it->second;
}
}
throw std::runtime_error("Transaction not found in tx set");
- return std::nullopt;
}
std::optional
ApplicableTxSetFrame::getTxSetSorobanResource() const
{
- releaseAssert(mTxPhases.size() > static_cast(TxSetPhase::SOROBAN));
+ releaseAssert(mPhases.size() > static_cast(TxSetPhase::SOROBAN));
auto total = Resource::makeEmptySoroban();
- for (auto const& tx : mTxPhases[static_cast(TxSetPhase::SOROBAN)])
+ for (auto const& tx : mPhases[static_cast(TxSetPhase::SOROBAN)])
{
if (total.canAdd(tx->getResources(/* useByteLimitInClassic */ false)))
{
@@ -1204,16 +2134,13 @@ ApplicableTxSetFrame::getTotalFees(LedgerHeader const& lh) const
{
ZoneScoped;
int64_t total{0};
- std::for_each(mTxPhases.begin(), mTxPhases.end(),
- [&](TxSetTransactions const& phase) {
- total += std::accumulate(
- phase.begin(), phase.end(), int64_t(0),
- [&](int64_t t, TransactionFrameBasePtr const& tx) {
- return t +
- tx->getFee(lh, getTxBaseFee(tx, lh), true);
- });
- });
-
+ for (auto const& phaseTxs : mPhases)
+ {
+ for (auto const& tx : phaseTxs)
+ {
+ total += tx->getFee(lh, getTxBaseFee(tx), true);
+ }
+ }
return total;
}
@@ -1222,15 +2149,13 @@ ApplicableTxSetFrame::getTotalInclusionFees() const
{
ZoneScoped;
int64_t total{0};
- std::for_each(mTxPhases.begin(), mTxPhases.end(),
- [&](TxSetTransactions const& phase) {
- total += std::accumulate(
- phase.begin(), phase.end(), int64_t(0),
- [&](int64_t t, TransactionFrameBasePtr const& tx) {
- return t + tx->getInclusionFee();
- });
- });
-
+ for (auto const& phaseTxs : mPhases)
+ {
+ for (auto const& tx : phaseTxs)
+ {
+ total += tx->getInclusionFee();
+ }
+ }
return total;
}
@@ -1247,7 +2172,8 @@ ApplicableTxSetFrame::summary() const
FMT_STRING("txs:{}, ops:{}, base_fee:{}"), sizeTxTotal(),
sizeOpTotal(),
// NB: fee map can't be empty at this stage (checked above).
- getInclusionFeeMap(TxSetPhase::CLASSIC)
+ mPhases[static_cast(TxSetPhase::CLASSIC)]
+ .getInclusionFeeMap()
.begin()
->second.value_or(0));
}
@@ -1286,18 +2212,17 @@ ApplicableTxSetFrame::summary() const
};
std::string status;
- releaseAssert(mTxPhases.size() <=
+ releaseAssert(mPhases.size() <=
static_cast(TxSetPhase::PHASE_COUNT));
- for (auto i = 0; i != mTxPhases.size(); i++)
+ for (size_t i = 0; i < mPhases.size(); i++)
{
if (!status.empty())
{
status += ", ";
}
- status += fmt::format(
- FMT_STRING("{} phase: {}"),
- getTxSetPhaseName(static_cast(i)),
- feeStats(getInclusionFeeMap(static_cast(i))));
+ status += fmt::format(FMT_STRING("{} phase: {}"),
+ getTxSetPhaseName(static_cast(i)),
+ feeStats(mPhases[i].getInclusionFeeMap()));
}
return status;
}
@@ -1307,8 +2232,9 @@ ApplicableTxSetFrame::toXDR(TransactionSet& txSet) const
{
ZoneScoped;
releaseAssert(!isGeneralizedTxSet());
- releaseAssert(mTxPhases.size() == 1);
- transactionsToTransactionSetXDR(mTxPhases[0], mPreviousLedgerHash, txSet);
+ releaseAssert(mPhases.size() == 1);
+ transactionsToTransactionSetXDR(mPhases[0].getSequentialTxs(),
+ mPreviousLedgerHash, txSet);
}
void
@@ -1316,10 +2242,9 @@ ApplicableTxSetFrame::toXDR(GeneralizedTransactionSet& generalizedTxSet) const
{
ZoneScoped;
releaseAssert(isGeneralizedTxSet());
- releaseAssert(mTxPhases.size() <=
+ releaseAssert(mPhases.size() <=
static_cast(TxSetPhase::PHASE_COUNT));
- transactionsToGeneralizedTransactionSetXDR(mTxPhases, mPhaseInclusionFeeMap,
- mPreviousLedgerHash,
+ transactionsToGeneralizedTransactionSetXDR(mPhases, mPreviousLedgerHash,
generalizedTxSet);
}
@@ -1348,168 +2273,4 @@ ApplicableTxSetFrame::isGeneralizedTxSet() const
return mIsGeneralized;
}
-bool
-ApplicableTxSetFrame::addTxsFromXdr(
- Hash const& networkID, xdr::xvector const& txs,
- bool useBaseFee, std::optional baseFee, TxSetPhase phase)
-{
- auto& phaseTxs = mTxPhases.at(static_cast(phase));
- size_t oldSize = phaseTxs.size();
- phaseTxs.reserve(oldSize + txs.size());
- auto& inclusionFeeMap = getInclusionFeeMapMut(phase);
- for (auto const& env : txs)
- {
- auto tx = TransactionFrameBase::makeTransactionFromWire(networkID, env);
- if (!tx->XDRProvidesValidFee())
- {
- return false;
- }
- // Phase should be consistent with the tx we're trying to add
- if ((tx->isSoroban() && phase != TxSetPhase::SOROBAN) ||
- (!tx->isSoroban() && phase != TxSetPhase::CLASSIC))
- {
- return false;
- }
-
- phaseTxs.push_back(tx);
- if (useBaseFee)
- {
- inclusionFeeMap[tx] = baseFee;
- }
- }
- return std::is_sorted(phaseTxs.begin() + oldSize, phaseTxs.end(),
- &TxSetUtils::hashTxSorter);
-}
-
-void
-ApplicableTxSetFrame::applySurgePricing(Application& app)
-{
- ZoneScoped;
- releaseAssert(mTxPhases.size() <=
- static_cast(TxSetPhase::PHASE_COUNT));
- auto const& lclHeader =
- app.getLedgerManager().getLastClosedLedgerHeader().header;
- for (int i = 0; i < mTxPhases.size(); i++)
- {
- TxSetPhase phaseType = static_cast(i);
- auto& phase = mTxPhases[i];
- if (phaseType == TxSetPhase::CLASSIC)
- {
- auto maxOps =
- Resource({static_cast(
- app.getLedgerManager().getLastMaxTxSetSizeOps()),
- MAX_CLASSIC_BYTE_ALLOWANCE});
- std::optional dexOpsLimit;
- if (isGeneralizedTxSet() &&
- app.getConfig().MAX_DEX_TX_OPERATIONS_IN_TX_SET)
- {
- // DEX operations limit implies that DEX transactions should
- // compete with each other in in a separate fee lane, which is
- // only possible with generalized tx set.
- dexOpsLimit =
- Resource({*app.getConfig().MAX_DEX_TX_OPERATIONS_IN_TX_SET,
- MAX_CLASSIC_BYTE_ALLOWANCE});
- }
-
- auto surgePricingLaneConfig =
- std::make_shared(maxOps, dexOpsLimit);
-
- std::vector hadTxNotFittingLane;
-
- auto includedTxs =
- SurgePricingPriorityQueue::getMostTopTxsWithinLimits(
- phase, surgePricingLaneConfig, hadTxNotFittingLane);
-
- size_t laneCount = surgePricingLaneConfig->getLaneLimits().size();
- std::vector lowestLaneFee(
- laneCount, std::numeric_limits::max());
- for (auto const& tx : includedTxs)
- {
- size_t lane = surgePricingLaneConfig->getLane(*tx);
- auto perOpFee = computePerOpFee(*tx, lclHeader.ledgerVersion);
- lowestLaneFee[lane] = std::min(lowestLaneFee[lane], perOpFee);
- }
-
- phase = includedTxs;
- if (isGeneralizedTxSet())
- {
- computeTxFees(TxSetPhase::CLASSIC, lclHeader,
- *surgePricingLaneConfig, lowestLaneFee,
- hadTxNotFittingLane);
- }
- else
- {
- computeTxFeesForNonGeneralizedSet(
- lclHeader,
- lowestLaneFee[SurgePricingPriorityQueue::GENERIC_LANE],
- /* enableLogging */ true);
- }
- }
- else
- {
- releaseAssert(isGeneralizedTxSet());
- releaseAssert(phaseType == TxSetPhase::SOROBAN);
-
- auto limits = app.getLedgerManager().maxLedgerResources(
- /* isSoroban */ true);
-
- auto byteLimit =
- std::min(static_cast(MAX_SOROBAN_BYTE_ALLOWANCE),
- limits.getVal(Resource::Type::TX_BYTE_SIZE));
- limits.setVal(Resource::Type::TX_BYTE_SIZE, byteLimit);
-
- auto surgePricingLaneConfig =
- std::make_shared(limits);
-
- std::vector hadTxNotFittingLane;
- auto includedTxs =
- SurgePricingPriorityQueue::getMostTopTxsWithinLimits(
- phase, surgePricingLaneConfig, hadTxNotFittingLane);
-
- size_t laneCount = surgePricingLaneConfig->getLaneLimits().size();
- std::vector lowestLaneFee(
- laneCount, std::numeric_limits::max());
- for (auto const& tx : includedTxs)
- {
- size_t lane = surgePricingLaneConfig->getLane(*tx);
- auto perOpFee = computePerOpFee(*tx, lclHeader.ledgerVersion);
- lowestLaneFee[lane] = std::min(lowestLaneFee[lane], perOpFee);
- }
-
- phase = includedTxs;
- computeTxFees(phaseType, lclHeader, *surgePricingLaneConfig,
- lowestLaneFee, hadTxNotFittingLane);
- }
- }
-}
-
-std::unordered_map> const&
-ApplicableTxSetFrame::getInclusionFeeMap(TxSetPhase phase) const
-{
- size_t phaseId = static_cast(phase);
- releaseAssert(phaseId < mPhaseInclusionFeeMap.size());
- return mPhaseInclusionFeeMap[phaseId];
-}
-
-std::unordered_map>&
-ApplicableTxSetFrame::getInclusionFeeMapMut(TxSetPhase phase)
-{
- size_t phaseId = static_cast(phase);
- releaseAssert(phaseId < mPhaseInclusionFeeMap.size());
- return mPhaseInclusionFeeMap[phaseId];
-}
-
-std::string
-getTxSetPhaseName(TxSetPhase phase)
-{
- switch (phase)
- {
- case TxSetPhase::CLASSIC:
- return "classic";
- case TxSetPhase::SOROBAN:
- return "soroban";
- default:
- throw std::runtime_error("Unknown phase");
- }
-}
} // namespace stellar
diff --git a/src/herder/TxSetFrame.h b/src/herder/TxSetFrame.h
index e8942046c9..b480bc0150 100644
--- a/src/herder/TxSetFrame.h
+++ b/src/herder/TxSetFrame.h
@@ -9,6 +9,7 @@
#include "overlay/StellarXDR.h"
#include "transactions/TransactionFrame.h"
#include "util/NonCopyable.h"
+#include "util/ProtocolVersion.h"
#include "xdr/Stellar-internal.h"
#include
@@ -33,10 +34,8 @@ enum class TxSetPhase
PHASE_COUNT
};
-using TxSetTransactions = std::vector;
-using TxSetPhaseTransactions = std::vector;
-
-std::string getTxSetPhaseName(TxSetPhase phase);
+using TxFrameList = std::vector;
+using PerPhaseTransactionList = std::vector;
// Creates a valid ApplicableTxSetFrame and corresponding TxSetXDRFrame
// from the provided transactions.
@@ -51,26 +50,26 @@ std::string getTxSetPhaseName(TxSetPhase phase);
// transaction pointers.
std::pair
makeTxSetFromTransactions(
- TxSetPhaseTransactions const& txPhases, Application& app,
+ PerPhaseTransactionList const& txPhases, Application& app,
uint64_t lowerBoundCloseTimeOffset,
uint64_t upperBoundCloseTimeOffset
#ifdef BUILD_TESTS
// Skips the tx set validation and preserves the pointers
// to the passed-in transactions - use in conjunction with
- // `orderOverride` argument in test-only overrides.
+ // `enforceTxsApplyOrder` argument in test-only overrides.
,
bool skipValidation = false
#endif
);
std::pair
makeTxSetFromTransactions(
- TxSetPhaseTransactions const& txPhases, Application& app,
+ PerPhaseTransactionList const& txPhases, Application& app,
uint64_t lowerBoundCloseTimeOffset, uint64_t upperBoundCloseTimeOffset,
- TxSetPhaseTransactions& invalidTxsPerPhase
+ PerPhaseTransactionList& invalidTxsPerPhase
#ifdef BUILD_TESTS
// Skips the tx set validation and preserves the pointers
// to the passed-in transactions - use in conjunction with
- // `orderOverride` argument in test-only overrides.
+ // `enforceTxsApplyOrder` argument in test-only overrides.
,
bool skipValidation = false
#endif
@@ -78,15 +77,15 @@ makeTxSetFromTransactions(
#ifdef BUILD_TESTS
std::pair
-makeTxSetFromTransactions(TxSetTransactions txs, Application& app,
+makeTxSetFromTransactions(TxFrameList txs, Application& app,
uint64_t lowerBoundCloseTimeOffset,
uint64_t upperBoundCloseTimeOffset,
bool enforceTxsApplyOrder = false);
std::pair
-makeTxSetFromTransactions(TxSetTransactions txs, Application& app,
+makeTxSetFromTransactions(TxFrameList txs, Application& app,
uint64_t lowerBoundCloseTimeOffset,
uint64_t upperBoundCloseTimeOffset,
- TxSetTransactions& invalidTxs,
+ TxFrameList& invalidTxs,
bool enforceTxsApplyOrder = false);
#endif
@@ -99,8 +98,7 @@ makeTxSetFromTransactions(TxSetTransactions txs, Application& app,
//
// Before even trying to validate and apply a TxSetXDRFrame it has
// to be interpreted and prepared for apply using the ledger state
-// this TxSetXDRFrame refers to. This is typically performed by
-// `prepareForApply` method.
+// this TxSetXDRFrame refers to. This is performed by `prepareForApply` method.
class TxSetXDRFrame : public NonMovableOrCopyable
{
public:
@@ -124,7 +122,7 @@ class TxSetXDRFrame : public NonMovableOrCopyable
// historical transactions.
static TxSetXDRFrameConstPtr
makeFromHistoryTransactions(Hash const& previousLedgerHash,
- TxSetTransactions const& txs);
+ TxFrameList const& txs);
void toXDR(TransactionSet& set) const;
void toXDR(GeneralizedTransactionSet& generalizedTxSet) const;
@@ -152,8 +150,11 @@ class TxSetXDRFrame : public NonMovableOrCopyable
// Returns the hash of this tx set.
Hash const& getContentsHash() const;
+ // Returns the hash of the previous ledger that this tx set refers to.
Hash const& previousLedgerHash() const;
+ // Returns the total number of transactions in this tx set (even if it's
+ // not structurally valid).
size_t sizeTxTotal() const;
// Gets the size of this transaction set in operations.
@@ -170,7 +171,8 @@ class TxSetXDRFrame : public NonMovableOrCopyable
// This is only necessary to serve a very specific use case of updating
// the transaction queue with wired tx sets. Otherwise, use
// getTransactionsForPhase() in `ApplicableTxSetFrame`.
- TxSetPhaseTransactions createTransactionFrames(Hash const& networkID) const;
+ PerPhaseTransactionList
+ createTransactionFrames(Hash const& networkID) const;
#ifdef BUILD_TESTS
mutable ApplicableTxSetFrameConstPtr mApplicableTxSetOverride;
@@ -187,6 +189,169 @@ class TxSetXDRFrame : public NonMovableOrCopyable
Hash mHash;
};
+// The following definitions are used to represent the 'parallel' phase of the
+// transaction set.
+//
+// The structure of this phase is as follows:
+// - The whole phase (`TxStageFrameList`) consists of several sequential
+// 'stages' (`TxStageFrame`). A stage has to be executed after every
+// transaction in the previous stage has been applied.
+// - A 'stage' (`TxStageFrame`) consists of several parallel 'threads'
+// (`TxThreadFrame`). Transactions in different 'threads' are independent of
+// each other and can be applied in parallel.
+// - A 'thread' (`TxThreadFrame`) consists of transactions that should
+// generally be applied sequentially. However, not all the transactions in
+// the thread are necessarily conflicting with each other; it is possible
+// that some, or even all transactions in the thread structure can be applied
+// in parallel with each other (depending on their footprints).
+//
+// This structure mimics the XDR structure of the `ParallelTxsComponent`.
+using TxThreadFrame = TxFrameList;
+using TxStageFrame = std::vector;
+using TxStageFrameList = std::vector;
+
+// Alias for the map from transaction to its inclusion fee as defined by the
+// transaction set.
+using InclusionFeeMap =
+ std::unordered_map>;
+
+// `TxSetPhaseFrame` represents a single phase of the `ApplicableTxSetFrame`.
+//
+// Phases can only be created as a part of the `ApplicableTxSetFrame` and thus
+// don't have any public constructors.
+//
+// Phase may either wrap the corresponding `TransactionPhase` XDR for
+// generalized transactions sets, or represent all the transactions in the
+// 'legacy' transaction set (which is considered to have only a single phase).
+//
+// This does not assume any specific order of transactions by default - the
+// phase in 'apply' order has to be explicitly requested from the parent
+// `ApplicableTxSetFrame` via `getPhasesInApplyOrder` method.
+class TxSetPhaseFrame
+{
+ public:
+ // Returns true when this phase can be applied in parallel.
+ // Currently only Soroban phase can be parallel, and only starting from
+ // PARALLEL_SOROBAN_PHASE_PROTOCOL_VERSION protocol
+ bool isParallel() const;
+
+ // Returns the parallel stages of this phase.
+ //
+ // This may only be called when `isParallel()` is true.
+ TxStageFrameList const& getParallelStages() const;
+ // Returns all the transactions in this phase if it's not parallel.
+ //
+ // This may only be called when `isParallel()` is false.
+ TxFrameList const& getSequentialTxs() const;
+
+ // Serializes this phase to the provided XDR.
+ void toXDR(TransactionPhase& xdrPhase) const;
+
+ // Iterator over all transactions in this phase.
+ // The order of iteration is defined by the parent `ApplicableTxSetFrame`.
+ // If the phase is sorted for apply, then the iteration order can be used
+ // to determine a stable index of every transaction in the phase, even if
+ // the phase is parallel and can have certain transaction applied in
+ // arbitrary order.
+ class Iterator
+ {
+ public:
+ using value_type = TransactionFrameBasePtr;
+ using difference_type = std::ptrdiff_t;
+ using pointer = value_type*;
+ using reference = value_type&;
+ using iterator_category = std::forward_iterator_tag;
+
+ TransactionFrameBasePtr operator*() const;
+
+ Iterator& operator++();
+ Iterator operator++(int);
+
+ bool operator==(Iterator const& other) const;
+ bool operator!=(Iterator const& other) const;
+
+ private:
+ friend class TxSetPhaseFrame;
+
+ Iterator(TxStageFrameList const& txs, size_t stageIndex);
+ TxStageFrameList const& mStages;
+ size_t mStageIndex = 0;
+ size_t mThreadIndex = 0;
+ size_t mTxIndex = 0;
+ };
+ Iterator begin() const;
+ Iterator end() const;
+ size_t sizeTx() const;
+ size_t sizeOp() const;
+ size_t size(LedgerHeader const& lclHeader) const;
+ bool empty() const;
+
+ // Get _inclusion_ fee map for this phase. The map contains lowest base
+ // fee for each transaction (lowest base fee is identical for all
+ // transactions in the same lane)
+ InclusionFeeMap const& getInclusionFeeMap() const;
+
+ private:
+ friend class TxSetXDRFrame;
+ friend class ApplicableTxSetFrame;
+
+ friend std::pair
+ makeTxSetFromTransactions(PerPhaseTransactionList const& txPhases,
+ Application& app,
+ uint64_t lowerBoundCloseTimeOffset,
+ uint64_t upperBoundCloseTimeOffset,
+ PerPhaseTransactionList& invalidTxsPerPhase
+#ifdef BUILD_TESTS
+ ,
+ bool skipValidation
+#endif
+ );
+#ifdef BUILD_TESTS
+ friend std::pair
+ makeTxSetFromTransactions(TxFrameList txs, Application& app,
+ uint64_t lowerBoundCloseTimeOffset,
+ uint64_t upperBoundCloseTimeOffset,
+ TxFrameList& invalidTxs,
+ bool enforceTxsApplyOrder);
+#endif
+ TxSetPhaseFrame(TxSetPhase phase, TxFrameList const& txs,
+ std::shared_ptr inclusionFeeMap);
+ TxSetPhaseFrame(TxSetPhase phase, TxStageFrameList&& txs,
+ std::shared_ptr inclusionFeeMap);
+
+ // Creates a new phase from `TransactionPhase` XDR coming from a
+ // `GeneralizedTransactionSet`.
+ static std::optional
+ makeFromWire(TxSetPhase phase, Hash const& networkID,
+ TransactionPhase const& xdrPhase);
+
+ // Creates a new phase from all the transactions in the legacy
+ // `TransactionSet` XDR.
+ static std::optional
+ makeFromWireLegacy(LedgerHeader const& lclHeader, Hash const& networkID,
+ xdr::xvector const& xdrTxs);
+
+ // Creates a valid empty phase with given `isParallel` flag.
+ static TxSetPhaseFrame makeEmpty(TxSetPhase phase, bool isParallel);
+
+ // Returns a copy of this phase with transactions sorted for apply.
+ TxSetPhaseFrame sortedForApply(Hash const& txSetHash) const;
+ bool checkValid(Application& app, uint64_t lowerBoundCloseTimeOffset,
+ uint64_t upperBoundCloseTimeOffset) const;
+ bool checkValidClassic(LedgerHeader const& lclHeader) const;
+ bool checkValidSoroban(LedgerHeader const& lclHeader,
+ SorobanNetworkConfig const& sorobanConfig) const;
+
+ bool txsAreValid(Application& app, uint64_t lowerBoundCloseTimeOffset,
+ uint64_t upperBoundCloseTimeOffset) const;
+
+ TxSetPhase mPhase;
+
+ TxStageFrameList mStages;
+ std::shared_ptr mInclusionFeeMap;
+ bool mIsParallel;
+};
+
// Transaction set that is suitable for being applied to the ledger.
//
// This is not necessarily a fully *valid* transaction set: further validation
@@ -201,51 +366,64 @@ class ApplicableTxSetFrame
public:
// Returns the base fee for the transaction or std::nullopt when the
// transaction is not discounted.
- std::optional getTxBaseFee(TransactionFrameBaseConstPtr const& tx,
- LedgerHeader const& lclHeader) const;
+ std::optional
+ getTxBaseFee(TransactionFrameBaseConstPtr const& tx) const;
+
+ // Gets the phase frame for the given phase in arbitrary order.
+ TxSetPhaseFrame const& getPhase(TxSetPhase phase) const;
- // Gets all the transactions belonging to this frame in arbitrary order.
- TxSetTransactions const& getTxsForPhase(TxSetPhase phase) const;
+ // Gets all the phases of this transaction set with transactions in
+ // arbitrary order.
+ std::vector const& getPhases() const;
- // Build a list of transaction ready to be applied to the last closed
- // ledger, based on the transaction set.
+ // Gets all the phases of this transaction set, each phase with
+ // transactions sorted for apply.
//
- // The order satisfies:
- // * transactions for an account are sorted by sequence number (ascending)
- // * the order between accounts is randomized
- TxSetTransactions getTxsInApplyOrder() const;
+ // For the generalized transaction sets, the order is defined by shuffling
+ // all the transactions that are applied sequentially relatively to each
+ // other using the hash of the transaction set.
+ //
+ // For the legacy transaction sets, the apply order satisfies :
+ // - Transactions for an account are sorted by sequence number (ascending).
+ // - The order between accounts is randomized.
+ std::vector const& getPhasesInApplyOrder() const;
- // Checks if this tx set frame is valid in the context of the current LCL.
+ // Checks if this transaction set frame is valid in the context of the
+ // current LCL.
// This can be called when LCL does not match `previousLedgerHash`, but
// then validation will never pass.
bool checkValid(Application& app, uint64_t lowerBoundCloseTimeOffset,
uint64_t upperBoundCloseTimeOffset) const;
+ // Returns the size of this whole transaction set, or the specified phase
+ // in operations or transactions (for older protocol versions).
size_t size(LedgerHeader const& lh,
std::optional phase = std::nullopt) const;
- size_t
- sizeTx(TxSetPhase phase) const
- {
- return mTxPhases.at(static_cast(phase)).size();
- }
+ // Returns the total number of transactions in the given phase.
+ size_t sizeTx(TxSetPhase phase) const;
+ // Returns the total number of transactions in this tx set.
size_t sizeTxTotal() const;
+ // Returns the total number of operations in the given phase.
+ size_t sizeOp(TxSetPhase phase) const;
+ // Returns the total number of operations in this tx set.
+ size_t sizeOpTotal() const;
+
+ // Returns whether this transaction set is empty.
bool
empty() const
{
return sizeTxTotal() == 0;
}
+ // Returns the number of phases in this tx set.
size_t
numPhases() const
{
- return mTxPhases.size();
+ return mPhases.size();
}
- size_t sizeOp(TxSetPhase phase) const;
- size_t sizeOpTotal() const;
-
// Returns the sum of all fees that this transaction set would take.
int64_t getTotalFees(LedgerHeader const& lh) const;
@@ -254,29 +432,33 @@ class ApplicableTxSetFrame
int64_t getTotalInclusionFees() const;
// Returns whether this transaction set is generalized, i.e. representable
- // by GeneralizedTransactionSet XDR.
+ // by `GeneralizedTransactionSet` XDR.
bool isGeneralizedTxSet() const;
- // Returns a short description of this transaction set.
+ // Returns a short description of this transaction set for logging.
std::string summary() const;
+ // Returns the hash of this transaction set.
Hash const& getContentsHash() const;
- // This shouldn't be needed for the regular flows, but is useful
+ // Converts this transaction set to XDR.
+ // This shouldn't be exposed for the regular flows, but is useful to expose
// to cover XDR roundtrips in tests.
#ifndef BUILD_TESTS
private:
#endif
TxSetXDRFrameConstPtr toWireTxSetFrame() const;
+ std::optional getTxSetSorobanResource() const;
private:
friend class TxSetXDRFrame;
+
friend std::pair
- makeTxSetFromTransactions(TxSetPhaseTransactions const& txPhases,
+ makeTxSetFromTransactions(PerPhaseTransactionList const& txPhases,
Application& app,
uint64_t lowerBoundCloseTimeOffset,
uint64_t upperBoundCloseTimeOffset,
- TxSetPhaseTransactions& invalidTxsPerPhase
+ PerPhaseTransactionList& invalidTxsPerPhase
#ifdef BUILD_TESTS
,
bool skipValidation
@@ -284,68 +466,43 @@ class ApplicableTxSetFrame
);
#ifdef BUILD_TESTS
friend std::pair
- makeTxSetFromTransactions(TxSetTransactions txs, Application& app,
+ makeTxSetFromTransactions(TxFrameList txs, Application& app,
uint64_t lowerBoundCloseTimeOffset,
uint64_t upperBoundCloseTimeOffset,
- TxSetTransactions& invalidTxs,
+ TxFrameList& invalidTxs,
bool enforceTxsApplyOrder);
#endif
ApplicableTxSetFrame(Application& app,
LedgerHeaderHistoryEntry const& lclHeader,
- TxSetPhaseTransactions const& txs,
+ std::vector const& phases,
std::optional contentsHash);
ApplicableTxSetFrame(Application& app, bool isGeneralized,
Hash const& previousLedgerHash,
- TxSetPhaseTransactions const& txs,
+ std::vector const& phases,
std::optional contentsHash);
ApplicableTxSetFrame(ApplicableTxSetFrame const&) = default;
ApplicableTxSetFrame(ApplicableTxSetFrame&&) = default;
- void computeTxFeesForNonGeneralizedSet(LedgerHeader const& lclHeader);
-
- bool addTxsFromXdr(Hash const& networkID,
- xdr::xvector const& txs,
- bool useBaseFee, std::optional baseFee,
- TxSetPhase phase);
- void applySurgePricing(Application& app);
-
- void computeTxFeesForNonGeneralizedSet(LedgerHeader const& lclHeader,
- int64_t lowestBaseFee,
- bool enableLogging);
-
- void computeTxFees(TxSetPhase phase, LedgerHeader const& ledgerHeader,
- SurgePricingLaneConfig const& surgePricingConfig,
- std::vector const& lowestLaneFee,
- std::vector const& hadTxNotFittingLane);
- std::optional getTxSetSorobanResource() const;
-
- // Get _inclusion_ fee map for a given phase. The map contains lowest base
- // fee for each transaction (lowest base fee is identical for all
- // transactions in the same lane)
- std::unordered_map> const&
- getInclusionFeeMap(TxSetPhase phase) const;
-
- std::unordered_map>&
- getInclusionFeeMapMut(TxSetPhase phase);
void toXDR(TransactionSet& set) const;
void toXDR(GeneralizedTransactionSet& generalizedTxSet) const;
bool const mIsGeneralized;
Hash const mPreviousLedgerHash;
+
+ // All the phases of this transaction set.
+ //
// There can only be 1 phase (classic) prior to protocol 20.
- // Starting protocol 20, there are 2 phases (classic and soroban).
- std::vector mTxPhases;
+ // Starting with protocol 20, there are 2 phases (classic and Soroban).
+ std::vector const mPhases;
- std::vector>>
- mPhaseInclusionFeeMap;
+ // The phases with transactions sorted for apply.
+ //
+ // This is `mutable` because we want to do the sorting lazily only for the
+ // transaction sets that are actually applied.
+ mutable std::vector mApplyOrderPhases;
std::optional mContentsHash;
-#ifdef BUILD_TESTS
- mutable std::optional mApplyOrderOverride;
-#endif
};
} // namespace stellar
diff --git a/src/herder/TxSetUtils.cpp b/src/herder/TxSetUtils.cpp
index 4157cf936c..546044b59f 100644
--- a/src/herder/TxSetUtils.cpp
+++ b/src/herder/TxSetUtils.cpp
@@ -35,8 +35,8 @@ namespace
{
// Target use case is to remove a subset of invalid transactions from a TxSet.
// I.e. txSet.size() >= txsToRemove.size()
-TxSetTransactions
-removeTxs(TxSetTransactions const& txs, TxSetTransactions const& txsToRemove)
+TxFrameList
+removeTxs(TxFrameList const& txs, TxFrameList const& txsToRemove)
{
UnorderedSet txsToRemoveSet;
txsToRemoveSet.reserve(txsToRemove.size());
@@ -45,7 +45,7 @@ removeTxs(TxSetTransactions const& txs, TxSetTransactions const& txsToRemove)
std::inserter(txsToRemoveSet, txsToRemoveSet.end()),
[](TransactionFrameBasePtr const& tx) { return tx->getFullHash(); });
- TxSetTransactions newTxs;
+ TxFrameList newTxs;
newTxs.reserve(txs.size() - txsToRemove.size());
for (auto const& tx : txs)
{
@@ -105,17 +105,46 @@ TxSetUtils::hashTxSorter(TransactionFrameBasePtr const& tx1,
return tx1->getFullHash() < tx2->getFullHash();
}
-TxSetTransactions
-TxSetUtils::sortTxsInHashOrder(TxSetTransactions const& transactions)
+TxFrameList
+TxSetUtils::sortTxsInHashOrder(TxFrameList const& transactions)
{
ZoneScoped;
- TxSetTransactions sortedTxs(transactions);
+ TxFrameList sortedTxs(transactions);
std::sort(sortedTxs.begin(), sortedTxs.end(), TxSetUtils::hashTxSorter);
return sortedTxs;
}
+TxStageFrameList
+TxSetUtils::sortParallelTxsInHashOrder(TxStageFrameList const& stages)
+{
+ ZoneScoped;
+ TxStageFrameList sortedStages = stages;
+ for (auto& stage : sortedStages)
+ {
+ for (auto& thread : stage)
+ {
+ std::sort(thread.begin(), thread.end(), TxSetUtils::hashTxSorter);
+ }
+ std::sort(stage.begin(), stage.end(), [](auto const& a, auto const& b) {
+ if (a.empty() && b.empty())
+ {
+ int t = 0;
+ }
+ releaseAssert(!a.empty() && !b.empty());
+ return hashTxSorter(a.front(), b.front());
+ });
+ }
+ std::sort(sortedStages.begin(), sortedStages.end(),
+ [](auto const& a, auto const& b) {
+ releaseAssert(!a.empty() && !b.empty());
+ releaseAssert(!a.front().empty() && !b.front().empty());
+ return hashTxSorter(a.front().front(), b.front().front());
+ });
+ return sortedStages;
+}
+
std::vector>
-TxSetUtils::buildAccountTxQueues(TxSetTransactions const& txs)
+TxSetUtils::buildAccountTxQueues(TxFrameList const& txs)
{
ZoneScoped;
UnorderedMap