Skip to content

Commit

Permalink
Merge c400e7c into bf60786
Browse files Browse the repository at this point in the history
  • Loading branch information
twoeths authored Sep 26, 2024
2 parents bf60786 + c400e7c commit 45e6bc8
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 40 deletions.
13 changes: 9 additions & 4 deletions packages/beacon-node/src/chain/archiver/archiveStates.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {IStateRegenerator} from "../regen/interface.js";
import {getStateSlotFromBytes} from "../../util/multifork.js";
import {serializeState} from "../serializeState.js";
import {AllocSource, BufferPool} from "../../util/bufferPool.js";
import {Metrics} from "../../metrics/metrics.js";

/**
* Minimum number of epochs between single temp archived states
Expand Down Expand Up @@ -48,13 +49,13 @@ export class StatesArchiver {
* epoch - 1024*2 epoch - 1024 epoch - 32 epoch
* ```
*/
async maybeArchiveState(finalized: CheckpointWithHex): Promise<void> {
async maybeArchiveState(finalized: CheckpointWithHex, metrics?: Metrics | null): Promise<void> {
const lastStoredSlot = await this.db.stateArchive.lastKey();
const lastStoredEpoch = computeEpochAtSlot(lastStoredSlot ?? 0);
const {archiveStateEpochFrequency} = this.opts;

if (finalized.epoch - lastStoredEpoch >= Math.min(PERSIST_TEMP_STATE_EVERY_EPOCHS, archiveStateEpochFrequency)) {
await this.archiveState(finalized);
await this.archiveState(finalized, metrics);

// Only check the current and previous intervals
const minEpoch = Math.max(
Expand Down Expand Up @@ -86,7 +87,7 @@ export class StatesArchiver {
* Archives finalized states from active bucket to archive bucket.
* Only the new finalized state is stored to disk
*/
async archiveState(finalized: CheckpointWithHex): Promise<void> {
async archiveState(finalized: CheckpointWithHex, metrics?: Metrics | null): Promise<void> {
// starting from Mar 2024, the finalized state could be from disk or in memory
const finalizedStateOrBytes = await this.regen.getCheckpointStateOrBytes(finalized);
const {rootHex} = finalized;
Expand All @@ -99,10 +100,14 @@ export class StatesArchiver {
this.logger.verbose("Archived finalized state bytes", {epoch: finalized.epoch, slot, root: rootHex});
} else {
// serialize state using BufferPool if provided
const timer = metrics?.stateSerializeDuration.startTimer({source: AllocSource.ARCHIVE_STATE});
await serializeState(
finalizedStateOrBytes,
AllocSource.ARCHIVE_STATE,
(stateBytes) => this.db.stateArchive.putBinary(finalizedStateOrBytes.slot, stateBytes),
(stateBytes) => {
timer?.();
return this.db.stateArchive.putBinary(finalizedStateOrBytes.slot, stateBytes);
},
this.bufferPool
);
// don't delete states before the finalized state, auto-prune will take care of it
Expand Down
6 changes: 4 additions & 2 deletions packages/beacon-node/src/chain/archiver/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {IBeaconDb} from "../../db/index.js";
import {JobItemQueue} from "../../util/queue/index.js";
import {IBeaconChain} from "../interface.js";
import {ChainEvent} from "../emitter.js";
import {Metrics} from "../../metrics/metrics.js";
import {StatesArchiver, StatesArchiverOpts} from "./archiveStates.js";
import {archiveBlocks} from "./archiveBlocks.js";

Expand Down Expand Up @@ -45,7 +46,8 @@ export class Archiver {
private readonly chain: IBeaconChain,
private readonly logger: Logger,
signal: AbortSignal,
opts: ArchiverOpts
opts: ArchiverOpts,
private readonly metrics?: Metrics | null
) {
this.archiveBlobEpochs = opts.archiveBlobEpochs;
this.statesArchiver = new StatesArchiver(chain.regen, db, logger, opts, chain.bufferPool);
Expand Down Expand Up @@ -105,7 +107,7 @@ export class Archiver {
this.prevFinalized = finalized;

// should be after ArchiveBlocksTask to handle restart cleanly
await this.statesArchiver.maybeArchiveState(finalized);
await this.statesArchiver.maybeArchiveState(finalized, this.metrics);

this.chain.regen.pruneOnFinalized(finalizedEpoch);

Expand Down
2 changes: 1 addition & 1 deletion packages/beacon-node/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ export class BeaconChain implements IBeaconChain {
this.bls = bls;
this.emitter = emitter;

this.archiver = new Archiver(db, this, logger, signal, opts);
this.archiver = new Archiver(db, this, logger, signal, opts, metrics);
// always run PrepareNextSlotScheduler except for fork_choice spec tests
if (!opts?.disablePrepareNextSlot) {
new PrepareNextSlotScheduler(this, this.config, metrics, this.logger, signal);
Expand Down
11 changes: 5 additions & 6 deletions packages/beacon-node/src/chain/serializeState.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,18 @@ export async function serializeState<T>(
const size = state.type.tree_serializedSize(state.node);
let stateBytes: Uint8Array | null = null;
if (bufferPool) {
const bufferWithKey = bufferPool.alloc(size, source);
using bufferWithKey = bufferPool.alloc(size, source);
if (bufferWithKey) {
stateBytes = bufferWithKey.buffer;
const dataView = new DataView(stateBytes.buffer, stateBytes.byteOffset, stateBytes.byteLength);
state.serializeToBytes({uint8Array: stateBytes, dataView}, 0);
return processFn(stateBytes);
}
// release the buffer back to the pool automatically
}

if (!stateBytes) {
// we already have metrics in BufferPool so no need to do it here
stateBytes = state.serialize();
}
// we already have metrics in BufferPool so no need to do it here
stateBytes = state.serialize();

return processFn(stateBytes);
// release the buffer back to the pool automatically
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
private readonly cache: MapTracker<CacheKey, CacheItem>;
/** Epoch -> Set<blockRoot> */
private readonly epochIndex = new MapDef<Epoch, Set<RootHex>>(() => new Set<string>());
private readonly metrics: Metrics["cpStateCache"] | null | undefined;
private readonly metrics: Metrics | null | undefined;
private readonly logger: Logger;
private readonly clock: IClock | null | undefined;
private readonly signal: AbortSignal | undefined;
Expand Down Expand Up @@ -123,7 +123,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
) {
this.cache = new MapTracker(metrics?.cpStateCache);
if (metrics) {
this.metrics = metrics.cpStateCache;
this.metrics = metrics;
metrics.cpStateCache.size.addCollect(() => {
let persistCount = 0;
let inMemoryCount = 0;
Expand Down Expand Up @@ -194,24 +194,26 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
const {persistedKey, stateBytes} = stateOrStateBytesData;
const logMeta = {persistedKey: toHex(persistedKey)};
this.logger.debug("Reload: read state successful", logMeta);
this.metrics?.stateReloadSecFromSlot.observe(this.clock?.secFromSlot(this.clock?.currentSlot ?? 0) ?? 0);
this.metrics?.cpStateCache.stateReloadSecFromSlot.observe(
this.clock?.secFromSlot(this.clock?.currentSlot ?? 0) ?? 0
);
const seedState = this.findSeedStateToReload(cp);
this.metrics?.stateReloadEpochDiff.observe(Math.abs(seedState.epochCtx.epoch - cp.epoch));
this.metrics?.cpStateCache.stateReloadEpochDiff.observe(Math.abs(seedState.epochCtx.epoch - cp.epoch));
this.logger.debug("Reload: found seed state", {...logMeta, seedSlot: seedState.slot});

try {
// 80% of validators serialization time comes from memory allocation, this is to avoid it
const sszTimer = this.metrics?.stateReloadValidatorsSerializeDuration.startTimer();
const sszTimer = this.metrics?.cpStateCache.stateReloadValidatorsSerializeDuration.startTimer();
// automatically free the buffer pool after this scope
using validatorsBytesWithKey = this.serializeStateValidators(seedState);
let validatorsBytes = validatorsBytesWithKey?.buffer;
if (validatorsBytes == null) {
// fallback logic in case we can't use the buffer pool
this.metrics?.stateReloadValidatorsSerializeAllocCount.inc();
this.metrics?.cpStateCache.stateReloadValidatorsSerializeAllocCount.inc();
validatorsBytes = seedState.validators.serialize();
}
sszTimer?.();
const timer = this.metrics?.stateReloadDuration.startTimer();
const timer = this.metrics?.cpStateCache.stateReloadDuration.startTimer();
const newCachedState = loadCachedBeaconState(seedState, stateBytes, {}, validatorsBytes);
newCachedState.commit();
const stateRoot = toRootHex(newCachedState.hashTreeRoot());
Expand Down Expand Up @@ -275,7 +277,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
}

const persistedKey = cacheItem.value;
const dbReadTimer = this.metrics?.stateReloadDbReadTime.startTimer();
const dbReadTimer = this.metrics?.cpStateCache.stateReloadDbReadTime.startTimer();
const stateBytes = await this.datastore.read(persistedKey);
dbReadTimer?.();

Expand All @@ -289,23 +291,23 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
* Similar to get() api without reloading from disk
*/
get(cpOrKey: CheckpointHex | string, opts?: StateCloneOpts): CachedBeaconStateAllForks | null {
this.metrics?.lookups.inc();
this.metrics?.cpStateCache.lookups.inc();
const cpKey = typeof cpOrKey === "string" ? cpOrKey : toCacheKey(cpOrKey);
const cacheItem = this.cache.get(cpKey);

if (cacheItem === undefined) {
return null;
}

this.metrics?.hits.inc();
this.metrics?.cpStateCache.hits.inc();

if (cpKey === this.preComputedCheckpoint) {
this.preComputedCheckpointHits = (this.preComputedCheckpointHits ?? 0) + 1;
}

if (isInMemoryCacheItem(cacheItem)) {
const {state} = cacheItem;
this.metrics?.stateClonedCount.observe(state.clonedCount);
this.metrics?.cpStateCache.stateClonedCount.observe(state.clonedCount);
return state.clone(opts?.dontTransferCache);
}

Expand All @@ -319,7 +321,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
const cpHex = toCheckpointHex(cp);
const key = toCacheKey(cpHex);
const cacheItem = this.cache.get(key);
this.metrics?.adds.inc();
this.metrics?.cpStateCache.adds.inc();
if (cacheItem !== undefined && isPersistedCacheItem(cacheItem)) {
const persistedKey = cacheItem.value;
// was persisted to disk, set back to memory
Expand Down Expand Up @@ -683,22 +685,29 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
this.logger.verbose("Pruned checkpoint state from memory but no need to persist", logMeta);
} else {
// persist and do not update epochIndex
this.metrics?.statePersistSecFromSlot.observe(this.clock?.secFromSlot(this.clock?.currentSlot ?? 0) ?? 0);
this.metrics?.cpStateCache.statePersistSecFromSlot.observe(
this.clock?.secFromSlot(this.clock?.currentSlot ?? 0) ?? 0
);
const cpPersist = {epoch: epoch, root: fromHex(rootHex)};
// It's not sustainable to allocate ~240MB for each state every epoch, so we use buffer pool to reuse the memory.
// As monitored on holesky as of Jan 2024:
// - This does not increase heap allocation while gc time is the same
// - It helps stabilize persist time and save ~300ms in average (1.5s vs 1.2s)
// - It also helps the state reload to save ~500ms in average (4.3s vs 3.8s)
// - Also `serializeState.test.ts` perf test shows a lot of differences allocating ~240MB once vs per state serialization
const timer = this.metrics?.stateSerializeDuration.startTimer();
const timer = this.metrics?.stateSerializeDuration.startTimer({
source: AllocSource.PERSISTENT_CHECKPOINTS_CACHE_STATE,
});
persistedKey = await serializeState(
state,
AllocSource.PERSISTENT_CHECKPOINTS_CACHE_STATE,
(stateBytes) => this.datastore.write(cpPersist, stateBytes),
(stateBytes) => {
timer?.();
return this.datastore.write(cpPersist, stateBytes);
},
this.bufferPool
);
timer?.();

persistCount++;
this.logger.verbose("Pruned checkpoint state from memory and persisted to disk", {
...logMeta,
Expand All @@ -718,7 +727,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
this.cache.delete(cpKey);
this.epochIndex.get(epoch)?.delete(rootHex);
}
this.metrics?.statePruneFromMemoryCount.inc();
this.metrics?.cpStateCache.statePruneFromMemoryCount.inc();
this.logger.verbose("Pruned checkpoint state from memory", logMeta);
}
}
Expand All @@ -742,7 +751,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
if (persistedKey) {
await this.datastore.remove(persistedKey);
persistCount++;
this.metrics?.persistedStateRemoveCount.inc();
this.metrics?.cpStateCache.persistedStateRemoveCount.inc();
}
}
this.cache.delete(key);
Expand Down
11 changes: 6 additions & 5 deletions packages/beacon-node/src/metrics/metrics/lodestar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1218,11 +1218,6 @@ export function createLodestarMetrics(
help: "Histogram of cloned count per state every time state.clone() is called",
buckets: [1, 2, 5, 10, 50, 250],
}),
stateSerializeDuration: register.histogram({
name: "lodestar_cp_state_cache_state_serialize_seconds",
help: "Histogram of time to serialize state to db",
buckets: [0.1, 0.5, 1, 2, 3, 4],
}),
numStatesUpdated: register.histogram({
name: "lodestar_cp_state_cache_state_updated_count",
help: "Histogram of number of state cache items updated every time removing and adding pubkeys to pubkey cache",
Expand Down Expand Up @@ -1434,6 +1429,12 @@ export function createLodestarMetrics(
name: "lodestar_unhandled_promise_rejections_total",
help: "UnhandledPromiseRejection total count",
}),
stateSerializeDuration: register.histogram<{source: AllocSource}>({
name: "lodestar_state_serialize_seconds",
help: "Histogram of time to serialize state",
labelNames: ["source"],
buckets: [0.1, 0.5, 1, 2, 3, 4],
}),

// regen.getState metrics
regenGetState: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -412,11 +412,10 @@ describe("regen/reload states with n-historical states configuration", function
)?.value
).toEqual(reloadCount);

const stateSszMetricValues = await (followupBn.metrics?.cpStateCache.stateSerializeDuration as Histogram).get();
const stateSszMetricValues = await (followupBn.metrics?.stateSerializeDuration as Histogram).get();
expect(
stateSszMetricValues?.values.find(
(value) => value.metricName === "lodestar_cp_state_cache_state_serialize_seconds_count"
)?.value
stateSszMetricValues?.values.find((value) => value.metricName === "lodestar_state_serialize_seconds_count")
?.value
).toEqual(persistCount);

// assert number of persisted/in-memory states
Expand Down

0 comments on commit 45e6bc8

Please sign in to comment.