-
Notifications
You must be signed in to change notification settings - Fork 651
/
Copy pathtx_check.go
290 lines (254 loc) · 10 KB
/
tx_check.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
package bbolt
import (
"encoding/hex"
"fmt"
"go.etcd.io/bbolt/internal/common"
)
// Check runs a set of consistency checks against the database as seen by this
// transaction and streams every inconsistency it finds on the returned channel.
// The channel is closed once the check has finished.
//
// The returned channel is unbuffered, so the caller has to keep receiving from
// it until it is closed; otherwise the checking goroutine stays blocked on its
// next report.
//
// It can be safely run concurrently on a writable transaction. However, this
// incurs a high cost for large databases and databases with a lot of
// subbuckets because of caching. This overhead can be removed if running on a
// read-only transaction, however, it is not safe to execute other writer
// transactions at the same time.
//
// Options may be supplied to tune the check, e.g. a customized `KVStringer`
// implementation so that bolt can generate human-readable diagnostic messages.
// Keys are rendered as hex unless another KVStringer is configured.
func (tx *Tx) Check(options ...CheckOption) <-chan error {
	// Start from the defaults and let each option override them in order.
	cfg := checkConfig{kvStringer: HexKVStringer()}
	for i := range options {
		options[i](&cfg)
	}

	errs := make(chan error)
	go func() {
		// Closing the channel tells the receiver that the check is complete.
		defer close(errs)
		tx.check(cfg, errs)
	}()
	return errs
}
// check performs the actual consistency check and reports every problem on ch.
//
// It verifies that no page appears twice in the freelist and then, depending
// on cfg.pageId, either checks the whole database starting from the root
// bucket (pageId == 0) or only the page tree rooted at cfg.pageId.
// It does not close ch; that is left to the caller.
func (tx *Tx) check(cfg checkConfig, ch chan error) {
	// Force loading free list if opened in ReadOnly mode.
	tx.db.loadFreelist()

	// Snapshot the free page IDs and flag any ID that shows up more than once.
	freeIDs := make([]common.Pgid, tx.db.freelist.Count())
	tx.db.freelist.Copyall(freeIDs)
	freed := make(map[common.Pgid]bool)
	for _, id := range freeIDs {
		if freed[id] {
			ch <- fmt.Errorf("page %d: already freed", id)
		}
		freed[id] = true
	}

	// Track every reachable page. The two meta pages are always reachable.
	reachable := map[common.Pgid]*common.Page{
		0: tx.page(0), // meta0
		1: tx.page(1), // meta1
	}

	// The freelist page and all of its overflow pages are reachable as well.
	if freelistID := tx.meta.Freelist(); freelistID != common.PgidNoFreelist {
		freelistPage := tx.page(freelistID)
		for off := uint32(0); off <= freelistPage.Overflow(); off++ {
			reachable[freelistID+common.Pgid(off)] = freelistPage
		}
	}

	if cfg.pageId != 0 {
		// Check the db file starting from a specified pageId. Pages 0 and 1
		// are rejected, as is anything at or past the high water mark.
		hwm := tx.meta.Pgid()
		if cfg.pageId < 2 || cfg.pageId >= uint64(hwm) {
			ch <- fmt.Errorf("page ID (%d) out of range [%d, %d)", cfg.pageId, 2, hwm)
			return
		}
		tx.recursivelyCheckPage(common.Pgid(cfg.pageId), reachable, freed, cfg.kvStringer, ch)
		return
	}

	// Check the whole db file, starting from the root bucket and
	// recursively check all child buckets.
	tx.recursivelyCheckBucket(&tx.root, reachable, freed, cfg.kvStringer, ch)

	// Ensure all pages below high water mark are either reachable or freed.
	for id := common.Pgid(0); id < tx.meta.Pgid(); id++ {
		if _, ok := reachable[id]; ok || freed[id] {
			continue
		}
		ch <- fmt.Errorf("page %d: unreachable unfreed", int(id))
	}
}
// recursivelyCheckPage checks the page tree rooted at pageId and then every
// bucket found inside that tree. Problems are reported on ch; reachable and
// freed are the shared bookkeeping maps of the running check.
func (tx *Tx) recursivelyCheckPage(
	pageId common.Pgid,
	reachable map[common.Pgid]*common.Page,
	freed map[common.Pgid]bool,
	kvStringer KVStringer,
	ch chan error,
) {
	// Validate the pages of this tree first (reachability and key order) ...
	tx.checkInvariantProperties(pageId, reachable, freed, kvStringer, ch)

	// ... then descend into the buckets stored in its leaf pages.
	tx.recursivelyCheckBucketInPage(pageId, reachable, freed, kvStringer, ch)
}
// recursivelyCheckBucketInPage walks the page tree rooted at pageId looking
// for bucket entries and checks every bucket it finds.
//
// Branch pages are followed into each of their children. On leaf pages, each
// element flagged as a bucket entry is opened through a temporary Bucket
// rooted at pageId and handed to recursivelyCheckBucket. Any other page type
// is reported on ch.
func (tx *Tx) recursivelyCheckBucketInPage(pageId common.Pgid, reachable map[common.Pgid]*common.Page, freed map[common.Pgid]bool,
	kvStringer KVStringer, ch chan error) {
	p := tx.page(pageId)

	if p.IsBranchPage() {
		for idx := range p.BranchPageElements() {
			childID := p.BranchPageElement(uint16(idx)).Pgid()
			tx.recursivelyCheckBucketInPage(childID, reachable, freed, kvStringer, ch)
		}
		return
	}

	if !p.IsLeafPage() {
		ch <- fmt.Errorf("unexpected page type (flags: %x) for pgId:%d", p.Flags(), pageId)
		return
	}

	for idx := range p.LeafPageElements() {
		entry := p.LeafPageElement(uint16(idx))
		if !entry.IsBucketEntry() {
			continue
		}

		// Build a throw-away bucket whose root is this page so that the
		// regular Bucket lookup can be used to open the nested bucket.
		inBkt := common.NewInBucket(pageId, 0)
		lookup := Bucket{
			InBucket:    &inBkt,
			rootNode:    &node{isLeaf: p.IsLeafPage()},
			FillPercent: DefaultFillPercent,
			tx:          tx,
		}
		child := lookup.Bucket(entry.Key())
		if child == nil {
			continue
		}
		tx.recursivelyCheckBucket(child, reachable, freed, kvStringer, ch)
	}
}
// recursivelyCheckBucket checks the page tree of bucket b and then does the
// same for every bucket nested inside it. Inline buckets (root page 0) have no
// pages of their own and are skipped. Problems are reported on ch.
func (tx *Tx) recursivelyCheckBucket(b *Bucket, reachable map[common.Pgid]*common.Page, freed map[common.Pgid]bool,
	kvStringer KVStringer, ch chan error) {
	rootID := b.RootPage()
	if rootID == 0 {
		// Ignore inline buckets.
		return
	}

	tx.checkInvariantProperties(rootID, reachable, freed, kvStringer, ch)

	// visitChild opens the nested bucket stored under name and checks it.
	// It always returns nil, so it never stops the iteration itself.
	visitChild := func(name []byte) error {
		child := b.Bucket(name)
		if child == nil {
			return nil
		}
		tx.recursivelyCheckBucket(child, reachable, freed, kvStringer, ch)
		return nil
	}

	// Check each bucket within this bucket. The iteration error is ignored
	// on purpose here, matching the best-effort nature of the check.
	_ = b.ForEachBucket(visitChild)
}
// checkInvariantProperties validates the page tree rooted at pageId: every
// page handed to the forEachPage callback is checked for reachability
// problems, and afterwards the tree is checked for key-order violations.
// Problems are reported on ch; keys in messages are rendered with
// kvStringer.KeyToString.
func (tx *Tx) checkInvariantProperties(pageId common.Pgid, reachable map[common.Pgid]*common.Page, freed map[common.Pgid]bool,
	kvStringer KVStringer, ch chan error) {
	// The high water mark is read for each visited page rather than once
	// up front.
	visit := func(p *common.Page, _ int, stack []common.Pgid) {
		verifyPageReachable(p, tx.meta.Pgid(), stack, reachable, freed, ch)
	}
	tx.forEachPage(pageId, visit)

	tx.recursivelyCheckPageKeyOrder(pageId, kvStringer.KeyToString, ch)
}
// verifyPageReachable records page p, together with its overflow pages, as
// reachable and reports the following problems on ch:
//   - p lies at or beyond the high water mark hwm,
//   - p or one of its overflow pages has already been recorded in reachable,
//   - p is marked as freed although it is reachable,
//   - p is neither a branch nor a leaf page.
//
// stack is only used to give context in the error messages.
func verifyPageReachable(p *common.Page, hwm common.Pgid, stack []common.Pgid, reachable map[common.Pgid]*common.Page, freed map[common.Pgid]bool, ch chan error) {
	// Valid page IDs form the half-open range [0, hwm), so a page whose ID
	// equals hwm is already out of bounds.
	if p.Id() >= hwm {
		ch <- fmt.Errorf("page %d: out of bounds: %d (stack: %v)", int(p.Id()), int(hwm), stack)
	}

	// Ensure each page is only referenced once.
	for i := common.Pgid(0); i <= common.Pgid(p.Overflow()); i++ {
		var id = p.Id() + i
		if _, ok := reachable[id]; ok {
			ch <- fmt.Errorf("page %d: multiple references (stack: %v)", int(id), stack)
		}
		reachable[id] = p
	}

	// We should only encounter un-freed leaf and branch pages.
	if freed[p.Id()] {
		ch <- fmt.Errorf("page %d: reachable freed", int(p.Id()))
	} else if !p.IsBranchPage() && !p.IsLeafPage() {
		ch <- fmt.Errorf("page %d: invalid type: %s (stack: %v)", int(p.Id()), p.Typ(), stack)
	}
}
// recursivelyCheckPageKeyOrder verifies the b-tree key order constraints for
// the page tree rooted at pgId:
//   - keys on a page must be sorted,
//   - keys on child pages must lie between two consecutive keys of the
//     parent's branch page.
//
// Violations are reported on ch, with keys rendered through keyToString.
func (tx *Tx) recursivelyCheckPageKeyOrder(
	pgId common.Pgid,
	keyToString func([]byte) string,
	ch chan error,
) {
	// The root of the walk has no key bounds and an empty page stack.
	var (
		noMin, noMax []byte
		emptyStack   []common.Pgid
	)
	tx.recursivelyCheckPageKeyOrderInternal(pgId, noMin, noMax, emptyStack, keyToString, ch)
}
// recursivelyCheckPageKeyOrderInternal verifies that all keys in the subtree
// rooted at `pgId`:
//   - are >= `minKeyClosed` (may be nil, meaning no lower bound),
//   - are < `maxKeyOpen` (may be nil, meaning no upper bound),
//   - are correctly ordered relative to their parents.
//
// `pagesStack` is expected to hold the IDs of the pages from the tree root
// down to (but excluding) `pgId`; it is only used for diagnostic messages.
// Violations are reported on ch. The return value is the last key of the
// subtree's last leaf page, or nil when there is none.
func (tx *Tx) recursivelyCheckPageKeyOrderInternal(
	pgId common.Pgid, minKeyClosed, maxKeyOpen []byte, pagesStack []common.Pgid,
	keyToString func([]byte) string, ch chan error) (maxKeyInSubtree []byte) {

	page := tx.page(pgId)
	pagesStack = append(pagesStack, pgId)

	if page.IsBranchPage() {
		// For a branch page, walk the key range of every child in turn.
		count := len(page.BranchPageElements())
		lowerBound := minKeyClosed
		for idx := 0; idx < count; idx++ {
			entry := page.BranchPageElement(uint16(idx))
			// NOTE(review): the page ID reported here is the child's
			// (entry.Pgid()) although the key itself is stored on pgId;
			// confirm this is intended.
			verifyKeyOrder(entry.Pgid(), "branch", idx, entry.Key(), lowerBound, maxKeyOpen, ch, keyToString, pagesStack)

			// A child's keys are bounded above by the next sibling's key,
			// or by the inherited bound for the last child.
			upperBound := maxKeyOpen
			if idx+1 < count {
				upperBound = page.BranchPageElement(uint16(idx + 1)).Key()
			}

			maxKeyInSubtree = tx.recursivelyCheckPageKeyOrderInternal(entry.Pgid(), entry.Key(), upperBound, pagesStack, keyToString, ch)
			lowerBound = maxKeyInSubtree
		}
		return maxKeyInSubtree
	}

	if page.IsLeafPage() {
		// On a leaf each key is compared against its predecessor.
		lowerBound := minKeyClosed
		for idx := range page.LeafPageElements() {
			entry := page.LeafPageElement(uint16(idx))
			verifyKeyOrder(pgId, "leaf", idx, entry.Key(), lowerBound, maxKeyOpen, ch, keyToString, pagesStack)
			lowerBound = entry.Key()
		}
		if n := page.Count(); n > 0 {
			return page.LeafPageElement(n - 1).Key()
		}
		return nil
	}

	ch <- fmt.Errorf("unexpected page type (flags: %x) for pgId:%d", page.Flags(), pgId)
	return nil
}
// verifyKeyOrder checks whether the entry with the given index on page pgId
// (pageType is "branch" or "leaf") that has the given key lies within the
// range determined by previousKey and maxKeyOpen, and reports any violation
// on ch:
//   - for the first entry (index 0), key must be >= previousKey when
//     previousKey is non-nil (previousKey is then the bound inherited from
//     the ancestor),
//   - for every later entry, key must be strictly greater than previousKey
//     (the preceding key),
//   - key must be strictly less than maxKeyOpen when maxKeyOpen is non-nil.
//
// keyToString renders keys for the messages and pagesStack supplies the page
// path that is appended to each message.
func verifyKeyOrder(pgId common.Pgid, pageType string, index int, key []byte, previousKey []byte, maxKeyOpen []byte, ch chan error, keyToString func([]byte) string, pagesStack []common.Pgid) {
	if index == 0 && previousKey != nil && compareKeys(previousKey, key) > 0 {
		ch <- fmt.Errorf("the first key[%d]=(hex)%s on %s page(%d) needs to be >= the key in the ancestor (%s). Stack: %v",
			index, keyToString(key), pageType, pgId, keyToString(previousKey), pagesStack)
	}
	if index > 0 {
		cmpRet := compareKeys(previousKey, key)
		if cmpRet > 0 {
			ch <- fmt.Errorf("key[%d]=(hex)%s on %s page(%d) needs to be > (found <) than previous element (hex)%s. Stack: %v",
				index, keyToString(key), pageType, pgId, keyToString(previousKey), pagesStack)
		}
		if cmpRet == 0 {
			ch <- fmt.Errorf("key[%d]=(hex)%s on %s page(%d) needs to be > (found =) than previous element (hex)%s. Stack: %v",
				index, keyToString(key), pageType, pgId, keyToString(previousKey), pagesStack)
		}
	}
	if maxKeyOpen != nil && compareKeys(key, maxKeyOpen) >= 0 {
		// Report the bound that was actually violated (maxKeyOpen), not the
		// previous key, so the message matches the comparison above.
		ch <- fmt.Errorf("key[%d]=(hex)%s on %s page(%d) needs to be < than key of the next element in ancestor (hex)%s. Pages stack: %v",
			index, keyToString(key), pageType, pgId, keyToString(maxKeyOpen), pagesStack)
	}
}
// ===========================================================================================
// checkConfig holds the settings of a single Tx.Check run. It is populated
// with defaults by Check and then adjusted by the supplied CheckOption values.
type checkConfig struct {
// kvStringer renders keys for the diagnostic messages produced by the check.
kvStringer KVStringer
// pageId is the ID of the page the check starts from. The zero value means
// the whole database is checked starting from the root bucket.
pageId uint64
}
// CheckOption customizes a Tx.Check run by modifying the check configuration
// it is given. Options are applied in the order they are passed to Check.
type CheckOption func(options *checkConfig)
// WithKVStringer returns a CheckOption that makes the check use kvStringer
// instead of the default when rendering keys in diagnostic messages.
func WithKVStringer(kvStringer KVStringer) CheckOption {
	useStringer := func(cfg *checkConfig) {
		cfg.kvStringer = kvStringer
	}
	return useStringer
}
// WithPageId returns a CheckOption that sets the page ID from which the check
// starts. A pageId of 0 keeps the default of checking the whole database.
func WithPageId(pageId uint64) CheckOption {
	return CheckOption(func(cfg *checkConfig) {
		cfg.pageId = pageId
	})
}
// KVStringer converts keys and values into human-readable strings for use in
// diagnostic messages.
type KVStringer interface {
// KeyToString returns the string representation of a key.
KeyToString([]byte) string
// ValueToString returns the string representation of a value.
ValueToString([]byte) string
}
// HexKVStringer returns a KVStringer that renders both keys and values as
// hexadecimal strings.
func HexKVStringer() KVStringer {
	var stringer hexKvStringer
	return stringer
}
// hexKvStringer renders keys and values as lowercase hexadecimal strings.
// It carries no state, so its zero value is ready to use.
type hexKvStringer struct{}

// KeyToString returns the hexadecimal encoding of key.
func (hexKvStringer) KeyToString(key []byte) string {
	encoded := make([]byte, hex.EncodedLen(len(key)))
	hex.Encode(encoded, key)
	return string(encoded)
}

// ValueToString returns the hexadecimal encoding of value.
func (hexKvStringer) ValueToString(value []byte) string {
	encoded := make([]byte, hex.EncodedLen(len(value)))
	hex.Encode(encoded, value)
	return string(encoded)
}