fix(dgraph): Fix out of order issues with split keys in bulk loader. (#6125)

Split keys for indexes can cause out-of-order issues because the term inside the key has a variable length. To fix the issue, the split keys are first written to a temporary DB (using a write batch, which does not require ordered writes) and then copied to the main p directory.
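
To make the ordering problem concrete, here is a purely illustrative Go snippet (the key layout is hypothetical, not Dgraph's actual encoding): because the term embedded in an index key has no fixed width, a split key derived from a short term can compare greater than the plain key of a longer term, so keys emitted grouped by term are not necessarily in the byte-sorted order a stream writer expects.

package main

import (
	"bytes"
	"fmt"
)

func main() {
	// Hypothetical index keys: the term is embedded with no fixed width.
	appKey := []byte("app")     // key for the term "app"
	appleKey := []byte("apple") // key for the term "apple"

	// A split of the "app" list appends a suffix (e.g. an encoded start UID).
	appSplit := append(append([]byte{}, appKey...), 0xff, 0x01)

	fmt.Println(bytes.Compare(appKey, appleKey))   // -1: "app" sorts before "apple"
	fmt.Println(bytes.Compare(appSplit, appleKey)) // 1: the split key sorts after "apple"
	// Emitting appKey, appSplit, appleKey in that (per-term) order is therefore
	// not byte-sorted, which is the out-of-order issue described above.
}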

Related to DGRAPH-1897

(cherry picked from commit 2a3b85c)
martinmr authored Aug 3, 2020
1 parent 9a9ec36 commit 8bdc648
Showing 3 changed files with 137 additions and 19 deletions.
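
A condensed sketch of the round trip the diff below implements (illustrative only: copySplits is not a Dgraph function, the import paths assume badger v2, and error handling is simplified). Split KVs are buffered into a temporary managed DB through a write batch, which does not require sorted input, and are then replayed into the main DB via an iterator, which yields keys in sorted order.

package sketch

import (
	"math"

	badger "github.com/dgraph-io/badger/v2"
	bpb "github.com/dgraph-io/badger/v2/pb"
)

// copySplits writes the (possibly out-of-order) split KVs into tmpDb first,
// then copies them to mainDb in sorted key order.
func copySplits(mainDb, tmpDb *badger.DB, splits *bpb.KVList) error {
	// Phase 1: a write batch accepts keys in any order, unlike badger's
	// StreamWriter, which the bulk loader uses for everything else.
	wb := tmpDb.NewManagedWriteBatch()
	if err := wb.Write(splits); err != nil {
		return err
	}
	if err := wb.Flush(); err != nil {
		return err
	}

	// Phase 2: replay into the main DB; the iterator yields keys in sorted order.
	txn := tmpDb.NewTransactionAt(math.MaxUint64, false)
	defer txn.Discard()
	itr := txn.NewIterator(badger.DefaultIteratorOptions)
	defer itr.Close()

	out := mainDb.NewManagedWriteBatch()
	for itr.Rewind(); itr.Valid(); itr.Next() {
		item := itr.Item()
		val, err := item.ValueCopy(nil)
		if err != nil {
			return err
		}
		kv := &bpb.KV{
			Key:      item.KeyCopy(nil),
			Value:    val,
			UserMeta: []byte{item.UserMeta()},
			Version:  item.Version(),
		}
		if err := out.Write(&bpb.KVList{Kv: []*bpb.KV{kv}}); err != nil {
			return err
		}
	}
	return out.Flush()
}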
10 changes: 6 additions & 4 deletions dgraph/cmd/bulk/count_index.go
@@ -40,10 +40,12 @@ type current struct {
 
 type countIndexer struct {
 	*reducer
-	writer *badger.StreamWriter
-	cur    current
-	counts map[int][]uint64
-	wg     sync.WaitGroup
+	writer      *badger.StreamWriter
+	splitWriter *badger.WriteBatch
+	tmpDb       *badger.DB
+	cur         current
+	counts      map[int][]uint64
+	wg          sync.WaitGroup
 }
 
 // addUid adds the uid from rawKey to a count index if a count index is
6 changes: 5 additions & 1 deletion dgraph/cmd/bulk/loader.go
@@ -87,7 +87,8 @@ type state struct {
 	readerChunkCh chan *bytes.Buffer
 	mapFileId     uint32 // Used atomically to name the output files of the mappers.
 	dbs           []*badger.DB
-	writeTs       uint64 // All badger writes use this timestamp
+	tmpDbs        []*badger.DB // Temporary DB to write the split lists to avoid ordering issues.
+	writeTs       uint64       // All badger writes use this timestamp
 }
 
 type loader struct {
@@ -331,5 +332,8 @@ func (ld *loader) cleanup() {
 	for _, db := range ld.dbs {
 		x.Check(db.Close())
 	}
+	for _, db := range ld.tmpDbs {
+		x.Check(db.Close())
+	}
 	ld.prog.endSummary()
 }
140 changes: 126 additions & 14 deletions dgraph/cmd/bulk/reduce.go
@@ -23,7 +23,9 @@ import (
 	"encoding/binary"
 	"fmt"
 	"io"
+	"io/ioutil"
 	"log"
+	"math"
 	"os"
 	"path/filepath"
 	"runtime"
@@ -52,6 +54,7 @@ type reducer struct {
 }
 
 const batchSize = 10000
+const maxSplitBatchLen = 1000
 const batchAlloc = batchSize * 11 / 10
 
 func (r *reducer) run() error {
@@ -64,7 +67,7 @@ func (r *reducer) run() error {
 		if err := thr.Do(); err != nil {
 			return err
 		}
-		go func(shardId int, db *badger.DB) {
+		go func(shardId int, db *badger.DB, tmpDb *badger.DB) {
 			defer thr.Done(nil)
 
 			mapFiles := filenamesInTree(dirs[shardId])
@@ -78,8 +81,15 @@
 
 			writer := db.NewStreamWriter()
 			x.Check(writer.Prepare())
-
-			ci := &countIndexer{reducer: r, writer: writer}
+			// Split lists are written to a separate DB first to avoid ordering issues.
+			splitWriter := tmpDb.NewManagedWriteBatch()
+
+			ci := &countIndexer{
+				reducer:     r,
+				writer:      writer,
+				splitWriter: splitWriter,
+				tmpDb:       tmpDb,
+			}
 			sort.Slice(partitionKeys, func(i, j int) bool {
 				return bytes.Compare(partitionKeys[i], partitionKeys[j]) < 0
 			})
@@ -92,17 +102,21 @@
 			ci.wait()
 
 			x.Check(writer.Flush())
+
+			// Write split lists back to the main DB.
+			r.writeSplitLists(db, tmpDb)
+
 			for _, itr := range mapItrs {
 				if err := itr.Close(); err != nil {
 					fmt.Printf("Error while closing iterator: %v", err)
 				}
 			}
-		}(i, r.createBadger(i))
+		}(i, r.createBadger(i), r.createTmpBadger())
 	}
 	return thr.Finish()
 }
 
-func (r *reducer) createBadger(i int) *badger.DB {
+func (r *reducer) createBadgerInternal(dir string, compression bool) *badger.DB {
 	if r.opt.BadgerKeyFile != "" {
 		// Need to set zero addr in WorkerConfig before checking the license.
 		x.WorkerConfig.ZeroAddr = []string{r.opt.ZeroAddr}
@@ -115,25 +129,43 @@ func (r *reducer) createBadger(i int) *badger.DB {
 		}
 	}
 
-	opt := badger.DefaultOptions(r.opt.shardOutputDirs[i]).WithSyncWrites(false).
+	opt := badger.DefaultOptions(dir).WithSyncWrites(false).
 		WithTableLoadingMode(bo.MemoryMap).WithValueThreshold(1 << 10 /* 1 KB */).
 		WithLogger(nil).WithMaxCacheSize(1 << 20).
 		WithEncryptionKey(enc.ReadEncryptionKeyFile(r.opt.BadgerKeyFile)).WithCompression(bo.None)
 
 	// Overwrite badger options based on the options provided by the user.
-	r.setBadgerOptions(&opt)
+	r.setBadgerOptions(&opt, compression)
 
 	db, err := badger.OpenManaged(opt)
 	x.Check(err)
 
 	// Zero out the key from memory.
 	opt.EncryptionKey = nil
+	return db
+}
 
+func (r *reducer) createBadger(i int) *badger.DB {
+	db := r.createBadgerInternal(r.opt.shardOutputDirs[i], true)
 	r.dbs = append(r.dbs, db)
 	return db
 }
 
-func (r *reducer) setBadgerOptions(opt *badger.Options) {
+func (r *reducer) createTmpBadger() *badger.DB {
+	tmpDir, err := ioutil.TempDir(r.opt.TmpDir, "split")
+	x.Check(err)
+	// Do not enable compression in temporary badger to improve performance.
+	db := r.createBadgerInternal(tmpDir, false)
+	r.tmpDbs = append(r.tmpDbs, db)
+	return db
+}
+
+func (r *reducer) setBadgerOptions(opt *badger.Options, compression bool) {
+	if !compression {
+		opt.Compression = bo.None
+		opt.ZSTDCompressionLevel = 0
+		return
+	}
 	// Set the compression level.
 	opt.ZSTDCompressionLevel = r.state.opt.BadgerCompressionLevel
 	if r.state.opt.BadgerCompressionLevel < 1 {
@@ -282,6 +314,7 @@ type encodeRequest struct {
 	countKeys []*countIndexEntry
 	wg        *sync.WaitGroup
 	list      *bpb.KVList
+	splitList *bpb.KVList
 }
 
 func (r *reducer) streamIdFor(pred string) uint32 {
@@ -306,24 +339,60 @@ func (r *reducer) encode(entryCh chan *encodeRequest, closer *y.Closer) {
 	for req := range entryCh {
 
 		req.list = &bpb.KVList{}
-		countKeys := r.toList(req.entries, req.list)
+		req.splitList = &bpb.KVList{}
+		countKeys := r.toList(req.entries, req.list, req.splitList)
 		for _, kv := range req.list.Kv {
 			pk, err := x.Parse(kv.Key)
 			x.Check(err)
 			x.AssertTrue(len(pk.Attr) > 0)
 			kv.StreamId = r.streamIdFor(pk.Attr)
-			if pk.HasStartUid {
-				kv.StreamId |= 0x80000000
-			}
 		}
 		req.countKeys = countKeys
 		req.wg.Done()
 		atomic.AddInt32(&r.prog.numEncoding, -1)
 	}
 }
 
+func (r *reducer) writeTmpSplits(ci *countIndexer, kvsCh chan *bpb.KVList, wg *sync.WaitGroup) {
+	defer wg.Done()
+	splitBatchLen := 0
+
+	for kvs := range kvsCh {
+		if kvs == nil || len(kvs.Kv) == 0 {
+			continue
+		}
+
+		for i := 0; i < len(kvs.Kv); i += maxSplitBatchLen {
+			// Flush the write batch when the max batch length is reached to prevent the
+			// value log from growing over the allowed limit.
+			if splitBatchLen >= maxSplitBatchLen {
+				x.Check(ci.splitWriter.Flush())
+				ci.splitWriter = ci.tmpDb.NewManagedWriteBatch()
+				splitBatchLen = 0
+			}
+
+			batch := &bpb.KVList{}
+			if i+maxSplitBatchLen >= len(kvs.Kv) {
+				batch.Kv = kvs.Kv[i:]
+			} else {
+				batch.Kv = kvs.Kv[i : i+maxSplitBatchLen]
+			}
+			splitBatchLen += len(batch.Kv)
+			x.Check(ci.splitWriter.Write(batch))
+		}
+	}
+	x.Check(ci.splitWriter.Flush())
+}
+
 func (r *reducer) startWriting(ci *countIndexer, writerCh chan *encodeRequest, closer *y.Closer) {
 	defer closer.Done()
+
+	// Concurrently write split lists to a temporary badger.
+	tmpWg := new(sync.WaitGroup)
+	tmpWg.Add(1)
+	splitCh := make(chan *bpb.KVList, 2*runtime.NumCPU())
+	go r.writeTmpSplits(ci, splitCh, tmpWg)
 
 	for req := range writerCh {
 		req.wg.Wait()
 
@@ -334,11 +403,53 @@ func (r *reducer) startWriting(ci *countIndexer, writerCh chan *encodeRequest, closer *y.Closer) {
 		start := time.Now()
 
 		x.Check(ci.writer.Write(req.list))
+		if req.splitList != nil && len(req.splitList.Kv) > 0 {
+			splitCh <- req.splitList
+		}
+
 		if dur := time.Since(start).Round(time.Millisecond); dur > time.Second {
 			fmt.Printf("writeCh: Time taken to write req: %v\n",
 				time.Since(start).Round(time.Millisecond))
 		}
 	}
+
+	// Wait for split lists to be written to the temporary badger.
+	close(splitCh)
+	tmpWg.Wait()
 }
 
+func (r *reducer) writeSplitLists(db, tmpDb *badger.DB) {
+	txn := tmpDb.NewTransactionAt(math.MaxUint64, false)
+	defer txn.Discard()
+	itr := txn.NewIterator(badger.DefaultIteratorOptions)
+	defer itr.Close()
+
+	writer := db.NewManagedWriteBatch()
+	splitBatchLen := 0
+
+	for itr.Rewind(); itr.Valid(); itr.Next() {
+		// Flush the write batch when the max batch length is reached to prevent the
+		// value log from growing over the allowed limit.
+		if splitBatchLen >= maxSplitBatchLen {
+			x.Check(writer.Flush())
+			writer = db.NewManagedWriteBatch()
+			splitBatchLen = 0
+		}
+		item := itr.Item()
+
+		valCopy, err := item.ValueCopy(nil)
+		x.Check(err)
+		kv := &bpb.KV{
+			Key:       item.KeyCopy(nil),
+			Value:     valCopy,
+			UserMeta:  []byte{item.UserMeta()},
+			Version:   item.Version(),
+			ExpiresAt: item.ExpiresAt(),
+		}
+		x.Check(writer.Write(&bpb.KVList{Kv: []*bpb.KV{kv}}))
+		splitBatchLen += 1
+	}
+	x.Check(writer.Flush())
+}
+
 func (r *reducer) reduce(partitionKeys [][]byte, mapItrs []*mapIterator, ci *countIndexer) {
@@ -395,7 +506,7 @@ func (r *reducer) reduce(partitionKeys [][]byte, mapItrs []*mapIterator, ci *countIndexer) {
 	writerCloser.SignalAndWait()
 }
 
-func (r *reducer) toList(bufEntries [][]byte, list *bpb.KVList) []*countIndexEntry {
+func (r *reducer) toList(bufEntries [][]byte, list, splitList *bpb.KVList) []*countIndexEntry {
 	sort.Slice(bufEntries, func(i, j int) bool {
 		lh, err := GetKeyForMapEntry(bufEntries[i])
 		x.Check(err)
@@ -479,7 +590,8 @@ func (r *reducer) toList(bufEntries [][]byte, list *bpb.KVList) []*countIndexEntry {
 			l := posting.NewList(y.Copy(currentKey), pl, writeVersionTs)
 			kvs, err := l.Rollup()
 			x.Check(err)
-			list.Kv = append(list.Kv, kvs...)
+			list.Kv = append(list.Kv, kvs[0])
+			splitList.Kv = append(splitList.Kv, kvs[1:]...)
 		} else {
 			val, err := pl.Marshal()
 			x.Check(err)
