Skip to content

Commit

Permalink
MB-61421: Prevent deletion of segments scheduled for copy (#2032)
Browse files Browse the repository at this point in the history
- Use a modified index reader, CopyReader, to mark segments in the
Scorch root for online copy/backup operations. This prevents their
deletion by the asynchronous cleanup routine during the copy/backup
process, thereby mitigating the race condition between the
merger/persistor and the copy/backup routine.

---------

Co-authored-by: Abhinav Dangeti <[email protected]>
  • Loading branch information
CascadingRadium and abhinavdangeti authored May 14, 2024
1 parent 327c1f3 commit 466f095
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 26 deletions.
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@ module github.com/blevesearch/bleve/v2
go 1.21

require (
github.com/RoaringBitmap/roaring v1.9.1
github.com/RoaringBitmap/roaring v1.9.3
github.com/bits-and-blooms/bitset v1.12.0
github.com/blevesearch/bleve_index_api v1.1.6
github.com/blevesearch/bleve_index_api v1.1.7
github.com/blevesearch/geo v0.1.20
github.com/blevesearch/go-metrics v0.0.0-20201227073835-cf1acfcdf475
github.com/blevesearch/go-porterstemmer v1.0.3
github.com/blevesearch/goleveldb v1.0.1
github.com/blevesearch/gtreap v0.1.1
github.com/blevesearch/scorch_segment_api/v2 v2.2.11
github.com/blevesearch/scorch_segment_api/v2 v2.2.12
github.com/blevesearch/segment v0.9.1
github.com/blevesearch/snowball v0.6.1
github.com/blevesearch/snowballstem v0.9.0
Expand All @@ -23,7 +23,7 @@ require (
github.com/blevesearch/zapx/v13 v13.3.10
github.com/blevesearch/zapx/v14 v14.3.10
github.com/blevesearch/zapx/v15 v15.3.13
github.com/blevesearch/zapx/v16 v16.0.13-0.20240430155854-6fe4e6b9f70a
github.com/blevesearch/zapx/v16 v16.0.13-0.20240514143026-a9fced173441
github.com/couchbase/moss v0.2.0
github.com/golang/protobuf v1.3.2
github.com/spf13/cobra v1.7.0
Expand Down
16 changes: 8 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
github.com/RoaringBitmap/roaring v1.9.1 h1:LXcSqGGGMKm+KAzUyWn7ZeREqoOkoMX+KwLOK1thc4I=
github.com/RoaringBitmap/roaring v1.9.1/go.mod h1:6AXUsoIEzDTFFQCe1RbGA6uFONMhvejWj5rqITANK90=
github.com/RoaringBitmap/roaring v1.9.3 h1:t4EbC5qQwnisr5PrP9nt0IRhRTb9gMUgQF4t4S2OByM=
github.com/RoaringBitmap/roaring v1.9.3/go.mod h1:6AXUsoIEzDTFFQCe1RbGA6uFONMhvejWj5rqITANK90=
github.com/bits-and-blooms/bitset v1.12.0 h1:U/q1fAF7xXRhFCrhROzIfffYnu+dlS38vCZtmFVPHmA=
github.com/bits-and-blooms/bitset v1.12.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8=
github.com/blevesearch/bleve_index_api v1.1.6 h1:orkqDFCBuNU2oHW9hN2YEJmet+TE9orml3FCGbl1cKk=
github.com/blevesearch/bleve_index_api v1.1.6/go.mod h1:PbcwjIcRmjhGbkS/lJCpfgVSMROV6TRubGGAODaK1W8=
github.com/blevesearch/bleve_index_api v1.1.7 h1:KvvKVn/9+dKCEJHL6flDH3NCiXeKnK3P85e4EOgVjdM=
github.com/blevesearch/bleve_index_api v1.1.7/go.mod h1:PbcwjIcRmjhGbkS/lJCpfgVSMROV6TRubGGAODaK1W8=
github.com/blevesearch/geo v0.1.20 h1:paaSpu2Ewh/tn5DKn/FB5SzvH0EWupxHEIwbCk/QPqM=
github.com/blevesearch/geo v0.1.20/go.mod h1:DVG2QjwHNMFmjo+ZgzrIq2sfCh6rIHzy9d9d0B59I6w=
github.com/blevesearch/go-faiss v1.0.15 h1:aBrj6fwLuY8CkhECFbvkc4qhLTkrYI84QoEaw9z1jMI=
Expand All @@ -19,8 +19,8 @@ github.com/blevesearch/gtreap v0.1.1/go.mod h1:QaQyDRAT51sotthUWAH4Sj08awFSSWzgY
github.com/blevesearch/mmap-go v1.0.2/go.mod h1:ol2qBqYaOUsGdm7aRMRrYGgPvnwLe6Y+7LMvAB5IbSA=
github.com/blevesearch/mmap-go v1.0.4 h1:OVhDhT5B/M1HNPpYPBKIEJaD0F3Si+CrEKULGCDPWmc=
github.com/blevesearch/mmap-go v1.0.4/go.mod h1:EWmEAOmdAS9z/pi/+Toxu99DnsbhG1TIxUoRmJw/pSs=
github.com/blevesearch/scorch_segment_api/v2 v2.2.11 h1:nb5KTeIhDUu+Ka6He7xXvOXcJOt9Db7c3Vy2ptqJQRs=
github.com/blevesearch/scorch_segment_api/v2 v2.2.11/go.mod h1:QVakeAECt+Fxe+zu0A4V1bgPdqNeC93wQvzaXDF3NPo=
github.com/blevesearch/scorch_segment_api/v2 v2.2.12 h1:LuBJ4AfKlMsyi/8xFhxIArEnrlAcI3qGTvzwCTdfKgA=
github.com/blevesearch/scorch_segment_api/v2 v2.2.12/go.mod h1:80vK+flQwZlyj4UralqpicXVQWFgkKTzdqbSvOGHhWs=
github.com/blevesearch/segment v0.9.1 h1:+dThDy+Lvgj5JMxhmOVlgFfkUtZV2kw49xax4+jTfSU=
github.com/blevesearch/segment v0.9.1/go.mod h1:zN21iLm7+GnBHWTao9I+Au/7MBiL8pPFtJBJTsk6kQw=
github.com/blevesearch/snowball v0.6.1 h1:cDYjn/NCH+wwt2UdehaLpr2e4BwLIjN4V/TdLsL+B5A=
Expand All @@ -43,8 +43,8 @@ github.com/blevesearch/zapx/v14 v14.3.10 h1:SG6xlsL+W6YjhX5N3aEiL/2tcWh3DO75Bnz7
github.com/blevesearch/zapx/v14 v14.3.10/go.mod h1:qqyuR0u230jN1yMmE4FIAuCxmahRQEOehF78m6oTgns=
github.com/blevesearch/zapx/v15 v15.3.13 h1:6EkfaZiPlAxqXz0neniq35my6S48QI94W/wyhnpDHHQ=
github.com/blevesearch/zapx/v15 v15.3.13/go.mod h1:Turk/TNRKj9es7ZpKK95PS7f6D44Y7fAFy8F4LXQtGg=
github.com/blevesearch/zapx/v16 v16.0.13-0.20240430155854-6fe4e6b9f70a h1:nSNmMQt2X6GJL2p9t5UW1/FMnAKsZAwNOvWRZZkYTj0=
github.com/blevesearch/zapx/v16 v16.0.13-0.20240430155854-6fe4e6b9f70a/go.mod h1:B5dln0aZM/CPMZQP+HlM0UGnZFvVVGmnR3nvKuoOZKU=
github.com/blevesearch/zapx/v16 v16.0.13-0.20240514143026-a9fced173441 h1:SJhTyViAfEQBT6icz7omIHgnVVNqscpUAlfINzRmNn4=
github.com/blevesearch/zapx/v16 v16.0.13-0.20240514143026-a9fced173441/go.mod h1:Viwste/2ooJPzsLNuncnvyTkbAL2iAgM4ge5agqkdPg=
github.com/couchbase/ghistogram v0.1.0 h1:b95QcQTCzjTUocDXp/uMgSNQi8oj1tGwnJ4bODWZnps=
github.com/couchbase/ghistogram v0.1.0/go.mod h1:s1Jhy76zqfEecpNWJfWUiKZookAFaiGOEoyzgHt9i7k=
github.com/couchbase/moss v0.2.0 h1:VCYrMzFwEryyhRSeI+/b3tRBSeTpi/8gn5Kf6dxqn+o=
Expand Down
11 changes: 7 additions & 4 deletions index/scorch/persister.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,11 +549,14 @@ func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string,
val := make([]byte, 8)
bytesWritten := atomic.LoadUint64(&snapshot.parent.stats.TotBytesWrittenAtIndexTime)
binary.LittleEndian.PutUint64(val, bytesWritten)
internalBucket.Put(TotBytesWrittenKey, val)
err = internalBucket.Put(TotBytesWrittenKey, val)
if err != nil {
return nil, nil, err
}
}

var filenames []string
newSegmentPaths := make(map[uint64]string)
filenames := make([]string, 0, len(snapshot.segment))
newSegmentPaths := make(map[uint64]string, len(snapshot.segment))

// first ensure that each segment in this snapshot has been persisted
for _, segmentSnapshot := range snapshot.segment {
Expand Down Expand Up @@ -1133,7 +1136,7 @@ func (s *Scorch) removeOldZapFiles() error {
for _, f := range files {
fname := f.Name()
if filepath.Ext(fname) == ".zap" {
if _, exists := liveFileNames[fname]; !exists && !s.ineligibleForRemoval[fname] {
if _, exists := liveFileNames[fname]; !exists && !s.ineligibleForRemoval[fname] && (s.copyScheduled[fname] <= 0) {
err := os.Remove(s.path + string(os.PathSeparator) + fname)
if err != nil {
log.Printf("got err removing file: %s, err: %v", fname, err)
Expand Down
40 changes: 40 additions & 0 deletions index/scorch/scorch.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -49,13 +50,20 @@ type Scorch struct {
unsafeBatch bool

rootLock sync.RWMutex

root *IndexSnapshot // holds 1 ref-count on the root
rootPersisted []chan error // closed when root is persisted
persistedCallbacks []index.BatchCallback
nextSnapshotEpoch uint64
eligibleForRemoval []uint64 // Index snapshot epochs that are safe to GC.
ineligibleForRemoval map[string]bool // Filenames that should not be GC'ed yet.

// copyScheduled keeps track of segments scheduled for an online copy/backup operation. Each segment's
// file name maps to the number of copy readers that currently have it scheduled. Segments with a
// non-zero count are protected from removal by the cleanup operation. The count is decremented when
// the copy reader is closed, after which segments with a zero or absent count may be removed.
// It must only be accessed while holding rootLock, because the asynchronous cleanup routine also reads it.
copyScheduled map[string]int

numSnapshotsToKeep int
rollbackRetentionFactor float64
checkPoints []*snapshotMetaData
Expand Down Expand Up @@ -112,6 +120,7 @@ func NewScorch(storeName string,
ineligibleForRemoval: map[string]bool{},
forceMergeRequestCh: make(chan *mergerCtrl, 1),
segPlugin: defaultSegmentPlugin,
copyScheduled: map[string]int{},
}

forcedSegmentType, forcedSegmentVersion, err := configForceSegmentTypeVersion(config)
Expand Down Expand Up @@ -836,3 +845,34 @@ func newFieldStats() *fieldStats {
}
return rv
}

// CopyReader returns a low-level accessor for the index data that is intended
// for online copy/backup operations.
//
// While holding rootLock it takes a reference on the current root snapshot and
// increments the copyScheduled count for the file name of every segment in
// that root. removeOldZapFiles skips any file whose count is positive, so the
// segment files stay on disk while the copy is in progress, which prevents
// the race with the persister/merger cleanup.
//
// The caller must call CloseCopyReader on the returned reader once the backup
// is done; that decrements the counts again and allows the segments to be
// removed.
//
// If the index has no root snapshot, CopyReader returns a nil interface value.
func (s *Scorch) CopyReader() index.CopyReader {
	s.rootLock.Lock()
	defer s.rootLock.Unlock()

	rv := s.root
	if rv == nil {
		// Return a literal nil here. Returning the nil *IndexSnapshot would
		// wrap it in a non-nil index.CopyReader interface value, so a caller's
		// `reader == nil` check would pass and the next method call would
		// dereference a nil pointer.
		return nil
	}
	rv.AddRef()

	// Schedule a backup for all the segments of the root. Both the unpersisted
	// and the persisted segments are scheduled: an unpersisted segment may get
	// persisted while the backup is running, so its future file must be
	// protected from the cleanup routine as well.
	for _, seg := range rv.segment {
		var fileName string
		if perSeg, ok := seg.segment.(segment.PersistedSegment); ok {
			// Segment is persisted: use the name of its file on disk.
			fileName = filepath.Base(perSeg.Path())
		} else {
			// Segment is not persisted: use the name of the segment file that
			// is generated if the segment is persisted in the future.
			fileName = zapFileName(seg.id)
		}
		rv.parent.copyScheduled[fileName]++
	}
	return rv
}
23 changes: 23 additions & 0 deletions index/scorch/snapshot_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -905,3 +905,26 @@ func (is *IndexSnapshot) GetSpatialAnalyzerPlugin(typ string) (
}
return rv, nil
}

// CloseCopyReader releases the copy/backup marks held by this index snapshot
// and then closes the snapshot.
//
// For every segment of the snapshot it decrements the parent's copyScheduled
// count for the segment's file name, dropping the entry once the count reaches
// zero or below. The bookkeeping is done while holding the parent's rootLock;
// the lock is released before the snapshot itself is closed.
//
// It returns the error reported by Close.
func (is *IndexSnapshot) CloseCopyReader() error {
	owner := is.parent

	// First unmark the segments that were marked for backup by this snapshot.
	owner.rootLock.Lock()
	for _, segSnapshot := range is.segment {
		var name string
		switch persisted, isPersisted := segSnapshot.segment.(segment.PersistedSegment); {
		case isPersisted:
			// Persisted segment: keyed by the base name of its file on disk.
			name = filepath.Base(persisted.Path())
		default:
			// Unpersisted segment: keyed by the name of the segment file that
			// is generated if the segment is persisted in the future.
			name = zapFileName(segSnapshot.id)
		}

		remaining := owner.copyScheduled[name] - 1
		if remaining > 0 {
			owner.copyScheduled[name] = remaining
		} else {
			delete(owner.copyScheduled, name)
		}
	}
	owner.rootLock.Unlock()

	// Then close the index snapshot normally.
	return is.Close()
}
21 changes: 11 additions & 10 deletions index_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1074,22 +1074,23 @@ func (i *indexImpl) CopyTo(d index.Directory) (err error) {
return ErrorIndexClosed
}

indexReader, err := i.i.Reader()
if err != nil {
return err
copyIndex, ok := i.i.(index.CopyIndex)
if !ok {
return fmt.Errorf("index implementation does not support copy reader")
}

copyReader := copyIndex.CopyReader()
if copyReader == nil {
return fmt.Errorf("index's copyReader is nil")
}

defer func() {
if cerr := indexReader.Close(); err == nil && cerr != nil {
if cerr := copyReader.CloseCopyReader(); err == nil && cerr != nil {
err = cerr
}
}()

irc, ok := indexReader.(IndexCopyable)
if !ok {
return fmt.Errorf("index implementation does not support copy")
}

err = irc.CopyTo(d)
err = copyReader.CopyTo(d)
if err != nil {
return fmt.Errorf("error copying index metadata: %v", err)
}
Expand Down

0 comments on commit 466f095

Please sign in to comment.