Skip to content

Commit

Permalink
feat: async shuffling refactor (#6938)
Browse files Browse the repository at this point in the history
* feat: add ShufflingCache to EpochCache

* fix: implementation in state-transition for EpochCache with ShufflingCache

* feat: remove shufflingCache.processState

* feat: implement ShufflingCache changes in beacon-node

* feat: pass shufflingCache when loading cached state from db

* test: fix state-transition tests for EpochCache changes

* feat: Pass shufflingCache to EpochCache at startup

* test: fix slot off by one for decision root in perf test

* chore: use ?. syntax

* chore: refactoring

* feat: add comments and clean up afterProcessEpoch

* fix: perf test slot incrementing

* fix: remove MockShufflingCache

* Revert "chore: refactoring"

This reverts commit 104aa56.

* refactor: shufflingCache getters

* refactor: shufflingCache setters

* refactor: build and getOrBuild

* docs: add comments to ShufflingCache methods

* chore: lint issues

* test: update tests in beacon-node

* chore: lint

* feat: get shufflings from cache for API

* feat: minTimeDelayToBuildShuffling cli flag

* test: fix shufflingCache promise insertion test

* fix: rebase conflicts

* fix: changes from debugging sim tests

* refactor: minimize changes in afterProcessEpoch

* chore: fix lint

* chore: fix check-types

* chore: fix check-types

* feat: add diff utility

* fix: bug in spec tests from invalid nextActiveIndices

* refactor: add/remove comments

* refactor: remove this.activeIndicesLength from EpochCache

* refactor: simplify shufflingCache.getSync

* refactor: remove unnecessary undefined's

* refactor: clean up ShufflingCache unit test

* feat: add metrics for ShufflingCache

* feat: add shufflingCache metrics to state-transition

* chore: lint

* fix: metric name clash

* refactor: add comment about not having ShufflingCache in EpochCache

* refactor: rename shuffling decision root functions

* refactor: remove unused comment

* feat: async add nextShuffling to EpochCache after its built

* feat: make ShufflingCache.set private

* feat: chance metrics to nextShufflingNotOnEpochCache instead of positive case

* refactor: move diff to separate PR

* chore: fix tests using shufflingCache.set method

* feat: remove minTimeDelayToBuild

* feat: return promise from insertPromise and then through build

* fix: update metrics names and help field

* feat: move build of shuffling to beforeProcessEpoch

* feat: allow calc of pivot slot before slot increment

* fix: calc of pivot slot before slot increment

* Revert "fix: calc of pivot slot before slot increment"

This reverts commit 5e65f7e.

* Revert "feat: allow calc of pivot slot before slot increment"

This reverts commit ed850ee.

* feat: allow getting current block root for shuffling calculation

* fix: get nextShufflingDecisionRoot directly from state.blockRoots

* fix: convert toRootHex

* docs: add comment about pulling decisionRoot directly from state

* feat: add back metrics for regen attestation cache hit/miss

* docs: fix docstring on shufflingCache.build

* refactor: change validatorIndices to Uint32Array

* refactor: remove comment and change variable name

* fix: use toRootHex instead of toHexString

* refactor: deduplicate moved function computeAnchorCheckpoint

* fix: touch up metrics per PR comments

* fix: merge conflict

* chore: lint

* refactor: add scope around activeIndices to GC arrays

* feat: directly use Uint32Array instead of transcribing number array to Uint32Array

* refactor: activeIndices per tuyen comment

* refactor: rename to epochAfterNext

* chore: review PR

* feat: update no shuffling ApiError to 500 status

* fix: add back unnecessary eslint directive. to be remove under separate PR

* feat: update no shuffling ApiError to 500 status

* docs: add comment about upcomingEpoch

---------

Co-authored-by: Cayman <[email protected]>
Co-authored-by: Tuyen Nguyen <[email protected]>
  • Loading branch information
3 people authored Sep 23, 2024
1 parent 404f13a commit cd98c23
Show file tree
Hide file tree
Showing 34 changed files with 700 additions and 362 deletions.
9 changes: 8 additions & 1 deletion packages/beacon-node/src/api/impl/beacon/state/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,14 @@ export function getBeaconStateApi({

const epoch = filters.epoch ?? computeEpochAtSlot(state.slot);
const startSlot = computeStartSlotAtEpoch(epoch);
const shuffling = stateCached.epochCtx.getShufflingAtEpoch(epoch);
const decisionRoot = stateCached.epochCtx.getShufflingDecisionRoot(epoch);
const shuffling = await chain.shufflingCache.get(epoch, decisionRoot);
if (!shuffling) {
throw new ApiError(
500,
`No shuffling found to calculate committees for epoch: ${epoch} and decisionRoot: ${decisionRoot}`
);
}
const committees = shuffling.committees;
const committeesFlat = committees.flatMap((slotCommittees, slotInEpoch) => {
const slot = startSlot + slotInEpoch;
Expand Down
11 changes: 10 additions & 1 deletion packages/beacon-node/src/api/impl/validator/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {ApplicationMethods} from "@lodestar/api/server";
import {
CachedBeaconStateAllForks,
computeStartSlotAtEpoch,
calculateCommitteeAssignments,
proposerShufflingDecisionRoot,
attesterShufflingDecisionRoot,
getBlockRootAtSlot,
Expand Down Expand Up @@ -995,7 +996,15 @@ export function getValidatorApi(

// Check that all validatorIndex belong to the state before calling getCommitteeAssignments()
const pubkeys = getPubkeysForIndices(state.validators, indices);
const committeeAssignments = state.epochCtx.getCommitteeAssignments(epoch, indices);
const decisionRoot = state.epochCtx.getShufflingDecisionRoot(epoch);
const shuffling = await chain.shufflingCache.get(epoch, decisionRoot);
if (!shuffling) {
throw new ApiError(
500,
`No shuffling found to calculate committee assignments for epoch: ${epoch} and decisionRoot: ${decisionRoot}`
);
}
const committeeAssignments = calculateCommitteeAssignments(shuffling, indices);
const duties: routes.validator.AttesterDuty[] = [];
for (let i = 0, len = indices.length; i < len; i++) {
const validatorIndex = indices[i];
Expand Down
7 changes: 0 additions & 7 deletions packages/beacon-node/src/chain/blocks/importBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ export async function importBlock(
const blockRootHex = toRootHex(blockRoot);
const currentEpoch = computeEpochAtSlot(this.forkChoice.getTime());
const blockEpoch = computeEpochAtSlot(blockSlot);
const parentEpoch = computeEpochAtSlot(parentBlockSlot);
const prevFinalizedEpoch = this.forkChoice.getFinalizedCheckpoint().epoch;
const blockDelaySec = (fullyVerifiedBlock.seenTimestampSec - postState.genesisTime) % this.config.SECONDS_PER_SLOT;
const recvToValLatency = Date.now() / 1000 - (opts.seenTimestampSec ?? Date.now() / 1000);
Expand Down Expand Up @@ -335,12 +334,6 @@ export async function importBlock(
this.logger.verbose("After importBlock caching postState without SSZ cache", {slot: postState.slot});
}

if (parentEpoch < blockEpoch) {
// current epoch and previous epoch are likely cached in previous states
this.shufflingCache.processState(postState, postState.epochCtx.nextShuffling.epoch);
this.logger.verbose("Processed shuffling for next epoch", {parentEpoch, blockEpoch, slot: blockSlot});
}

if (blockSlot % SLOTS_PER_EPOCH === 0) {
// Cache state to preserve epoch transition work
const checkpointState = postState;
Expand Down
25 changes: 18 additions & 7 deletions packages/beacon-node/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
PubkeyIndexMap,
EpochShuffling,
computeEndSlotAtEpoch,
computeAnchorCheckpoint,
} from "@lodestar/state-transition";
import {BeaconConfig} from "@lodestar/config";
import {
Expand Down Expand Up @@ -60,7 +61,6 @@ import {
import {IChainOptions} from "./options.js";
import {QueuedStateRegenerator, RegenCaller} from "./regen/index.js";
import {initializeForkChoice} from "./forkChoice/index.js";
import {computeAnchorCheckpoint} from "./initState.js";
import {IBlsVerifier, BlsSingleThreadVerifier, BlsMultiThreadWorkerPool} from "./bls/index.js";
import {
SeenAttesters,
Expand Down Expand Up @@ -246,7 +246,6 @@ export class BeaconChain implements IBeaconChain {

this.beaconProposerCache = new BeaconProposerCache(opts);
this.checkpointBalancesCache = new CheckpointBalancesCache();
this.shufflingCache = new ShufflingCache(metrics, this.opts);

// Restore state caches
// anchorState may already by a CachedBeaconState. If so, don't create the cache again, since deserializing all
Expand All @@ -261,9 +260,21 @@ export class BeaconChain implements IBeaconChain {
pubkey2index: new PubkeyIndexMap(),
index2pubkey: [],
});
this.shufflingCache.processState(cachedState, cachedState.epochCtx.previousShuffling.epoch);
this.shufflingCache.processState(cachedState, cachedState.epochCtx.currentShuffling.epoch);
this.shufflingCache.processState(cachedState, cachedState.epochCtx.nextShuffling.epoch);

this.shufflingCache = cachedState.epochCtx.shufflingCache = new ShufflingCache(metrics, logger, this.opts, [
{
shuffling: cachedState.epochCtx.previousShuffling,
decisionRoot: cachedState.epochCtx.previousDecisionRoot,
},
{
shuffling: cachedState.epochCtx.currentShuffling,
decisionRoot: cachedState.epochCtx.currentDecisionRoot,
},
{
shuffling: cachedState.epochCtx.nextShuffling,
decisionRoot: cachedState.epochCtx.nextDecisionRoot,
},
]);

// Persist single global instance of state caches
this.pubkey2index = cachedState.epochCtx.pubkey2index;
Expand Down Expand Up @@ -902,8 +913,8 @@ export class BeaconChain implements IBeaconChain {
state = await this.regen.getState(attHeadBlock.stateRoot, regenCaller);
}

// resolve the promise to unblock other calls of the same epoch and dependent root
return this.shufflingCache.processState(state, attEpoch);
// should always be the current epoch of the active context so no need to await a result from the ShufflingCache
return state.epochCtx.getShufflingAtEpoch(attEpoch);
}

/**
Expand Down
2 changes: 1 addition & 1 deletion packages/beacon-node/src/chain/forkChoice/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ import {
getEffectiveBalanceIncrementsZeroInactive,
isExecutionStateType,
isMergeTransitionComplete,
computeAnchorCheckpoint,
} from "@lodestar/state-transition";

import {Logger, toRootHex} from "@lodestar/utils";
import {computeAnchorCheckpoint} from "../initState.js";
import {ChainEventEmitter} from "../emitter.js";
import {ChainEvent} from "../emitter.js";
import {GENESIS_SLOT} from "../../constants/index.js";
Expand Down
38 changes: 2 additions & 36 deletions packages/beacon-node/src/chain/initState.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
import {
blockToHeader,
computeEpochAtSlot,
BeaconStateAllForks,
CachedBeaconStateAllForks,
computeCheckpointEpochAtStateSlot,
computeStartSlotAtEpoch,
} from "@lodestar/state-transition";
import {SignedBeaconBlock, phase0, ssz} from "@lodestar/types";
import {SignedBeaconBlock} from "@lodestar/types";
import {ChainForkConfig} from "@lodestar/config";
import {Logger, toHex, toRootHex} from "@lodestar/utils";
import {GENESIS_SLOT, ZERO_HASH} from "../constants/index.js";
import {GENESIS_SLOT} from "../constants/index.js";
import {IBeaconDb} from "../db/index.js";
import {Eth1Provider} from "../eth1/index.js";
import {Metrics} from "../metrics/index.js";
Expand Down Expand Up @@ -204,35 +202,3 @@ export function initBeaconMetrics(metrics: Metrics, state: BeaconStateAllForks):
metrics.currentJustifiedEpoch.set(state.currentJustifiedCheckpoint.epoch);
metrics.finalizedEpoch.set(state.finalizedCheckpoint.epoch);
}

export function computeAnchorCheckpoint(
config: ChainForkConfig,
anchorState: BeaconStateAllForks
): {checkpoint: phase0.Checkpoint; blockHeader: phase0.BeaconBlockHeader} {
let blockHeader;
let root;
const blockTypes = config.getForkTypes(anchorState.latestBlockHeader.slot);

if (anchorState.latestBlockHeader.slot === GENESIS_SLOT) {
const block = blockTypes.BeaconBlock.defaultValue();
block.stateRoot = anchorState.hashTreeRoot();
blockHeader = blockToHeader(config, block);
root = ssz.phase0.BeaconBlockHeader.hashTreeRoot(blockHeader);
} else {
blockHeader = ssz.phase0.BeaconBlockHeader.clone(anchorState.latestBlockHeader);
if (ssz.Root.equals(blockHeader.stateRoot, ZERO_HASH)) {
blockHeader.stateRoot = anchorState.hashTreeRoot();
}
root = ssz.phase0.BeaconBlockHeader.hashTreeRoot(blockHeader);
}

return {
checkpoint: {
root,
// the checkpoint epoch = computeEpochAtSlot(anchorState.slot) + 1 if slot is not at epoch boundary
// this is similar to a process_slots() call
epoch: computeCheckpointEpochAtStateSlot(anchorState.slot),
},
blockHeader,
};
}
Loading

0 comments on commit cd98c23

Please sign in to comment.