
avoid memory allocations and copies when loading states
* rolls back some of the ref changes
* adds utility to calculate stack sizes
arnetheduck committed Apr 27, 2020
1 parent 03a147a commit 09671fd
Showing 17 changed files with 274 additions and 208 deletions.
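The core idea behind the commit message above is to deserialize a stored state directly into caller-owned memory, with a rollback hook for failed decodes, instead of returning a freshly allocated BeaconStateRef on every load. A minimal, self-contained Nim sketch of that pattern — State, loadState and the table-backed store are illustrative stand-ins, not code from this diff:

import std/tables

type
  State = object
    slot: uint64                          # stand-in for a large BeaconState
  Rollback = proc(v: var State)

proc noRollback(v: var State) = discard   # caller opts out of any cleanup

proc loadState(store: Table[string, seq[byte]], key: string,
               output: var State, rollback: Rollback): bool =
  ## Decode the stored bytes straight into `output`, reusing its memory;
  ## return true iff the key was found and `output` was overwritten.
  if key notin store:
    return false
  let data = store[key]
  if data.len != 8:
    rollback(output)                      # bad data: let the caller restore `output`
    return false
  var v: uint64
  for i in 0 ..< 8:                       # toy little-endian decode of a single field
    v = v or (uint64(data[i]) shl (8 * i))
  output.slot = v
  return true

when isMainModule:
  var state: State                        # destination allocated once, reused per load
  var store = initTable[string, seq[byte]]()
  store["head"] = @[7'u8, 0, 0, 0, 0, 0, 0, 0]
  doAssert loadState(store, "head", state, noRollback)
  doAssert state.slot == 7

The real getState in beacon_chain_db.nim below follows the same shape, with SSZ decoding and the kvstore get-callback in place of the toy decode.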
3 changes: 2 additions & 1 deletion Makefile
@@ -22,7 +22,8 @@ TOOLS := \
ncli_hash_tree_root \
ncli_pretty \
ncli_transition \
process_dashboard
process_dashboard \
stackSizes
# bench_bls_sig_agggregation TODO reenable after bls v0.10.1 changes
TOOLS_DIRS := \
beacon_chain \
8 changes: 4 additions & 4 deletions beacon_chain/attestation_pool.nim
@@ -180,7 +180,7 @@ proc addResolved(pool: var AttestationPool, blck: BlockRef, attestation: Attesta
pool.blockPool, pool.blockPool.tmpState,
BlockSlot(blck: blck, slot: attestation.data.slot))

template state(): BeaconState = pool.blockPool.tmpState.data.data[]
template state(): BeaconState = pool.blockPool.tmpState.data.data

if not validate(state, attestation):
notice "Invalid attestation",
@@ -456,7 +456,7 @@ proc selectHead*(pool: AttestationPool): BlockRef =
justifiedHead = pool.blockPool.latestJustifiedBlock()

let newHead =
lmdGhost(pool, pool.blockPool.justifiedState.data.data[], justifiedHead.blck)
lmdGhost(pool, pool.blockPool.justifiedState.data.data, justifiedHead.blck)

newHead

@@ -529,9 +529,9 @@ proc isValidAttestation*(
# as it supports aggregated attestations (which this can't be)
var cache = get_empty_per_epoch_cache()
if not is_valid_indexed_attestation(
pool.blockPool.headState.data.data[],
pool.blockPool.headState.data.data,
get_indexed_attestation(
pool.blockPool.headState.data.data[], attestation, cache), {}):
pool.blockPool.headState.data.data, attestation, cache), {}):
debug "isValidAttestation: signature verification failed"
return false

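With the ref rollback, pool.blockPool.tmpState.data.data is a plain BeaconState again, so the [] dereferences in these hunks disappear. No copies are introduced in the process, because state is a Nim template: its body is spliced in at every use site, making state a zero-cost alias of the pool's state, and only an explicit assignment duplicates the object. A small stand-alone illustration — Pool and Big are made-up types, not from this repository:

type
  Big = object
    balances: seq[uint64]                 # stand-in for a large state object
  Pool = object
    tmp: Big

proc demo(pool: var Pool) =
  template state(): Big = pool.tmp        # alias: every use expands to `pool.tmp`
  state.balances.add 42                   # mutates pool.tmp directly, no copy made

  let snapshot = state                    # an assignment, by contrast, copies the object
  doAssert snapshot.balances == pool.tmp.balances

when isMainModule:
  var p = Pool()
  demo(p)
  doAssert p.tmp.balances == @[42'u64]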
43 changes: 34 additions & 9 deletions beacon_chain/beacon_chain_db.nim
@@ -1,10 +1,12 @@
{.push raises: [Defect].}

import
options, typetraits, stew/[results, endians2],
typetraits, stew/[results, endians2],
serialization, chronicles,
spec/[datatypes, digest, crypto],
kvstore, ssz
kvstore, ssz, state_transition

export results

type
BeaconChainDB* = ref object
@@ -45,7 +47,7 @@ func subkey[N: static int](kind: DbKeyKind, key: array[N, byte]):
result[0] = byte ord(kind)
result[1 .. ^1] = key

func subkey(kind: type BeaconStateRef, key: Eth2Digest): auto =
func subkey(kind: type BeaconState, key: Eth2Digest): auto =
subkey(kHashToState, key.data)

func subkey(kind: type SignedBeaconBlock, key: Eth2Digest): auto =
@@ -86,13 +88,13 @@ proc get(db: BeaconChainDB, key: openArray[byte], T: typedesc): Opt[T] =
proc putBlock*(db: BeaconChainDB, key: Eth2Digest, value: SignedBeaconBlock) =
db.put(subkey(type value, key), value)

proc putState*(db: BeaconChainDB, key: Eth2Digest, value: BeaconStateRef) =
proc putState*(db: BeaconChainDB, key: Eth2Digest, value: BeaconState) =
# TODO prune old states - this is less easy than it seems as we never know
# when or if a particular state will become finalized.

db.put(subkey(type value, key), value)

proc putState*(db: BeaconChainDB, value: BeaconStateRef) =
proc putState*(db: BeaconChainDB, value: BeaconState) =
db.putState(hash_tree_root(value), value)

proc putStateRoot*(db: BeaconChainDB, root: Eth2Digest, slot: Slot,
@@ -108,7 +110,7 @@ proc delBlock*(db: BeaconChainDB, key: Eth2Digest) =
"working database")

proc delState*(db: BeaconChainDB, key: Eth2Digest) =
db.backend.del(subkey(BeaconStateRef, key)).expect("working database")
db.backend.del(subkey(BeaconState, key)).expect("working database")

proc delStateRoot*(db: BeaconChainDB, root: Eth2Digest, slot: Slot) =
db.backend.del(subkey(root, slot)).expect("working database")
@@ -122,8 +124,31 @@ proc putTailBlock*(db: BeaconChainDB, key: Eth2Digest) =
proc getBlock*(db: BeaconChainDB, key: Eth2Digest): Opt[SignedBeaconBlock] =
db.get(subkey(SignedBeaconBlock, key), SignedBeaconBlock)

proc getState*(db: BeaconChainDB, key: Eth2Digest): Opt[BeaconStateRef] =
db.get(subkey(BeaconStateRef, key), BeaconStateRef)
proc getState*(
db: BeaconChainDB, key: Eth2Digest, output: var BeaconState,
rollback: RollbackProc): bool =
## Load state into `output` - BeaconState is large so we want to avoid
## re-allocating it if possible
## Return `true` iff the entry was found in the database and `output` was
## overwritten.
# TODO rollback is needed to deal with bug - use `noRollback` to ignore:
# https://github.com/nim-lang/Nim/issues/14126
# TODO RVO is inefficient for large objects:
# https://github.com/nim-lang/Nim/issues/13879
# TODO address is needed because there's no way to express lifetimes in nim
# we'll use unsafeAddr to find the code later
let outputAddr = unsafeAddr output # callback is local
proc decode(data: openArray[byte]) =
try:
# TODO can't write to output directly..
outputAddr[] = SSZ.decode(data, BeaconState)
except SerializationError as e:
# If the data can't be deserialized, it could be because it's from a
# version of the software that uses a different SSZ encoding
warn "Unable to deserialize data, old database?", err = e.msg
rollback(outputAddr[])

db.backend.get(subkey(BeaconState, key), decode).expect("working database")

proc getStateRoot*(db: BeaconChainDB,
root: Eth2Digest,
@@ -140,7 +165,7 @@ proc containsBlock*(db: BeaconChainDB, key: Eth2Digest): bool =
db.backend.contains(subkey(SignedBeaconBlock, key)).expect("working database")

proc containsState*(db: BeaconChainDB, key: Eth2Digest): bool =
db.backend.contains(subkey(BeaconStateRef, key)).expect("working database")
db.backend.contains(subkey(BeaconState, key)).expect("working database")

iterator getAncestors*(db: BeaconChainDB, root: Eth2Digest):
tuple[root: Eth2Digest, blck: SignedBeaconBlock] =
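For callers, the new getState signature means allocating the destination once and letting the database overwrite it in place; the RPC handler change in beacon_node.nim below does exactly this. A hedged usage sketch, assuming a db: BeaconChainDB and a root: Eth2Digest from the surrounding code:

let tmp = BeaconStateRef()                 # allocate the destination once
if db.getState(root, tmp[], noRollback):   # decode straight into tmp[], no intermediate copy
  info "State loaded", stateSlot = tmp[].slot
else:
  info "No state stored under this root"

Passing noRollback opts out of any cleanup if the stored bytes fail to deserialize, as the TODO in the hunk above notes.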
50 changes: 27 additions & 23 deletions beacon_chain/beacon_node.nim
@@ -23,7 +23,7 @@ import
attestation_pool, block_pool, eth2_network, eth2_discovery,
beacon_node_types, mainchain_monitor, version, ssz, ssz/dynamic_navigator,
sync_protocol, request_manager, validator_keygen, interop, statusbar,
attestation_aggregation, sync_manager
attestation_aggregation, sync_manager, state_transition

const
genesisFile = "genesis.ssz"
@@ -122,11 +122,15 @@ proc getStateFromSnapshot(conf: BeaconNodeConf): NilableBeaconStateRef =
quit 1

try:
result = SSZ.decode(snapshotContents, BeaconStateRef)
let res = BeaconStateRef()
res[] = SSZ.decode(snapshotContents, BeaconState)
result = res
except SerializationError:
error "Failed to import genesis file", path = genesisPath
quit 1

echo sizeof(result[])

info "Loaded genesis state", path = genesisPath

if writeGenesisFile:
@@ -192,7 +196,7 @@ proc init*(T: type BeaconNode, conf: BeaconNodeConf): Future[BeaconNode] {.async
let tailBlock = get_initial_beacon_block(genesisState[])

try:
BlockPool.preInit(db, genesisState, tailBlock)
BlockPool.preInit(db, genesisState[], tailBlock)
doAssert BlockPool.isInitialized(db), "preInit should have initialized db"
except CatchableError as e:
error "Failed to initialize database", err = e.msg
@@ -219,7 +223,7 @@ proc init*(T: type BeaconNode, conf: BeaconNodeConf): Future[BeaconNode] {.async
nil

let
enrForkId = enrForkIdFromState(blockPool.headState.data.data[])
enrForkId = enrForkIdFromState(blockPool.headState.data.data)
topicBeaconBlocks = getBeaconBlocksTopic(enrForkId.forkDigest)
topicAggregateAndProofs = getAggregateAndProofsTopic(enrForkId.forkDigest)
network = await createEth2Node(conf, enrForkId)
@@ -235,7 +239,7 @@ proc init*(T: type BeaconNode, conf: BeaconNodeConf): Future[BeaconNode] {.async
blockPool: blockPool,
attestationPool: AttestationPool.init(blockPool),
mainchainMonitor: mainchainMonitor,
beaconClock: BeaconClock.init(blockPool.headState.data.data[]),
beaconClock: BeaconClock.init(blockPool.headState.data.data),
rpcServer: rpcServer,
forkDigest: enrForkId.forkDigest,
topicBeaconBlocks: topicBeaconBlocks,
@@ -410,15 +414,15 @@ proc proposeBlock(node: BeaconNode,
(get_eth1data_stub(state.eth1_deposit_index, slot.compute_epoch_at_slot()),
newSeq[Deposit]())
else:
node.mainchainMonitor.getBlockProposalData(state[])
node.mainchainMonitor.getBlockProposalData(state)

let message = makeBeaconBlock(
state[],
state,
head.root,
validator.genRandaoReveal(state.fork, state.genesis_validators_root, slot),
eth1data,
Eth2Digest(),
node.attestationPool.getAttestationsForBlock(state[]),
node.attestationPool.getAttestationsForBlock(state),
deposits)

if not message.isSome():
@@ -576,16 +580,16 @@ proc handleAttestations(node: BeaconNode, head: BlockRef, slot: Slot) =
# version here that calculates the committee for a single slot only
node.blockPool.withState(node.blockPool.tmpState, attestationHead):
var cache = get_empty_per_epoch_cache()
let committees_per_slot = get_committee_count_at_slot(state[], slot)
let committees_per_slot = get_committee_count_at_slot(state, slot)

for committee_index in 0'u64..<committees_per_slot:
let committee = get_beacon_committee(
state[], slot, committee_index.CommitteeIndex, cache)
state, slot, committee_index.CommitteeIndex, cache)

for index_in_committee, validatorIdx in committee:
let validator = node.getAttachedValidator(state[], validatorIdx)
let validator = node.getAttachedValidator(state, validatorIdx)
if validator != nil:
let ad = makeAttestationData(state[], slot, committee_index, blck.root)
let ad = makeAttestationData(state, slot, committee_index, blck.root)
attestations.add((ad, committee.len, index_in_committee, validator))

for a in attestations:
@@ -649,21 +653,21 @@ proc broadcastAggregatedAttestations(
let bs = BlockSlot(blck: aggregationHead, slot: aggregationSlot)
node.blockPool.withState(node.blockPool.tmpState, bs):
let
committees_per_slot = get_committee_count_at_slot(state[], aggregationSlot)
committees_per_slot = get_committee_count_at_slot(state, aggregationSlot)
var cache = get_empty_per_epoch_cache()
for committee_index in 0'u64..<committees_per_slot:
let committee = get_beacon_committee(
state[], aggregationSlot, committee_index.CommitteeIndex, cache)
state, aggregationSlot, committee_index.CommitteeIndex, cache)

for index_in_committee, validatorIdx in committee:
let validator = node.getAttachedValidator(state[], validatorIdx)
let validator = node.getAttachedValidator(state, validatorIdx)
if validator != nil:
# This is slightly strange/inverted control flow, since really it's
# going to happen once per slot, but this is the best way to get at
# the validator index and private key pair. TODO verify it only has
# one isSome() with test.
let aggregateAndProof =
aggregate_attestations(node.attestationPool, state[],
aggregate_attestations(node.attestationPool, state,
committee_index.CommitteeIndex,
# TODO https://github.com/status-im/nim-beacon-chain/issues/545
# this assumes in-process private keys
@@ -1060,13 +1064,13 @@ proc installBeaconApiHandlers(rpcServer: RpcServer, node: BeaconNode) =
requireOneOf(slot, root)
if slot.isSome:
let blk = node.blockPool.head.blck.atSlot(slot.get)
var tmpState = emptyStateData()
node.blockPool.withState(tmpState, blk):
node.blockPool.withState(node.blockPool.tmpState, blk):
return jsonResult(state)
else:
let state = node.db.getState(root.get)
if state.isSome:
return jsonResult(state.get)
let tmp = BeaconStateRef() # TODO use tmpState - but load the entire StateData!
let state = node.db.getState(root.get, tmp[], noRollback)
if state:
return jsonResult(tmp[])
else:
return StringOfJson("null")

@@ -1193,7 +1197,7 @@ proc start(node: BeaconNode) =
bs = BlockSlot(blck: head.blck, slot: head.blck.slot)

node.blockPool.withState(node.blockPool.tmpState, bs):
node.addLocalValidators(state[])
node.addLocalValidators(state)

node.run()

@@ -1282,7 +1286,7 @@ when hasPrompt:
# TODO slow linear scan!
for idx, b in node.blockPool.headState.data.data.balances:
if node.getAttachedValidator(
node.blockPool.headState.data.data[], ValidatorIndex(idx)) != nil:
node.blockPool.headState.data.data, ValidatorIndex(idx)) != nil:
balance += b
formatGwei(balance)

14 changes: 0 additions & 14 deletions beacon_chain/beacon_node_types.nim
@@ -225,20 +225,6 @@ type
root*: Eth2Digest
historySlots*: uint64

func emptyStateData*: StateData =
StateData(
data: HashedBeaconState(
# Please note that this initialization is needed in order
# to allocate memory for the BeaconState:
data: BeaconStateRef(),
root: default(Eth2Digest)
),
blck: default(BlockRef))

func clone*(other: StateData): StateData =
StateData(data: clone(other.data),
blck: other.blck)

proc shortLog*(v: AttachedValidator): string = shortLog(v.pubKey)

chronicles.formatIt BlockSlot:
