Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

avoid memory allocations and copies when loading states #937

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions AllTests-minimal.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,10 @@ OK: 7/7 Fail: 0/7 Skip: 0/7
OK: 5/5 Fail: 0/5 Skip: 0/5
## BlockPool finalization tests [Preset: minimal]
```diff
+ init with gaps [Preset: minimal] OK
+ prune heads on finalization [Preset: minimal] OK
```
OK: 1/1 Fail: 0/1 Skip: 0/1
OK: 2/2 Fail: 0/2 Skip: 0/2
## BlockRef and helpers [Preset: minimal]
```diff
+ getAncestorAt sanity [Preset: minimal] OK
Expand Down Expand Up @@ -257,4 +258,4 @@ OK: 4/4 Fail: 0/4 Skip: 0/4
OK: 8/8 Fail: 0/8 Skip: 0/8

---TOTAL---
OK: 156/159 Fail: 3/159 Skip: 0/159
OK: 157/160 Fail: 3/160 Skip: 0/160
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ TOOLS := \
ncli_hash_tree_root \
ncli_pretty \
ncli_transition \
process_dashboard
process_dashboard \
stackSizes
# bench_bls_sig_agggregation TODO reenable after bls v0.10.1 changes
TOOLS_DIRS := \
beacon_chain \
Expand Down
8 changes: 4 additions & 4 deletions beacon_chain/attestation_pool.nim
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ proc addResolved(pool: var AttestationPool, blck: BlockRef, attestation: Attesta
pool.blockPool, pool.blockPool.tmpState,
BlockSlot(blck: blck, slot: attestation.data.slot))

template state(): BeaconState = pool.blockPool.tmpState.data.data[]
template state(): BeaconState = pool.blockPool.tmpState.data.data

if not validate(state, attestation):
notice "Invalid attestation",
Expand Down Expand Up @@ -456,7 +456,7 @@ proc selectHead*(pool: AttestationPool): BlockRef =
justifiedHead = pool.blockPool.latestJustifiedBlock()

let newHead =
lmdGhost(pool, pool.blockPool.justifiedState.data.data[], justifiedHead.blck)
lmdGhost(pool, pool.blockPool.justifiedState.data.data, justifiedHead.blck)

newHead

Expand Down Expand Up @@ -529,9 +529,9 @@ proc isValidAttestation*(
# as it supports aggregated attestations (which this can't be)
var cache = get_empty_per_epoch_cache()
if not is_valid_indexed_attestation(
pool.blockPool.headState.data.data[],
pool.blockPool.headState.data.data,
get_indexed_attestation(
pool.blockPool.headState.data.data[], attestation, cache), {}):
pool.blockPool.headState.data.data, attestation, cache), {}):
debug "isValidAttestation: signature verification failed"
return false

Expand Down
41 changes: 32 additions & 9 deletions beacon_chain/beacon_chain_db.nim
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
{.push raises: [Defect].}

import
options, typetraits, stew/[results, endians2],
typetraits, stew/[results, endians2],
serialization, chronicles,
spec/[datatypes, digest, crypto],
eth/db/kvstore, ssz
eth/db/kvstore, ssz, state_transition

type
BeaconChainDB* = ref object
Expand Down Expand Up @@ -45,7 +45,7 @@ func subkey[N: static int](kind: DbKeyKind, key: array[N, byte]):
result[0] = byte ord(kind)
result[1 .. ^1] = key

func subkey(kind: type BeaconStateRef, key: Eth2Digest): auto =
func subkey(kind: type BeaconState, key: Eth2Digest): auto =
subkey(kHashToState, key.data)

func subkey(kind: type SignedBeaconBlock, key: Eth2Digest): auto =
Expand Down Expand Up @@ -86,13 +86,13 @@ proc get(db: BeaconChainDB, key: openArray[byte], T: typedesc): Opt[T] =
proc putBlock*(db: BeaconChainDB, key: Eth2Digest, value: SignedBeaconBlock) =
db.put(subkey(type value, key), value)

proc putState*(db: BeaconChainDB, key: Eth2Digest, value: BeaconStateRef) =
proc putState*(db: BeaconChainDB, key: Eth2Digest, value: BeaconState) =
# TODO prune old states - this is less easy than it seems as we never know
# when or if a particular state will become finalized.

db.put(subkey(type value, key), value)

proc putState*(db: BeaconChainDB, value: BeaconStateRef) =
proc putState*(db: BeaconChainDB, value: BeaconState) =
db.putState(hash_tree_root(value), value)

proc putStateRoot*(db: BeaconChainDB, root: Eth2Digest, slot: Slot,
Expand All @@ -108,7 +108,7 @@ proc delBlock*(db: BeaconChainDB, key: Eth2Digest) =
"working database")

proc delState*(db: BeaconChainDB, key: Eth2Digest) =
db.backend.del(subkey(BeaconStateRef, key)).expect("working database")
db.backend.del(subkey(BeaconState, key)).expect("working database")

proc delStateRoot*(db: BeaconChainDB, root: Eth2Digest, slot: Slot) =
db.backend.del(subkey(root, slot)).expect("working database")
Expand All @@ -122,8 +122,31 @@ proc putTailBlock*(db: BeaconChainDB, key: Eth2Digest) =
proc getBlock*(db: BeaconChainDB, key: Eth2Digest): Opt[SignedBeaconBlock] =
db.get(subkey(SignedBeaconBlock, key), SignedBeaconBlock)

proc getState*(db: BeaconChainDB, key: Eth2Digest): Opt[BeaconStateRef] =
db.get(subkey(BeaconStateRef, key), BeaconStateRef)
proc getState*(
db: BeaconChainDB, key: Eth2Digest, output: var BeaconState,
rollback: RollbackProc): bool =
## Load state into `output` - BeaconState is large so we want to avoid
## re-allocating it if possible
## Return `true` iff the entry was found in the database and `output` was
## overwritten.
# TODO rollback is needed to deal with bug - use `noRollback` to ignore:
# https://github.com/nim-lang/Nim/issues/14126
# TODO RVO is inefficient for large objects:
# https://github.com/nim-lang/Nim/issues/13879
# TODO address is needed because there's no way to express lifetimes in nim
# we'll use unsafeAddr to find the code later
let outputAddr = unsafeAddr output # callback is local
proc decode(data: openArray[byte]) =
try:
# TODO can't write to output directly..
outputAddr[] = SSZ.decode(data, BeaconState)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's easy to create a helper that can write to a var directly. I'll look into it after the PR is merged.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the serialization library would have to be rewritten to not use exceptions and assume a blank slate (empty seqs etc) which would complicate it somewhat - on the plus side it could reuse seq memory etc instead of allocating new seqs - in general though, it's uglier and less safe - especially in the presence of exceptions - this BeaconChainDB acts as an exception barrier so it can do tricks like this somewhat safely as long as it has the rollback workaround

the bigger issue here is the lifetime of outputAddr - I wish there was a safe construct for this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that if BeaconState becomes a ref, the compiler might move it somewhere else on the heap and invalidate the address.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well, I put unsafeAddr here so that we can grep for it easily - it should be safe though since the scope is local

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

besides, nim isn't fancy enough to do compacting, is it?

except SerializationError as e:
# If the data can't be deserialized, it could be because it's from a
# version of the software that uses a different SSZ encoding
warn "Unable to deserialize data, old database?", err = e.msg
rollback(outputAddr[])

db.backend.get(subkey(BeaconState, key), decode).expect("working database")

proc getStateRoot*(db: BeaconChainDB,
root: Eth2Digest,
Expand All @@ -140,7 +163,7 @@ proc containsBlock*(db: BeaconChainDB, key: Eth2Digest): bool =
db.backend.contains(subkey(SignedBeaconBlock, key)).expect("working database")

proc containsState*(db: BeaconChainDB, key: Eth2Digest): bool =
db.backend.contains(subkey(BeaconStateRef, key)).expect("working database")
db.backend.contains(subkey(BeaconState, key)).expect("working database")

iterator getAncestors*(db: BeaconChainDB, root: Eth2Digest):
tuple[root: Eth2Digest, blck: SignedBeaconBlock] =
Expand Down
50 changes: 25 additions & 25 deletions beacon_chain/beacon_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import
attestation_pool, block_pool, eth2_network, eth2_discovery,
beacon_node_types, mainchain_monitor, version, ssz, ssz/dynamic_navigator,
sync_protocol, request_manager, validator_keygen, interop, statusbar,
attestation_aggregation, sync_manager
attestation_aggregation, sync_manager, state_transition

const
genesisFile = "genesis.ssz"
Expand Down Expand Up @@ -121,8 +121,8 @@ proc getStateFromSnapshot(conf: BeaconNodeConf): NilableBeaconStateRef =
error "Failed to read genesis file", err = err.msg
quit 1

try:
result = SSZ.decode(snapshotContents, BeaconStateRef)
result = try:
newClone(SSZ.decode(snapshotContents, BeaconState))
except SerializationError:
error "Failed to import genesis file", path = genesisPath
quit 1
Expand Down Expand Up @@ -192,7 +192,7 @@ proc init*(T: type BeaconNode, conf: BeaconNodeConf): Future[BeaconNode] {.async
let tailBlock = get_initial_beacon_block(genesisState[])

try:
BlockPool.preInit(db, genesisState, tailBlock)
BlockPool.preInit(db, genesisState[], tailBlock)
doAssert BlockPool.isInitialized(db), "preInit should have initialized db"
except CatchableError as e:
error "Failed to initialize database", err = e.msg
Expand All @@ -219,7 +219,7 @@ proc init*(T: type BeaconNode, conf: BeaconNodeConf): Future[BeaconNode] {.async
nil

let
enrForkId = enrForkIdFromState(blockPool.headState.data.data[])
enrForkId = enrForkIdFromState(blockPool.headState.data.data)
topicBeaconBlocks = getBeaconBlocksTopic(enrForkId.forkDigest)
topicAggregateAndProofs = getAggregateAndProofsTopic(enrForkId.forkDigest)
network = await createEth2Node(conf, enrForkId)
Expand All @@ -235,7 +235,7 @@ proc init*(T: type BeaconNode, conf: BeaconNodeConf): Future[BeaconNode] {.async
blockPool: blockPool,
attestationPool: AttestationPool.init(blockPool),
mainchainMonitor: mainchainMonitor,
beaconClock: BeaconClock.init(blockPool.headState.data.data[]),
beaconClock: BeaconClock.init(blockPool.headState.data.data),
rpcServer: rpcServer,
forkDigest: enrForkId.forkDigest,
topicBeaconBlocks: topicBeaconBlocks,
Expand Down Expand Up @@ -410,15 +410,15 @@ proc proposeBlock(node: BeaconNode,
(get_eth1data_stub(state.eth1_deposit_index, slot.compute_epoch_at_slot()),
newSeq[Deposit]())
else:
node.mainchainMonitor.getBlockProposalData(state[])
node.mainchainMonitor.getBlockProposalData(state)

let message = makeBeaconBlock(
state[],
state,
head.root,
validator.genRandaoReveal(state.fork, state.genesis_validators_root, slot),
eth1data,
Eth2Digest(),
node.attestationPool.getAttestationsForBlock(state[]),
node.attestationPool.getAttestationsForBlock(state),
deposits)

if not message.isSome():
Expand Down Expand Up @@ -546,7 +546,7 @@ proc handleAttestations(node: BeaconNode, head: BlockRef, slot: Slot) =
slot = shortLog(slot)
return

let attestationHead = head.findAncestorBySlot(slot)
let attestationHead = head.atSlot(slot)
if head != attestationHead.blck:
# In rare cases, such as when we're busy syncing or just slow, we'll be
# attesting to a past state - we must then recreate the world as it looked
Expand Down Expand Up @@ -576,16 +576,16 @@ proc handleAttestations(node: BeaconNode, head: BlockRef, slot: Slot) =
# version here that calculates the committee for a single slot only
node.blockPool.withState(node.blockPool.tmpState, attestationHead):
var cache = get_empty_per_epoch_cache()
let committees_per_slot = get_committee_count_at_slot(state[], slot)
let committees_per_slot = get_committee_count_at_slot(state, slot)

for committee_index in 0'u64..<committees_per_slot:
let committee = get_beacon_committee(
state[], slot, committee_index.CommitteeIndex, cache)
state, slot, committee_index.CommitteeIndex, cache)

for index_in_committee, validatorIdx in committee:
let validator = node.getAttachedValidator(state[], validatorIdx)
let validator = node.getAttachedValidator(state, validatorIdx)
if validator != nil:
let ad = makeAttestationData(state[], slot, committee_index, blck.root)
let ad = makeAttestationData(state, slot, committee_index, blck.root)
attestations.add((ad, committee.len, index_in_committee, validator))

for a in attestations:
Expand Down Expand Up @@ -649,21 +649,21 @@ proc broadcastAggregatedAttestations(
let bs = BlockSlot(blck: aggregationHead, slot: aggregationSlot)
node.blockPool.withState(node.blockPool.tmpState, bs):
let
committees_per_slot = get_committee_count_at_slot(state[], aggregationSlot)
committees_per_slot = get_committee_count_at_slot(state, aggregationSlot)
var cache = get_empty_per_epoch_cache()
for committee_index in 0'u64..<committees_per_slot:
let committee = get_beacon_committee(
state[], aggregationSlot, committee_index.CommitteeIndex, cache)
state, aggregationSlot, committee_index.CommitteeIndex, cache)

for index_in_committee, validatorIdx in committee:
let validator = node.getAttachedValidator(state[], validatorIdx)
let validator = node.getAttachedValidator(state, validatorIdx)
if validator != nil:
# This is slightly strange/inverted control flow, since really it's
# going to happen once per slot, but this is the best way to get at
# the validator index and private key pair. TODO verify it only has
# one isSome() with test.
let aggregateAndProof =
aggregate_attestations(node.attestationPool, state[],
aggregate_attestations(node.attestationPool, state,
committee_index.CommitteeIndex,
# TODO https://github.com/status-im/nim-beacon-chain/issues/545
# this assumes in-process private keys
Expand Down Expand Up @@ -1060,13 +1060,13 @@ proc installBeaconApiHandlers(rpcServer: RpcServer, node: BeaconNode) =
requireOneOf(slot, root)
if slot.isSome:
let blk = node.blockPool.head.blck.atSlot(slot.get)
var tmpState = emptyStateData()
node.blockPool.withState(tmpState, blk):
node.blockPool.withState(node.blockPool.tmpState, blk):
return jsonResult(state)
else:
let state = node.db.getState(root.get)
if state.isSome:
return jsonResult(state.get)
let tmp = BeaconStateRef() # TODO use tmpState - but load the entire StateData!
let state = node.db.getState(root.get, tmp[], noRollback)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another usage of db.getNewState()

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the idea is not to allocate here but rather reuse an existing instance like tmpState - that requires more rewriting though which was planned for a later PR

if state:
return jsonResult(tmp[])
else:
return StringOfJson("null")

Expand Down Expand Up @@ -1193,7 +1193,7 @@ proc start(node: BeaconNode) =
bs = BlockSlot(blck: head.blck, slot: head.blck.slot)

node.blockPool.withState(node.blockPool.tmpState, bs):
node.addLocalValidators(state[])
node.addLocalValidators(state)

node.run()

Expand Down Expand Up @@ -1282,7 +1282,7 @@ when hasPrompt:
# TODO slow linear scan!
for idx, b in node.blockPool.headState.data.data.balances:
if node.getAttachedValidator(
node.blockPool.headState.data.data[], ValidatorIndex(idx)) != nil:
node.blockPool.headState.data.data, ValidatorIndex(idx)) != nil:
balance += b
formatGwei(balance)

Expand Down
28 changes: 10 additions & 18 deletions beacon_chain/beacon_node_types.nim
Original file line number Diff line number Diff line change
Expand Up @@ -225,24 +225,16 @@ type
root*: Eth2Digest
historySlots*: uint64

func emptyStateData*: StateData =
StateData(
data: HashedBeaconState(
# Please note that this initialization is needed in order
# to allocate memory for the BeaconState:
data: BeaconStateRef(),
root: default(Eth2Digest)
),
blck: default(BlockRef))

func clone*(other: StateData): StateData =
StateData(data: clone(other.data),
blck: other.blck)

proc shortLog*(v: AttachedValidator): string = shortLog(v.pubKey)

chronicles.formatIt BlockSlot:
it.blck.root.data[0..3].toHex() & ":" & $it.slot
proc shortLog*(v: BlockSlot): string =
if v.blck.slot == v.slot:
v.blck.root.data[0..3].toHex() & ":" & $v.blck.slot
else: # There was a gap - log it
v.blck.root.data[0..3].toHex() & ":" & $v.blck.slot & "@" & $v.slot

proc shortLog*(v: BlockRef): string =
v.root.data[0..3].toHex() & ":" & $v.slot

chronicles.formatIt BlockRef:
it.root.data[0..3].toHex() & ":" & $it.slot
chronicles.formatIt BlockSlot: shortLog(it)
chronicles.formatIt BlockRef: shortLog(it)
Loading