-
Notifications
You must be signed in to change notification settings - Fork 28
/
pindex_bleve_rollback.go
131 lines (106 loc) · 3.53 KB
/
pindex_bleve_rollback.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
// Copyright 2016-Present Couchbase, Inc.
//
// Use of this software is governed by the Business Source License included
// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified
// in that file, in accordance with the Business Source License, use of this
// software will be governed by the Apache License, Version 2.0, included in
// the file licenses/APL2.txt.
package cbft
import (
"fmt"
"os"
"sync/atomic"
"github.com/blevesearch/bleve/v2/index/scorch"
"github.com/couchbase/cbgt"
log "github.com/couchbase/clog"
)
func init() {
cbgt.RollbackHook = func(phase cbgt.RollbackPhase, pindex *cbgt.PIndex) (err error) {
df, ok := pindex.Dest.(*cbgt.DestForwarder)
if !ok || df == nil {
return fmt.Errorf("rollbackhook: invalid dest for pindex:%s",
pindex.Name)
}
bd, ok := df.DestProvider.(*BleveDest)
if !ok || bd == nil {
return fmt.Errorf("rollbackhook: invalid destProvider for pindex:%s",
pindex.Name)
}
bd.m.Lock()
defer bd.m.Unlock()
switch phase {
case cbgt.RollbackInit:
pindexName := bd.bindex.Name()
wasClosed, wasPartial, err := bd.partialRollbackLOCKED()
log.Printf("pindex_bleve_rollback: path: %s, wasClosed: %t, "+
"wasPartial: %t, err: %v", bd.path, wasClosed, wasPartial, err)
if !wasClosed {
bd.closeLOCKED(false)
}
if !wasPartial {
atomic.AddUint64(&TotRollbackFull, 1)
if ServerlessMode {
// this is a full rollback, so the paritition is going to be
// rebuilt a fresh. The reason we are refunding over here is
// because this is not a end-user problem, but rather a
// couchbase cluster problem. So, once the partition is built
// afresh, we would essentially any loss of cost by charging
// for 0 - original high seq no. and after that we will
// actually start costing the user.
RollbackRefund(pindexName, bd.sourceName, 0)
}
os.RemoveAll(bd.path) // Full rollback to zero.
} else {
atomic.AddUint64(&TotRollbackPartial, 1)
}
case cbgt.RollbackCompleted:
bd.isRollbackInProgress = false
}
return nil
}
}
func (t *BleveDest) Rollback(partition string, vBucketUUID uint64, rollbackSeq uint64) error {
t.AddError("dest rollback", partition, nil, rollbackSeq, nil, nil)
// NOTE: A rollback of any partition means a rollback of all the
// partitions in the bindex, so lock the entire BleveDest.
t.m.Lock()
defer t.m.Unlock()
// The BleveDest may be closed due to another partition(BleveDestPartition) of
// the same pindex being rolled back earlier.
if t.bindex == nil || t.isRollbackInProgress {
return nil
}
t.isRollbackInProgress = true
t.rollbackInfo = &RollbackInfo{
partition: partition,
vBucketUUID: vBucketUUID,
rollbackSeq: rollbackSeq,
}
// Whether partial or full rollback, restart the BleveDest so that
// feeds are restarted.
t.rollback()
return nil
}
// Attempt partial rollback.
func (t *BleveDest) partialRollbackLOCKED() (bool, bool, error) {
if t.bindex == nil {
return false, false, nil
}
index, err := t.bindex.Advanced()
if err != nil {
return false, false, err
}
if sh, ok := index.(*scorch.Scorch); ok {
return t.partialScorchRollbackLOCKED(sh)
}
return false, false, fmt.Errorf("unknown index type")
}
func (t *BleveDestPartition) Rollback(partition string,
rollbackSeq uint64) error {
// placeholder implementation
return t.bdest.Rollback(partition, 0, rollbackSeq)
}
func (t *BleveDestPartition) RollbackEx(partition string,
vBucketUUID uint64, rollbackSeq uint64) error {
return t.bdest.Rollback(partition, vBucketUUID, rollbackSeq)
}