-
Notifications
You must be signed in to change notification settings - Fork 179
/
checkpoint.go
196 lines (171 loc) · 7.48 KB
/
checkpoint.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
package common
import (
"fmt"
"path/filepath"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/onflow/flow-go/ledger"
"github.com/onflow/flow-go/ledger/complete/wal"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/state/protocol"
"github.com/onflow/flow-go/storage"
)
// FindHeightsByCheckpoints finds the sealed height that produces a state commitment included in the
// checkpoint file.
//
// It reads all trie root hashes from the checkpoint file at checkpointFilePath and then walks backwards
// from endHeight towards startHeight (both inclusive), visiting every (blocksToSkip+1)-th height. For each
// visited height it looks up the block ID at that height, fetches the seal returned by
// seals.HighestInFork for that block, and checks whether the seal's final state is one of the
// commitments found in the checkpoint file. The first match is returned.
//
// Skipping blocks in this linear search is safe because the checkpoint is expected to hold commits for
// a contiguous range of blocks; for correct operation blocksToSkip must be smaller than that range.
// endHeight must be the height of a sealed block, otherwise the seal lookup fails.
//
// An error is returned if the checkpoint file cannot be read, if a block or seal lookup fails, or if no
// visited height within [startHeight, endHeight] matches a commitment from the checkpoint file.
func FindHeightsByCheckpoints(
	logger zerolog.Logger,
	headers storage.Headers,
	seals storage.Seals,
	checkpointFilePath string,
	blocksToSkip uint,
	startHeight uint64,
	endHeight uint64,
) (
	uint64, // sealed height that produces the state commitment included in the checkpoint file
	flow.StateCommitment, // the state commitment that matches the sealed height
	uint64, // the finalized height that seals the sealed height
	error,
) {
	// find all trie root hashes in the checkpoint file
	dir, fileName := filepath.Split(checkpointFilePath)
	hashes, err := wal.ReadTriesRootHash(logger, dir, fileName)
	if err != nil {
		return 0, flow.DummyStateCommitment, 0,
			fmt.Errorf("could not read trie root hashes from checkpoint file %v: %w",
				checkpointFilePath, err)
	}

	// convert all trie root hashes to state commitments and index them as a set,
	// so that each visited height needs only a single map lookup
	commitments := hashesToCommits(hashes)
	commitMap := make(map[flow.StateCommitment]struct{}, len(commitments))
	for _, commit := range commitments {
		commitMap[commit] = struct{}{}
	}

	// Iterate backwards from the end height to the start height to find the block whose seal
	// carries a state commitment contained in commitMap.
	// The step is never materialized as `blocksToSkip + 1`: that sum wraps to 0 when blocksToSkip
	// is the maximum uint value, which would make the loop spin on endHeight forever.
	skip := uint64(blocksToSkip)
	for height := endHeight; height >= startHeight; {
		finalizedID, err := headers.BlockIDByHeight(height)
		if err != nil {
			return 0, flow.DummyStateCommitment, 0,
				fmt.Errorf("could not find block by height %v: %w", height, err)
		}

		// since height is a sealed block height, then we must be able to find the seal for this block
		finalizedSeal, err := seals.HighestInFork(finalizedID)
		if err != nil {
			return 0, flow.DummyStateCommitment, 0,
				fmt.Errorf("could not find seal for block %v at height %v: %w", finalizedID, height, err)
		}

		commit := finalizedSeal.FinalState
		if _, ok := commitMap[commit]; ok {
			sealedBlock, err := headers.ByBlockID(finalizedSeal.BlockID)
			if err != nil {
				return 0, flow.DummyStateCommitment, 0,
					fmt.Errorf("could not find block by ID %v: %w", finalizedSeal.BlockID, err)
			}

			// NOTE(review): this uses the global zerolog logger rather than the `logger` argument — confirm this is intended
			log.Info().Msgf("successfully found block %v (%v) that seals block %v (%v) for commit %x in checkpoint file %v",
				height, finalizedID,
				sealedBlock.Height, finalizedSeal.BlockID,
				commit, checkpointFilePath)
			return sealedBlock.Height, commit, height, nil
		}

		// Stop when the next step would land below startHeight. Since height >= startHeight holds
		// here, the subtraction cannot underflow; and when we do continue, skip < height-startHeight
		// guarantees that neither `skip + 1` nor `height - (skip + 1)` wraps around.
		if height-startHeight <= skip {
			break
		}
		height -= skip + 1
	}

	return 0, flow.DummyStateCommitment, 0,
		fmt.Errorf("could not find commit within height range [%v,%v]", startHeight, endHeight)
}
// GenerateProtocolSnapshotForCheckpoint finds a sealed block that produces the state commitment contained in the latest
// checkpoint file, and returns a protocol snapshot for the finalized block that seals the sealed block.
// The returned protocol snapshot can be used for dynamically bootstrapping an execution node along with the latest
// checkpoint file.
//
// To find the sealed block, it iterates backwards through the sealed heights, starting from the latest sealed height,
// and checks whether the state commitment matches one of the state commitments contained in the checkpoint file.
// Visiting every height could be slow, so some blocks can be skipped on each step to speed up the search.
// Since a checkpoint file usually contains 500 tries, which might cover around 250 blocks (assuming 2 tries per block),
// skipping e.g. 10 blocks each time still allows us to find a sealed block without stepping over the range of heights
// covered by the checkpoint file.
// The blocksToSkip parameter is the number of blocks skipped on each step when iterating the sealed heights.
//
// It returns the protocol snapshot, the sealed height, and the matching state commitment. An error is returned if
// the latest sealed header cannot be retrieved or if the search performed by
// GenerateProtocolSnapshotForCheckpointWithHeights fails.
func GenerateProtocolSnapshotForCheckpoint(
	logger zerolog.Logger,
	state protocol.State,
	headers storage.Headers,
	seals storage.Seals,
	checkpointDir string,
	blocksToSkip uint,
) (protocol.Snapshot, uint64, flow.StateCommitment, error) {
	// the backwards search starts at the latest sealed block
	sealed, err := state.Sealed().Head()
	if err != nil {
		return nil, 0, flow.DummyStateCommitment, fmt.Errorf("could not get latest sealed header: %w", err)
	}

	endHeight := sealed.Height

	return GenerateProtocolSnapshotForCheckpointWithHeights(logger, state, headers, seals,
		checkpointDir,
		blocksToSkip,
		endHeight,
	)
}
// findLatestCheckpointFilePath locates the latest checkpoint file in the given directory and
// returns its path, i.e. checkpointDir joined with the checkpoint's file name.
// If the number of the last checkpoint reported by wal.ListCheckpoints is negative, the path of
// "root.checkpoint" inside checkpointDir is returned instead.
// An error is returned if the checkpoints in the directory cannot be listed.
func findLatestCheckpointFilePath(checkpointDir string) (string, error) {
	_, latest, err := wal.ListCheckpoints(checkpointDir)
	if err != nil {
		return "", fmt.Errorf("could not list checkpoints in directory %v: %w", checkpointDir, err)
	}

	// no numbered checkpoint to point at: fall back to the root checkpoint
	if latest < 0 {
		return filepath.Join(checkpointDir, "root.checkpoint"), nil
	}

	return filepath.Join(checkpointDir, wal.NumberToFilename(latest)), nil
}
// GenerateProtocolSnapshotForCheckpointWithHeights does the same thing as GenerateProtocolSnapshotForCheckpoint
// except that it allows the caller to specify the end height of the sealed block that we iterate backwards from.
//
// The search covers at most 10,000 steps of (blocksToSkip+1) heights below endHeight, and never goes below
// height 0. The checkpoint file searched for is the latest one found in checkpointDir.
//
// It returns the protocol snapshot at the finalized height that seals the matching sealed block, the sealed
// height, and the matching state commitment. An error is returned if the latest checkpoint file cannot be
// located or if no matching height is found in the search range.
func GenerateProtocolSnapshotForCheckpointWithHeights(
	logger zerolog.Logger,
	state protocol.State,
	headers storage.Headers,
	seals storage.Seals,
	checkpointDir string,
	blocksToSkip uint,
	endHeight uint64,
) (protocol.Snapshot, uint64, flow.StateCommitment, error) {
	// Stop searching after 10,000 iterations or upon reaching the minimum height, whichever comes first.
	const maxSearchIterations = 10000

	// The search window is (blocksToSkip+1) * maxSearchIterations heights wide. The arithmetic is done
	// in uint64 (widening before the +1) and checked for wrap-around: a window too large to represent
	// necessarily reaches below height 0, so startHeight stays 0 in that case. startHeight also stays 0
	// whenever the window is at least as large as endHeight, which prevents the subtraction from underflowing.
	startHeight := uint64(0)
	step := uint64(blocksToSkip) + 1
	window := step * maxSearchIterations
	windowFits := step != 0 && window/maxSearchIterations == step
	if windowFits && endHeight > window {
		startHeight = endHeight - window
	}

	checkpointFilePath, err := findLatestCheckpointFilePath(checkpointDir)
	if err != nil {
		return nil, 0, flow.DummyStateCommitment, fmt.Errorf("could not find latest checkpoint file in directory %v: %w", checkpointDir, err)
	}

	log.Info().
		Uint64("start_height", startHeight).
		Uint64("end_height", endHeight).
		Uint("blocksToSkip", blocksToSkip).
		Msgf("generating protocol snapshot for checkpoint file %v", checkpointFilePath)

	// find the height of the finalized block that produces the state commitment contained in the checkpoint file
	sealedHeight, commit, finalizedHeight, err := FindHeightsByCheckpoints(logger, headers, seals, checkpointFilePath, blocksToSkip, startHeight, endHeight)
	if err != nil {
		return nil, 0, flow.DummyStateCommitment, fmt.Errorf("could not find sealed height in range [%v:%v] (blocksToSkip: %v) by checkpoints: %w",
			startHeight, endHeight, blocksToSkip,
			err)
	}

	// the snapshot is taken at the finalized block that seals the matching sealed block
	snapshot := state.AtHeight(finalizedHeight)
	return snapshot, sealedHeight, commit, nil
}
// hashesToCommits returns a new slice holding each given ledger.RootHash converted to a
// flow.StateCommitment, in the same order as the input.
func hashesToCommits(hashes []ledger.RootHash) []flow.StateCommitment {
	converted := make([]flow.StateCommitment, 0, len(hashes))
	for _, rootHash := range hashes {
		converted = append(converted, flow.StateCommitment(rootHash))
	}
	return converted
}