Skip to content

Commit

Permalink
Merge pull request #4314 from ThomasBrady/4290-refactor-apply-buckets…
Browse files Browse the repository at this point in the history
…-work

ApplyBucketWork at per-bucket level

Reviewed-by: SirTyson
  • Loading branch information
latobarita authored Jun 3, 2024
2 parents 0f189aa + ba9f43f commit 0aedf11
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 173 deletions.
236 changes: 76 additions & 160 deletions src/catchup/ApplyBucketsWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,25 +58,6 @@ ApplyBucketsWork::startingLevel()
: BucketList::kNumLevels - 1;
}

bool
ApplyBucketsWork::appliedAllLevels() const
{
if (mApp.getConfig().isUsingBucketListDB())
{
return mLevel == BucketList::kNumLevels - 1;
}
else
{
return mLevel == 0;
}
}

uint32_t
ApplyBucketsWork::nextLevel() const
{
return mApp.getConfig().isUsingBucketListDB() ? mLevel + 1 : mLevel - 1;
}

ApplyBucketsWork::ApplyBucketsWork(
Application& app,
std::map<std::string, std::shared_ptr<Bucket>> const& buckets,
Expand All @@ -86,7 +67,6 @@ ApplyBucketsWork::ApplyBucketsWork(
, mBuckets(buckets)
, mApplyState(applyState)
, mEntryTypeFilter(onlyApply)
, mApplying(false)
, mTotalSize(0)
, mLevel(startingLevel())
, mMaxProtocolVersion(maxProtocolVersion)
Expand All @@ -103,12 +83,6 @@ ApplyBucketsWork::ApplyBucketsWork(
{
}

BucketLevel&
ApplyBucketsWork::getBucketLevel(uint32_t level)
{
return mApp.getBucketManager().getBucketList().getLevel(level);
}

std::shared_ptr<Bucket>
ApplyBucketsWork::getBucket(std::string const& hash)
{
Expand All @@ -133,9 +107,11 @@ ApplyBucketsWork::doReset()
mAppliedSize = 0;
mLastAppliedSizeMb = 0;
mLastPos = 0;
mBucketToApplyIndex = 0;
mMinProtocolVersionSeen = UINT32_MAX;
mSeenKeys.clear();
mBucketsToIndex.clear();
mBucketsToApply.clear();
mBucketApplicator.reset();

if (!isAborting())
{
Expand Down Expand Up @@ -172,17 +148,31 @@ ApplyBucketsWork::doReset()
mTotalBuckets++;
mTotalSize += bucket->getSize();
}

if (mApp.getConfig().isUsingBucketListDB())
mBucketsToApply.emplace_back(bucket);
};
// If using bucketlist DB, we iterate through the BucketList in order
// (i.e. L0 curr, L0 snap, L1 curr, etc) as we are just applying offers
// (and can keep track of all seen keys). Otherwise, we iterate in
// reverse order (i.e. L N snap, L N curr, L N-1 snap, etc.) as we are
// applying all entry types and cannot keep track of all seen keys as it
// would be too large.
if (mApp.getConfig().isUsingBucketListDB())
{
for (auto const& hsb : mApplyState.currentBuckets)
{
mBucketsToIndex.emplace_back(bucket);
addBucket(getBucket(hsb.curr));
addBucket(getBucket(hsb.snap));
}
};

for (auto const& hsb : mApplyState.currentBuckets)
}
else
{
addBucket(getBucket(hsb.snap));
addBucket(getBucket(hsb.curr));
for (auto iter = mApplyState.currentBuckets.rbegin();
iter != mApplyState.currentBuckets.rend(); ++iter)
{
auto const& hsb = *iter;
addBucket(getBucket(hsb.snap));
addBucket(getBucket(hsb.curr));
}
}
// estimate the number of ledger entries contained in those buckets
// use accounts as a rough approximator as to overestimate a bit
Expand All @@ -194,18 +184,42 @@ ApplyBucketsWork::doReset()
totalLECount);
mApp.getLedgerTxnRoot().prepareNewObjects(totalLECount);
}
}

mLevel = startingLevel();
mApplying = false;
mDelayChecked = false;
bool
ApplyBucketsWork::appliedAllBuckets() const
{
return mBucketToApplyIndex == mBucketsToApply.size();
}

mIndexBucketsWork.reset();
mAssumeStateWork.reset();
void
ApplyBucketsWork::startBucket()
{
ZoneScoped;
auto bucket = mBucketsToApply.at(mBucketToApplyIndex);
mMinProtocolVersionSeen =
std::min(mMinProtocolVersionSeen, Bucket::getBucketVersion(bucket));
// Create a new applicator for the bucket.
mBucketApplicator = std::make_unique<BucketApplicator>(
mApp, mMaxProtocolVersion, mMinProtocolVersionSeen, mLevel, bucket,
mEntryTypeFilter, mSeenKeys);
}

mFirstBucket.reset();
mSecondBucket.reset();
mFirstBucketApplicator.reset();
mSecondBucketApplicator.reset();
void
ApplyBucketsWork::prepareForNextBucket()
{
ZoneScoped;
mBucketApplicator.reset();
mApp.getCatchupManager().bucketsApplied();
mBucketToApplyIndex++;
// If mBucketToApplyIndex is even, we are progressing to the next
// level, if we are using BucketListDB, this is the next greater
// level, otherwise it's the next lower level.
if (mBucketToApplyIndex % 2 == 0)
{
mLevel =
mApp.getConfig().isUsingBucketListDB() ? mLevel + 1 : mLevel - 1;
}
}

// We iterate through the BucketList either in-order (level 0 curr, level 0
Expand All @@ -222,68 +236,6 @@ ApplyBucketsWork::doReset()
// iterate through the BucketList. Due to this, we iterate in reverse order such
// that the newest version of a key is written last, overwriting the older
// versions. This is much slower due to DB churn.
void
ApplyBucketsWork::startLevel()
{
ZoneScoped;
releaseAssert(isLevelComplete());

CLOG_DEBUG(History, "ApplyBuckets : starting level {}", mLevel);
auto& level = getBucketLevel(mLevel);
HistoryStateBucket const& i = mApplyState.currentBuckets.at(mLevel);
bool isUsingBucketListDB = mApp.getConfig().isUsingBucketListDB();

bool applyFirst = isUsingBucketListDB
? (i.curr != binToHex(level.getCurr()->getHash()))
: (i.snap != binToHex(level.getSnap()->getHash()));
bool applySecond = isUsingBucketListDB
? (i.snap != binToHex(level.getSnap()->getHash()))
: (i.curr != binToHex(level.getCurr()->getHash()));

if (mApplying || applyFirst)
{
mFirstBucket = getBucket(isUsingBucketListDB ? i.curr : i.snap);
mMinProtocolVersionSeen = std::min(
mMinProtocolVersionSeen, Bucket::getBucketVersion(mFirstBucket));
mFirstBucketApplicator = std::make_unique<BucketApplicator>(
mApp, mMaxProtocolVersion, mMinProtocolVersionSeen, mLevel,
mFirstBucket, mEntryTypeFilter, mSeenKeys);

if (isUsingBucketListDB)
{
CLOG_DEBUG(History, "ApplyBuckets : starting level[{}].curr = {}",
mLevel, i.curr);
}
else
{
CLOG_DEBUG(History, "ApplyBuckets : starting level[{}].snap = {}",
mLevel, i.snap);
}

mApplying = true;
}
if (mApplying || applySecond)
{
mSecondBucket = getBucket(isUsingBucketListDB ? i.snap : i.curr);
mMinProtocolVersionSeen = std::min(
mMinProtocolVersionSeen, Bucket::getBucketVersion(mSecondBucket));
mSecondBucketApplicator = std::make_unique<BucketApplicator>(
mApp, mMaxProtocolVersion, mMinProtocolVersionSeen, mLevel,
mSecondBucket, mEntryTypeFilter, mSeenKeys);

if (isUsingBucketListDB)
{
CLOG_DEBUG(History, "ApplyBuckets : starting level[{}].snap = {}",
mLevel, i.snap);
}
else
{
CLOG_DEBUG(History, "ApplyBuckets : starting level[{}].curr = {}",
mLevel, i.curr);
}
mApplying = true;
}
}

BasicWork::State
ApplyBucketsWork::doWork()
Expand All @@ -294,10 +246,11 @@ ApplyBucketsWork::doWork()
bool isUsingBucketListDB = mApp.getConfig().isUsingBucketListDB();
if (isUsingBucketListDB)
{
// Step 1: index buckets.
if (!mIndexBucketsWork)
{
// Spawn indexing work for the first time
mIndexBucketsWork = addWork<IndexBucketsWork>(mBucketsToIndex);
mIndexBucketsWork = addWork<IndexBucketsWork>(mBucketsToApply);
return State::WORK_RUNNING;
}
else if (mIndexBucketsWork->getState() !=
Expand All @@ -312,6 +265,7 @@ ApplyBucketsWork::doWork()

if (!mAssumeStateWork)
{
// Step 2: apply buckets.
if (mApp.getLedgerManager().rebuildingInMemoryState() && !mDelayChecked)
{
mDelayChecked = true;
Expand All @@ -326,61 +280,29 @@ ApplyBucketsWork::doWork()
}
}

// Check if we're at the beginning of the new level
if (isLevelComplete())
{
startLevel();
}

// The structure of these if statements is motivated by the following:
// 1. mSecondBucketApplicator should never be advanced if
// mFirstBucketApplicator is not false. Otherwise it is possible for
// second bucket to modify the database when the invariants for first
// bucket are checked.
// 2. There is no reason to advance mFirstBucketApplicator or
// mSecondBucketApplicator if there is nothing to be applied.
if (mFirstBucketApplicator)
auto isCurr = isUsingBucketListDB ? mBucketToApplyIndex % 2 == 0
: mBucketToApplyIndex % 2 == 1;
if (mBucketApplicator)
{
TempLedgerVersionSetter tlvs(mApp, mMaxProtocolVersion);

// When BucketListDB is enabled, we apply in order starting with
// curr. If BucketListDB is not enabled, we iterate in reverse
// starting with snap.
bool isCurr = isUsingBucketListDB;
if (*mFirstBucketApplicator)
// Only advance the applicator if there are entries to apply.
if (*mBucketApplicator)
{
advance(isCurr ? "curr" : "snap", *mFirstBucketApplicator);
advance(isCurr ? "curr" : "snap", *mBucketApplicator);
return State::WORK_RUNNING;
}
// Application complete, check invariants and prepare for next
// bucket.
mApp.getInvariantManager().checkOnBucketApply(
mFirstBucket, mApplyState.currentLedger, mLevel, isCurr,
mEntryTypeFilter);
mFirstBucketApplicator.reset();
mFirstBucket.reset();
mApp.getCatchupManager().bucketsApplied();
mBucketsToApply.at(mBucketToApplyIndex),
mApplyState.currentLedger, mLevel, isCurr, mEntryTypeFilter);
prepareForNextBucket();
}
if (mSecondBucketApplicator)
if (!appliedAllBuckets())
{
bool isCurr = !isUsingBucketListDB;
TempLedgerVersionSetter tlvs(mApp, mMaxProtocolVersion);
if (*mSecondBucketApplicator)
{
advance(isCurr ? "curr" : "snap", *mSecondBucketApplicator);
return State::WORK_RUNNING;
}
mApp.getInvariantManager().checkOnBucketApply(
mSecondBucket, mApplyState.currentLedger, mLevel, isCurr,
mEntryTypeFilter);
mSecondBucketApplicator.reset();
mSecondBucket.reset();
mApp.getCatchupManager().bucketsApplied();
}

if (!appliedAllLevels())
{
mLevel = nextLevel();
CLOG_DEBUG(History, "ApplyBuckets : starting next level: {}",
mLevel);
CLOG_DEBUG(History, "ApplyBuckets : starting level: {}, {}", mLevel,
isCurr ? "curr" : "snap");
startBucket();
return State::WORK_RUNNING;
}

Expand Down Expand Up @@ -438,12 +360,6 @@ ApplyBucketsWork::advance(std::string const& bucketName,
}
}

bool
ApplyBucketsWork::isLevelComplete()
{
return !(mApplying) || !(mFirstBucketApplicator || mSecondBucketApplicator);
}

std::string
ApplyBucketsWork::getStatus() const
{
Expand Down
20 changes: 7 additions & 13 deletions src/catchup/ApplyBucketsWork.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ class ApplyBucketsWork : public Work
HistoryArchiveState const& mApplyState;
std::function<bool(LedgerEntryType)> mEntryTypeFilter;

bool mApplying{false};
bool mSpawnedAssumeStateWork{false};
std::shared_ptr<AssumeStateWork> mAssumeStateWork{};
std::shared_ptr<IndexBucketsWork> mIndexBucketsWork{};
Expand All @@ -36,29 +35,24 @@ class ApplyBucketsWork : public Work
size_t mAppliedSize{0};
size_t mLastAppliedSizeMb{0};
size_t mLastPos{0};
size_t mBucketToApplyIndex{0};
uint32_t mLevel{0};
uint32_t mMaxProtocolVersion{0};
uint32_t mMinProtocolVersionSeen{UINT32_MAX};
std::shared_ptr<Bucket const> mFirstBucket;
std::shared_ptr<Bucket const> mSecondBucket;
std::unique_ptr<BucketApplicator> mFirstBucketApplicator;
std::unique_ptr<BucketApplicator> mSecondBucketApplicator;
std::unordered_set<LedgerKey> mSeenKeys;
std::vector<std::shared_ptr<Bucket>> mBucketsToIndex;
std::vector<std::shared_ptr<Bucket>> mBucketsToApply;
std::unique_ptr<BucketApplicator> mBucketApplicator;
bool mDelayChecked{false};

BucketApplicator::Counters mCounters;

void advance(std::string const& name, BucketApplicator& applicator);
std::shared_ptr<Bucket> getBucket(std::string const& bucketHash);
BucketLevel& getBucketLevel(uint32_t level);
void startLevel();
bool isLevelComplete();

bool mDelayChecked{false};

uint32_t startingLevel();
uint32_t nextLevel() const;
bool appliedAllLevels() const;
bool appliedAllBuckets() const;
void startBucket();
void prepareForNextBucket();

public:
ApplyBucketsWork(
Expand Down

0 comments on commit 0aedf11

Please sign in to comment.