From cd7a83382f7edf454ab77f4700db1008fd759e28 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Thu, 6 Jun 2019 18:11:42 -0700 Subject: [PATCH 01/10] Simplified shuffler and reducer code. But, encountered an issue where key sorted order changes due to version append in Badger. --- dgraph/cmd/bulk/loader.go | 16 +--- dgraph/cmd/bulk/reduce.go | 81 +++++++++---------- dgraph/cmd/bulk/shuffle.go | 46 ++++++++--- protos/pb.proto | 2 + .../dgraph-io/badger/stream_writer.go | 23 ++++-- 5 files changed, 95 insertions(+), 73 deletions(-) diff --git a/dgraph/cmd/bulk/loader.go b/dgraph/cmd/bulk/loader.go index 440f19837ed..f445f465199 100644 --- a/dgraph/cmd/bulk/loader.go +++ b/dgraph/cmd/bulk/loader.go @@ -220,25 +220,15 @@ func (ld *loader) mapStage() { } type shuffleOutput struct { - db *badger.DB + writer *badger.StreamWriter mapEntries []*pb.MapEntry } func (ld *loader) reduceStage() { ld.prog.setPhase(reducePhase) - shuffleOutputCh := make(chan shuffleOutput, 100) - go func() { - shuf := shuffler{state: ld.state, output: shuffleOutputCh} - shuf.run() - }() - - redu := reducer{ - state: ld.state, - input: shuffleOutputCh, - writesThr: x.NewThrottle(100), - } - redu.run() + shuf := shuffler{state: ld.state} + shuf.run() } func (ld *loader) writeSchema() { diff --git a/dgraph/cmd/bulk/reduce.go b/dgraph/cmd/bulk/reduce.go index 304554316af..53752fc7729 100644 --- a/dgraph/cmd/bulk/reduce.go +++ b/dgraph/cmd/bulk/reduce.go @@ -18,9 +18,10 @@ package bulk import ( "bytes" - "sync/atomic" + "fmt" - "github.com/dgraph-io/badger" + bpb "github.com/dgraph-io/badger/pb" + "github.com/dgraph-io/badger/y" "github.com/dgraph-io/dgraph/codec" "github.com/dgraph-io/dgraph/posting" @@ -28,62 +29,46 @@ import ( "github.com/dgraph-io/dgraph/x" ) -type reducer struct { - *state - input <-chan shuffleOutput - writesThr *x.Throttle -} - -func (r *reducer) run() { - thr := x.NewThrottle(r.opt.NumGoroutines) - for reduceJob := range r.input { - thr.Start() - NumReducers.Add(1) - NumQueuedReduceJobs.Add(-1) - r.writesThr.Start() - go func(job shuffleOutput) { - r.reduce(job) - thr.Done() - NumReducers.Add(-1) - }(reduceJob) - } - thr.Wait() - r.writesThr.Wait() -} - -func (r *reducer) reduce(job shuffleOutput) { +func reduce(mapEntries []*pb.MapEntry) *bpb.KVList { var currentKey []byte var uids []uint64 pl := new(pb.PostingList) - txn := job.db.NewTransactionAt(r.state.writeTs, true) - outputPostingList := func() { - atomic.AddInt64(&r.prog.reduceKeyCount, 1) + list := &bpb.KVList{} + appendToList := func() { + // TODO: Bring this back. + // atomic.AddInt64(&r.prog.reduceKeyCount, 1) // For a UID-only posting list, the badger value is a delta packed UID // list. The UserMeta indicates to treat the value as a delta packed // list when the value is read by dgraph. For a value posting list, // the full pb.Posting type is used (which pb.y contains the // delta packed UID list). 
- meta := posting.BitCompletePosting + if len(uids) == 0 { + return + } pl.Pack = codec.Encode(uids, 256) val, err := pl.Marshal() x.Check(err) - x.Check(txn.SetEntry(&badger.Entry{ - Key: currentKey, + list.Kv = append(list.Kv, &bpb.KV{ + Key: y.Copy(currentKey), Value: val, - UserMeta: meta, - })) + UserMeta: []byte{posting.BitCompletePosting}, + Version: 1, + }) + pk := x.Parse(currentKey) + fmt.Printf("append pk: %+v\n", pk) uids = uids[:0] pl.Reset() } - for _, mapEntry := range job.mapEntries { - atomic.AddInt64(&r.prog.reduceEdgeCount, 1) + for _, mapEntry := range mapEntries { + // TODO: Bring this back. + // atomic.AddInt64(&r.prog.reduceEdgeCount, 1) if !bytes.Equal(mapEntry.Key, currentKey) && currentKey != nil { - outputPostingList() + appendToList() } currentKey = mapEntry.Key @@ -99,12 +84,20 @@ func (r *reducer) reduce(job shuffleOutput) { pl.Postings = append(pl.Postings, mapEntry.Posting) } } - outputPostingList() + appendToList() + return list - NumBadgerWrites.Add(1) - x.Check(txn.CommitAt(r.state.writeTs, func(err error) { - x.Check(err) - NumBadgerWrites.Add(-1) - r.writesThr.Done() - })) + // NumBadgerWrites.Add(1) + + // TODO: Bring this back. + // for _, kv := range list.Kv { + // pk := x.Parse(kv.Key) + // fmt.Printf("pk: %+v\n", pk) + // } + // x.Check(job.writer.Write(list)) + + // x.Check(txn.CommitAt(r.state.writeTs, func(err error) { + // x.Check(err) + // NumBadgerWrites.Add(-1) + // })) } diff --git a/dgraph/cmd/bulk/shuffle.go b/dgraph/cmd/bulk/shuffle.go index 0d13c1acd4b..8d1db70f89f 100644 --- a/dgraph/cmd/bulk/shuffle.go +++ b/dgraph/cmd/bulk/shuffle.go @@ -21,12 +21,14 @@ import ( "bytes" "container/heap" "encoding/binary" + "fmt" "io" "log" "os" "github.com/dgraph-io/badger" bo "github.com/dgraph-io/badger/options" + "github.com/dgraph-io/badger/y" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/x" "github.com/gogo/protobuf/proto" @@ -34,7 +36,6 @@ import ( type shuffler struct { *state - output chan<- shuffleOutput } func (s *shuffler) run() { @@ -60,7 +61,6 @@ func (s *shuffler) run() { }(i, s.createBadger(i)) } thr.Wait() - close(s.output) } func (s *shuffler) createBadger(i int) *badger.DB { @@ -113,10 +113,22 @@ func (s *shuffler) shufflePostings(mapEntryChs []chan *pb.MapEntry, ci *countInd heap.Push(&ph, heapNode{mapEntry: <-ch, ch: ch}) } + db := ci.db + writer := db.NewStreamWriter() + if err := writer.Prepare(); err != nil { + panic(err) + } + defer func() { + if err := writer.Flush(); err != nil { + panic(err) + } + }() + const batchSize = 1000 const batchAlloc = batchSize * 11 / 10 batch := make([]*pb.MapEntry, 0, batchAlloc) var prevKey []byte + var slast []byte var plistLen int for len(ph.nodes) > 0 { me := ph.nodes[0].mapEntry @@ -127,15 +139,26 @@ func (s *shuffler) shufflePostings(mapEntryChs []chan *pb.MapEntry, ci *countInd } else { heap.Pop(&ph) } - - keyChanged := !bytes.Equal(prevKey, me.Key) - if keyChanged && plistLen > 0 { - ci.addUid(prevKey, plistLen) - plistLen = 0 + if bytes.Compare(me.Key, prevKey) < 0 { + panic("what") } - if len(batch) >= batchSize && !bytes.Equal(prevKey, me.Key) { - s.output <- shuffleOutput{mapEntries: batch, db: ci.db} + keyChanged := !bytes.Equal(prevKey, me.Key) + // TODO: Bring this back. 
+ // if keyChanged && plistLen > 0 { + // ci.addUid(prevKey, plistLen) + // plistLen = 0 + // } + + if len(batch) >= batchSize && keyChanged { + list := reduce(batch) + for _, kv := range list.Kv { + if bytes.Compare(kv.Key, slast) <= 0 { + panic("wrong") + } + slast = y.Copy(kv.Key) + } + x.Check(writer.Write(list)) NumQueuedReduceJobs.Add(1) batch = make([]*pb.MapEntry, 0, batchAlloc) } @@ -143,13 +166,16 @@ func (s *shuffler) shufflePostings(mapEntryChs []chan *pb.MapEntry, ci *countInd batch = append(batch, me) plistLen++ } + fmt.Println("Done with ph.nodes") if len(batch) > 0 { - s.output <- shuffleOutput{mapEntries: batch, db: ci.db} + // reduce(shuffleOutput{mapEntries: batch, writer: writer}) + // x.Check(job.writer.Write(list)) NumQueuedReduceJobs.Add(1) } if plistLen > 0 { ci.addUid(prevKey, plistLen) } + fmt.Println("Done shuffling.") } type heapNode struct { diff --git a/protos/pb.proto b/protos/pb.proto index 90193cb319a..104d3d042aa 100644 --- a/protos/pb.proto +++ b/protos/pb.proto @@ -291,6 +291,8 @@ message PostingList { uint64 commit_ts = 3; // More inclination towards smaller values. repeated uint64 splits = 4; + + repeated uint64 longterm_uids = 21; } message FacetParam { diff --git a/vendor/github.com/dgraph-io/badger/stream_writer.go b/vendor/github.com/dgraph-io/badger/stream_writer.go index 6968e3c452c..fc0ed93df50 100644 --- a/vendor/github.com/dgraph-io/badger/stream_writer.go +++ b/vendor/github.com/dgraph-io/badger/stream_writer.go @@ -18,11 +18,14 @@ package badger import ( "bytes" + "encoding/hex" + "fmt" "math" "github.com/dgraph-io/badger/pb" "github.com/dgraph-io/badger/table" "github.com/dgraph-io/badger/y" + "github.com/dgraph-io/dgraph/x" humanize "github.com/dustin/go-humanize" "github.com/pkg/errors" ) @@ -257,18 +260,26 @@ func (w *sortedWriter) handleRequests(closer *y.Closer) { // Add adds key and vs to sortedWriter. func (w *sortedWriter) Add(key []byte, vs y.ValueStruct) error { if bytes.Compare(key, w.lastKey) <= 0 { - return ErrUnsortedKey - } - sameKey := y.SameKey(key, w.lastKey) - w.lastKey = y.SafeCopy(w.lastKey, key) + if bytes.Equal(key, w.lastKey) { + fmt.Println("EQUAL") + } + pk1 := x.Parse(key) + pk2 := x.Parse(w.lastKey) + fmt.Printf("Cur: %+v\n", pk1) + fmt.Printf("Lastkey: %+v\n", pk2) - if err := w.builder.Add(key, vs); err != nil { - return err + return errors.Wrapf(ErrUnsortedKey, "key: %s lastKey: %s", hex.Dump(key), hex.Dump(w.lastKey)) } + sameKey := y.SameKey(key, w.lastKey) // Same keys should go into the same SSTable. if !sameKey && w.builder.ReachedCapacity(w.db.opt.MaxTableSize) { return w.send() } + + w.lastKey = y.SafeCopy(w.lastKey, key) + if err := w.builder.Add(key, vs); err != nil { + return err + } return nil } From fe8acd203974db94c08396076456c0d50028d6e9 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Fri, 7 Jun 2019 13:57:00 -0700 Subject: [PATCH 02/10] Working code after StreamWriter integration. 
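
This commit moves the reduce output path onto badger's StreamWriter. For context, below is a minimal sketch of that flow, using only the APIs exercised in this series (NewStreamWriter, Prepare, Write, Flush); the directory, keys, and values are illustrative placeholders, not part of the loader.

```go
// Sketch of the StreamWriter flow adopted here: KVs must arrive in sorted key
// order, each KV carries its own Version, and the writer is driven through
// Prepare -> Write -> Flush. Paths and keys below are placeholders only.
package main

import (
	"log"

	"github.com/dgraph-io/badger"
	bpb "github.com/dgraph-io/badger/pb"
)

func main() {
	opt := badger.DefaultOptions
	opt.Dir = "/tmp/stream-writer-demo" // placeholder directory
	opt.ValueDir = opt.Dir
	db, err := badger.OpenManaged(opt) // managed mode, as the bulk loader uses
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	writer := db.NewStreamWriter()
	if err := writer.Prepare(); err != nil {
		log.Fatal(err)
	}

	// Keys must be handed over in sorted order; out-of-order keys surface as
	// ErrUnsortedKey from the underlying sorted writer.
	list := &bpb.KVList{Kv: []*bpb.KV{
		{Key: []byte("a"), Value: []byte("1"), Version: 1},
		{Key: []byte("b"), Value: []byte("2"), Version: 1},
	}}
	if err := writer.Write(list); err != nil {
		log.Fatal(err)
	}

	// Flush hands the buffered tables over to the LSM tree.
	if err := writer.Flush(); err != nil {
		log.Fatal(err)
	}
}
```

The loader itself streams many such sorted KVLists per reduce shard; the later commits in this series additionally tag KVs with per-predicate stream IDs so independent streams can be written concurrently.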
--- dgraph/cmd/bulk/count_index.go | 36 ++- dgraph/cmd/bulk/loader.go | 6 +- dgraph/cmd/bulk/metrics.go | 25 -- dgraph/cmd/bulk/reduce.go | 224 ++++++++++++++++-- dgraph/cmd/bulk/run.go | 10 +- dgraph/cmd/bulk/shuffle.go | 206 ---------------- .../dgraph-io/badger/stream_writer.go | 2 +- vendor/github.com/dgraph-io/badger/y/y.go | 3 +- 8 files changed, 247 insertions(+), 265 deletions(-) delete mode 100644 dgraph/cmd/bulk/metrics.go delete mode 100644 dgraph/cmd/bulk/shuffle.go diff --git a/dgraph/cmd/bulk/count_index.go b/dgraph/cmd/bulk/count_index.go index 640fb73d0c1..ac9e799bd26 100644 --- a/dgraph/cmd/bulk/count_index.go +++ b/dgraph/cmd/bulk/count_index.go @@ -17,10 +17,14 @@ package bulk import ( + "bytes" + "fmt" "sort" "sync" + "sync/atomic" "github.com/dgraph-io/badger" + bpb "github.com/dgraph-io/badger/pb" "github.com/dgraph-io/dgraph/codec" "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/protos/pb" @@ -35,10 +39,11 @@ type current struct { type countIndexer struct { *state - db *badger.DB - cur current - counts map[int][]uint64 - wg sync.WaitGroup + writer *badger.StreamWriter + cur current + counts map[int][]uint64 + streamId uint32 + wg sync.WaitGroup } // addUid adds the uid from rawKey to a count index if a count index is @@ -72,8 +77,10 @@ func (c *countIndexer) addUid(rawKey []byte, count int) { } func (c *countIndexer) writeIndex(pred string, rev bool, counts map[int][]uint64) { - writer := posting.NewTxnWriter(c.db) + defer c.wg.Done() + streamId := atomic.AddUint32(&c.streamId, 1) + list := &bpb.KVList{} for count, uids := range counts { sort.Slice(uids, func(i, j int) bool { return uids[i] < uids[j] }) @@ -81,12 +88,21 @@ func (c *countIndexer) writeIndex(pred string, rev bool, counts map[int][]uint64 pl.Pack = codec.Encode(uids, 256) data, err := pl.Marshal() x.Check(err) - x.Check(writer.SetAt( - x.CountKey(pred, uint32(count), rev), - data, posting.BitCompletePosting, c.state.writeTs)) + list.Kv = append(list.Kv, &bpb.KV{ + Key: x.CountKey(pred, uint32(count), rev), + Value: data, + Meta: []byte{posting.BitCompletePosting}, + Version: c.state.writeTs, + StreamId: streamId, + }) + } + fmt.Printf("Outputing counts for pred: %s. Rev: %v. StreamId: %d\n", pred, rev, streamId) + sort.Slice(list.Kv, func(i, j int) bool { + return bytes.Compare(list.Kv[i].Key, list.Kv[j].Key) < 0 + }) + if err := c.writer.Write(list); err != nil { + panic(err) } - x.Check(writer.Flush()) - c.wg.Done() } func (c *countIndexer) wait() { diff --git a/dgraph/cmd/bulk/loader.go b/dgraph/cmd/bulk/loader.go index f445f465199..3ffc2c86c9d 100644 --- a/dgraph/cmd/bulk/loader.go +++ b/dgraph/cmd/bulk/loader.go @@ -52,7 +52,7 @@ type options struct { MapBufSize int64 SkipMapPhase bool CleanupTmp bool - NumShufflers int + NumReducers int Version bool StoreXids bool ZeroAddr string @@ -227,8 +227,8 @@ type shuffleOutput struct { func (ld *loader) reduceStage() { ld.prog.setPhase(reducePhase) - shuf := shuffler{state: ld.state} - shuf.run() + r := reducer{state: ld.state} + r.run() } func (ld *loader) writeSchema() { diff --git a/dgraph/cmd/bulk/metrics.go b/dgraph/cmd/bulk/metrics.go deleted file mode 100644 index a34f859a2e7..00000000000 --- a/dgraph/cmd/bulk/metrics.go +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Copyright 2017-2018 Dgraph Labs, Inc. and Contributors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package bulk - -import "expvar" - -var ( - NumBadgerWrites = expvar.NewInt("dgraph-bulk-loader_badger_writes_pending") - NumReducers = expvar.NewInt("dgraph-bulk-loader_num_reducers_total") - NumQueuedReduceJobs = expvar.NewInt("dgraph-bulk-loader_reduce_queue_size") -) diff --git a/dgraph/cmd/bulk/reduce.go b/dgraph/cmd/bulk/reduce.go index 53752fc7729..19cc22ef6c8 100644 --- a/dgraph/cmd/bulk/reduce.go +++ b/dgraph/cmd/bulk/reduce.go @@ -17,27 +17,225 @@ package bulk import ( + "bufio" "bytes" + "container/heap" + "encoding/binary" "fmt" + "io" + "log" + "os" + "sync/atomic" + "github.com/dgraph-io/badger" + bo "github.com/dgraph-io/badger/options" bpb "github.com/dgraph-io/badger/pb" "github.com/dgraph-io/badger/y" - "github.com/dgraph-io/dgraph/codec" "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/x" + "github.com/gogo/protobuf/proto" ) -func reduce(mapEntries []*pb.MapEntry) *bpb.KVList { +type reducer struct { + *state +} + +func (r *reducer) run() error { + shardDirs := shardDirs(r.opt.TmpDir) + x.AssertTrue(len(shardDirs) == r.opt.ReduceShards) + x.AssertTrue(len(r.opt.shardOutputDirs) == r.opt.ReduceShards) + + thr := y.NewThrottle(r.opt.NumReducers) + for i := 0; i < r.opt.ReduceShards; i++ { + if err := thr.Do(); err != nil { + return err + } + go func(shardId int, db *badger.DB) { + defer thr.Done(nil) + + mapFiles := filenamesInTree(shardDirs[shardId]) + mapEntryChs := make([]chan *pb.MapEntry, len(mapFiles)) + for i, mapFile := range mapFiles { + mapEntryChs[i] = make(chan *pb.MapEntry, 1000) + go readMapOutput(mapFile, mapEntryChs[i]) + } + + writer := db.NewStreamWriter() + if err := writer.Prepare(); err != nil { + panic(err) + } + defer func() { + fmt.Println("Calling writer.Flush") + if err := writer.Flush(); err != nil { + panic(err) + } + }() + + ci := &countIndexer{state: r.state, writer: writer} + r.reduce(mapEntryChs, ci) + ci.wait() + }(i, r.createBadger(i)) + } + return thr.Finish() +} + +func (s *reducer) createBadger(i int) *badger.DB { + opt := badger.DefaultOptions + opt.SyncWrites = false + opt.TableLoadingMode = bo.MemoryMap + opt.ValueThreshold = 1 << 10 // 1 KB. 
+ opt.Dir = s.opt.shardOutputDirs[i] + opt.ValueDir = opt.Dir + db, err := badger.OpenManaged(opt) + x.Check(err) + s.dbs = append(s.dbs, db) + return db +} + +func readMapOutput(filename string, mapEntryCh chan<- *pb.MapEntry) { + fd, err := os.Open(filename) + x.Check(err) + defer fd.Close() + r := bufio.NewReaderSize(fd, 16<<10) + + unmarshalBuf := make([]byte, 1<<10) + for { + buf, err := r.Peek(binary.MaxVarintLen64) + if err == io.EOF { + break + } + x.Check(err) + sz, n := binary.Uvarint(buf) + if n <= 0 { + log.Fatalf("Could not read uvarint: %d", n) + } + x.Check2(r.Discard(n)) + + for cap(unmarshalBuf) < int(sz) { + unmarshalBuf = make([]byte, sz) + } + x.Check2(io.ReadFull(r, unmarshalBuf[:sz])) + + me := new(pb.MapEntry) + x.Check(proto.Unmarshal(unmarshalBuf[:sz], me)) + mapEntryCh <- me + } + + close(mapEntryCh) +} + +type entryBatch struct { + entries []*pb.MapEntry +} + +func (r *reducer) encodeAndWrite( + writer *badger.StreamWriter, entryCh chan []*pb.MapEntry, closer *y.Closer) { + defer closer.Done() + + var listSize int + list := &bpb.KVList{} + for batch := range entryCh { + listSize += r.toList(batch, list) + if listSize > 4<<20 { + x.Check(writer.Write(list)) + list = &bpb.KVList{} + listSize = 0 + } + } + if len(list.Kv) > 0 { + x.Check(writer.Write(list)) + } +} + +func (r *reducer) reduce(mapEntryChs []chan *pb.MapEntry, ci *countIndexer) { + entryCh := make(chan []*pb.MapEntry, 100) + closer := y.NewCloser(1) + defer closer.SignalAndWait() + defer close(entryCh) + + var ph postingHeap + for _, ch := range mapEntryChs { + heap.Push(&ph, heapNode{mapEntry: <-ch, ch: ch}) + } + + writer := ci.writer + go r.encodeAndWrite(writer, entryCh, closer) + + const batchSize = 100000 + const batchAlloc = batchSize * 11 / 10 + batch := make([]*pb.MapEntry, 0, batchAlloc) + var prevKey []byte + var plistLen int + + for len(ph.nodes) > 0 { + me := ph.nodes[0].mapEntry + var ok bool + ph.nodes[0].mapEntry, ok = <-ph.nodes[0].ch + if ok { + heap.Fix(&ph, 0) + } else { + heap.Pop(&ph) + } + + keyChanged := !bytes.Equal(prevKey, me.Key) + if keyChanged && plistLen > 0 { + ci.addUid(prevKey, plistLen) + plistLen = 0 + } + + if len(batch) >= batchSize && keyChanged { + entryCh <- batch + batch = make([]*pb.MapEntry, 0, batchAlloc) + } + prevKey = me.Key + batch = append(batch, me) + plistLen++ + } + if len(batch) > 0 { + entryCh <- batch + } + if plistLen > 0 { + ci.addUid(prevKey, plistLen) + } +} + +type heapNode struct { + mapEntry *pb.MapEntry + ch <-chan *pb.MapEntry +} + +type postingHeap struct { + nodes []heapNode +} + +func (h *postingHeap) Len() int { + return len(h.nodes) +} +func (h *postingHeap) Less(i, j int) bool { + return less(h.nodes[i].mapEntry, h.nodes[j].mapEntry) +} +func (h *postingHeap) Swap(i, j int) { + h.nodes[i], h.nodes[j] = h.nodes[j], h.nodes[i] +} +func (h *postingHeap) Push(x interface{}) { + h.nodes = append(h.nodes, x.(heapNode)) +} +func (h *postingHeap) Pop() interface{} { + elem := h.nodes[len(h.nodes)-1] + h.nodes = h.nodes[:len(h.nodes)-1] + return elem +} + +func (r *reducer) toList(mapEntries []*pb.MapEntry, list *bpb.KVList) int { var currentKey []byte var uids []uint64 pl := new(pb.PostingList) + var size int - list := &bpb.KVList{} appendToList := func() { - // TODO: Bring this back. - // atomic.AddInt64(&r.prog.reduceKeyCount, 1) + atomic.AddInt64(&r.prog.reduceKeyCount, 1) // For a UID-only posting list, the badger value is a delta packed UID // list. 
The UserMeta indicates to treat the value as a delta packed @@ -50,22 +248,20 @@ func reduce(mapEntries []*pb.MapEntry) *bpb.KVList { pl.Pack = codec.Encode(uids, 256) val, err := pl.Marshal() x.Check(err) - list.Kv = append(list.Kv, &bpb.KV{ + kv := &bpb.KV{ Key: y.Copy(currentKey), Value: val, UserMeta: []byte{posting.BitCompletePosting}, - Version: 1, - }) - pk := x.Parse(currentKey) - fmt.Printf("append pk: %+v\n", pk) - + Version: 1, // Should probably be writeTs TODO + } + size += kv.Size() + list.Kv = append(list.Kv, kv) uids = uids[:0] pl.Reset() } for _, mapEntry := range mapEntries { - // TODO: Bring this back. - // atomic.AddInt64(&r.prog.reduceEdgeCount, 1) + atomic.AddInt64(&r.prog.reduceEdgeCount, 1) if !bytes.Equal(mapEntry.Key, currentKey) && currentKey != nil { appendToList() @@ -85,7 +281,7 @@ func reduce(mapEntries []*pb.MapEntry) *bpb.KVList { } } appendToList() - return list + return size // NumBadgerWrites.Add(1) diff --git a/dgraph/cmd/bulk/run.go b/dgraph/cmd/bulk/run.go index 929feb63071..9e21a212c03 100644 --- a/dgraph/cmd/bulk/run.go +++ b/dgraph/cmd/bulk/run.go @@ -71,8 +71,8 @@ func init() { flag.Bool("cleanup_tmp", true, "Clean up the tmp directory after the loader finishes. Setting this to false allows the"+ " bulk loader can be re-run while skipping the map phase.") - flag.Int("shufflers", 1, - "Number of shufflers to run concurrently. Increasing this can improve performance, and "+ + flag.Int("reducers", 1, + "Number of reducers to run concurrently. Increasing this can improve performance, and "+ "must be less than or equal to the number of reduce shards.") flag.Bool("version", false, "Prints the version of Dgraph Bulk Loader.") flag.BoolP("store_xids", "x", false, "Generate an xid edge for each node.") @@ -107,7 +107,7 @@ func run() { MapBufSize: int64(Bulk.Conf.GetInt("mapoutput_mb")), SkipMapPhase: Bulk.Conf.GetBool("skip_map_phase"), CleanupTmp: Bulk.Conf.GetBool("cleanup_tmp"), - NumShufflers: Bulk.Conf.GetInt("shufflers"), + NumReducers: Bulk.Conf.GetInt("reducers"), Version: Bulk.Conf.GetBool("version"), StoreXids: Bulk.Conf.GetBool("store_xids"), ZeroAddr: Bulk.Conf.GetString("zero"), @@ -142,9 +142,9 @@ func run() { opt.ReduceShards, opt.MapShards) os.Exit(1) } - if opt.NumShufflers > opt.ReduceShards { + if opt.NumReducers > opt.ReduceShards { fmt.Fprintf(os.Stderr, "Invalid flags: shufflers(%d) should be <= reduce_shards(%d)\n", - opt.NumShufflers, opt.ReduceShards) + opt.NumReducers, opt.ReduceShards) os.Exit(1) } if opt.CustomTokenizers != "" { diff --git a/dgraph/cmd/bulk/shuffle.go b/dgraph/cmd/bulk/shuffle.go deleted file mode 100644 index 8d1db70f89f..00000000000 --- a/dgraph/cmd/bulk/shuffle.go +++ /dev/null @@ -1,206 +0,0 @@ -/* - * Copyright 2017-2018 Dgraph Labs, Inc. and Contributors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package bulk - -import ( - "bufio" - "bytes" - "container/heap" - "encoding/binary" - "fmt" - "io" - "log" - "os" - - "github.com/dgraph-io/badger" - bo "github.com/dgraph-io/badger/options" - "github.com/dgraph-io/badger/y" - "github.com/dgraph-io/dgraph/protos/pb" - "github.com/dgraph-io/dgraph/x" - "github.com/gogo/protobuf/proto" -) - -type shuffler struct { - *state -} - -func (s *shuffler) run() { - shardDirs := shardDirs(s.opt.TmpDir) - x.AssertTrue(len(shardDirs) == s.opt.ReduceShards) - x.AssertTrue(len(s.opt.shardOutputDirs) == s.opt.ReduceShards) - - thr := x.NewThrottle(s.opt.NumShufflers) - for i := 0; i < s.opt.ReduceShards; i++ { - thr.Start() - go func(shardId int, db *badger.DB) { - mapFiles := filenamesInTree(shardDirs[shardId]) - shuffleInputChs := make([]chan *pb.MapEntry, len(mapFiles)) - for i, mapFile := range mapFiles { - shuffleInputChs[i] = make(chan *pb.MapEntry, 1000) - go readMapOutput(mapFile, shuffleInputChs[i]) - } - - ci := &countIndexer{state: s.state, db: db} - s.shufflePostings(shuffleInputChs, ci) - ci.wait() - thr.Done() - }(i, s.createBadger(i)) - } - thr.Wait() -} - -func (s *shuffler) createBadger(i int) *badger.DB { - opt := badger.DefaultOptions - opt.SyncWrites = false - opt.TableLoadingMode = bo.MemoryMap - opt.Dir = s.opt.shardOutputDirs[i] - opt.ValueDir = opt.Dir - db, err := badger.OpenManaged(opt) - x.Check(err) - s.dbs = append(s.dbs, db) - return db -} - -func readMapOutput(filename string, mapEntryCh chan<- *pb.MapEntry) { - fd, err := os.Open(filename) - x.Check(err) - defer fd.Close() - r := bufio.NewReaderSize(fd, 16<<10) - - unmarshalBuf := make([]byte, 1<<10) - for { - buf, err := r.Peek(binary.MaxVarintLen64) - if err == io.EOF { - break - } - x.Check(err) - sz, n := binary.Uvarint(buf) - if n <= 0 { - log.Fatalf("Could not read uvarint: %d", n) - } - x.Check2(r.Discard(n)) - - for cap(unmarshalBuf) < int(sz) { - unmarshalBuf = make([]byte, sz) - } - x.Check2(io.ReadFull(r, unmarshalBuf[:sz])) - - me := new(pb.MapEntry) - x.Check(proto.Unmarshal(unmarshalBuf[:sz], me)) - mapEntryCh <- me - } - - close(mapEntryCh) -} - -func (s *shuffler) shufflePostings(mapEntryChs []chan *pb.MapEntry, ci *countIndexer) { - var ph postingHeap - for _, ch := range mapEntryChs { - heap.Push(&ph, heapNode{mapEntry: <-ch, ch: ch}) - } - - db := ci.db - writer := db.NewStreamWriter() - if err := writer.Prepare(); err != nil { - panic(err) - } - defer func() { - if err := writer.Flush(); err != nil { - panic(err) - } - }() - - const batchSize = 1000 - const batchAlloc = batchSize * 11 / 10 - batch := make([]*pb.MapEntry, 0, batchAlloc) - var prevKey []byte - var slast []byte - var plistLen int - for len(ph.nodes) > 0 { - me := ph.nodes[0].mapEntry - var ok bool - ph.nodes[0].mapEntry, ok = <-ph.nodes[0].ch - if ok { - heap.Fix(&ph, 0) - } else { - heap.Pop(&ph) - } - if bytes.Compare(me.Key, prevKey) < 0 { - panic("what") - } - - keyChanged := !bytes.Equal(prevKey, me.Key) - // TODO: Bring this back. 
- // if keyChanged && plistLen > 0 { - // ci.addUid(prevKey, plistLen) - // plistLen = 0 - // } - - if len(batch) >= batchSize && keyChanged { - list := reduce(batch) - for _, kv := range list.Kv { - if bytes.Compare(kv.Key, slast) <= 0 { - panic("wrong") - } - slast = y.Copy(kv.Key) - } - x.Check(writer.Write(list)) - NumQueuedReduceJobs.Add(1) - batch = make([]*pb.MapEntry, 0, batchAlloc) - } - prevKey = me.Key - batch = append(batch, me) - plistLen++ - } - fmt.Println("Done with ph.nodes") - if len(batch) > 0 { - // reduce(shuffleOutput{mapEntries: batch, writer: writer}) - // x.Check(job.writer.Write(list)) - NumQueuedReduceJobs.Add(1) - } - if plistLen > 0 { - ci.addUid(prevKey, plistLen) - } - fmt.Println("Done shuffling.") -} - -type heapNode struct { - mapEntry *pb.MapEntry - ch <-chan *pb.MapEntry -} - -type postingHeap struct { - nodes []heapNode -} - -func (h *postingHeap) Len() int { - return len(h.nodes) -} -func (h *postingHeap) Less(i, j int) bool { - return less(h.nodes[i].mapEntry, h.nodes[j].mapEntry) -} -func (h *postingHeap) Swap(i, j int) { - h.nodes[i], h.nodes[j] = h.nodes[j], h.nodes[i] -} -func (h *postingHeap) Push(x interface{}) { - h.nodes = append(h.nodes, x.(heapNode)) -} -func (h *postingHeap) Pop() interface{} { - elem := h.nodes[len(h.nodes)-1] - h.nodes = h.nodes[:len(h.nodes)-1] - return elem -} diff --git a/vendor/github.com/dgraph-io/badger/stream_writer.go b/vendor/github.com/dgraph-io/badger/stream_writer.go index fc0ed93df50..d046ee5e203 100644 --- a/vendor/github.com/dgraph-io/badger/stream_writer.go +++ b/vendor/github.com/dgraph-io/badger/stream_writer.go @@ -259,7 +259,7 @@ func (w *sortedWriter) handleRequests(closer *y.Closer) { // Add adds key and vs to sortedWriter. func (w *sortedWriter) Add(key []byte, vs y.ValueStruct) error { - if bytes.Compare(key, w.lastKey) <= 0 { + if len(w.lastKey) > 0 && y.CompareKeys(key, w.lastKey) <= 0 { if bytes.Equal(key, w.lastKey) { fmt.Println("EQUAL") } diff --git a/vendor/github.com/dgraph-io/badger/y/y.go b/vendor/github.com/dgraph-io/badger/y/y.go index 4948315a9ae..161d3761b6d 100644 --- a/vendor/github.com/dgraph-io/badger/y/y.go +++ b/vendor/github.com/dgraph-io/badger/y/y.go @@ -19,6 +19,7 @@ package y import ( "bytes" "encoding/binary" + "encoding/hex" "fmt" "hash/crc32" "math" @@ -126,7 +127,7 @@ func ParseTs(key []byte) uint64 { // a would be sorted higher than aa if we use bytes.compare // All keys should have timestamp. func CompareKeys(key1, key2 []byte) int { - AssertTrue(len(key1) > 8 && len(key2) > 8) + AssertTruef(len(key1) > 8 && len(key2) > 8, "key1: %s . 
key2: %s", hex.Dump(key1), hex.Dump(key2)) if cmp := bytes.Compare(key1[:len(key1)-8], key2[:len(key2)-8]); cmp != 0 { return cmp } From 2704d21d8ed19af4fbd6875890fc44249f2142e0 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Fri, 7 Jun 2019 15:47:04 -0700 Subject: [PATCH 03/10] Everything works with stream writer --- dgraph/cmd/bulk/count_index.go | 13 +-- dgraph/cmd/bulk/reduce.go | 119 ++++++++++++++-------- vendor/github.com/dgraph-io/badger/y/y.go | 3 +- 3 files changed, 82 insertions(+), 53 deletions(-) diff --git a/dgraph/cmd/bulk/count_index.go b/dgraph/cmd/bulk/count_index.go index ac9e799bd26..2fdeb7b78bc 100644 --- a/dgraph/cmd/bulk/count_index.go +++ b/dgraph/cmd/bulk/count_index.go @@ -18,7 +18,6 @@ package bulk import ( "bytes" - "fmt" "sort" "sync" "sync/atomic" @@ -38,12 +37,11 @@ type current struct { } type countIndexer struct { - *state - writer *badger.StreamWriter - cur current - counts map[int][]uint64 - streamId uint32 - wg sync.WaitGroup + *reducer + writer *badger.StreamWriter + cur current + counts map[int][]uint64 + wg sync.WaitGroup } // addUid adds the uid from rawKey to a count index if a count index is @@ -96,7 +94,6 @@ func (c *countIndexer) writeIndex(pred string, rev bool, counts map[int][]uint64 StreamId: streamId, }) } - fmt.Printf("Outputing counts for pred: %s. Rev: %v. StreamId: %d\n", pred, rev, streamId) sort.Slice(list.Kv, func(i, j int) bool { return bytes.Compare(list.Kv[i].Key, list.Kv[j].Key) < 0 }) diff --git a/dgraph/cmd/bulk/reduce.go b/dgraph/cmd/bulk/reduce.go index 19cc22ef6c8..bab3fb4783b 100644 --- a/dgraph/cmd/bulk/reduce.go +++ b/dgraph/cmd/bulk/reduce.go @@ -40,6 +40,7 @@ import ( type reducer struct { *state + streamId uint32 } func (r *reducer) run() error { @@ -56,10 +57,10 @@ func (r *reducer) run() error { defer thr.Done(nil) mapFiles := filenamesInTree(shardDirs[shardId]) - mapEntryChs := make([]chan *pb.MapEntry, len(mapFiles)) - for i, mapFile := range mapFiles { - mapEntryChs[i] = make(chan *pb.MapEntry, 1000) - go readMapOutput(mapFile, mapEntryChs[i]) + var mapItrs []*mapIterator + for _, mapFile := range mapFiles { + itr := newMapIterator(mapFile) + mapItrs = append(mapItrs, itr) } writer := db.NewStreamWriter() @@ -67,14 +68,18 @@ func (r *reducer) run() error { panic(err) } defer func() { - fmt.Println("Calling writer.Flush") if err := writer.Flush(); err != nil { panic(err) } + for _, itr := range mapItrs { + if err := itr.Close(); err != nil { + fmt.Printf("Error while closing iterator: %v", err) + } + } }() - ci := &countIndexer{state: r.state, writer: writer} - r.reduce(mapEntryChs, ci) + ci := &countIndexer{reducer: r, writer: writer} + r.reduce(mapItrs, ci) ci.wait() }(i, r.createBadger(i)) } @@ -88,42 +93,51 @@ func (s *reducer) createBadger(i int) *badger.DB { opt.ValueThreshold = 1 << 10 // 1 KB. 
opt.Dir = s.opt.shardOutputDirs[i] opt.ValueDir = opt.Dir + opt.Logger = nil db, err := badger.OpenManaged(opt) x.Check(err) s.dbs = append(s.dbs, db) return db } -func readMapOutput(filename string, mapEntryCh chan<- *pb.MapEntry) { - fd, err := os.Open(filename) - x.Check(err) - defer fd.Close() - r := bufio.NewReaderSize(fd, 16<<10) - - unmarshalBuf := make([]byte, 1<<10) - for { - buf, err := r.Peek(binary.MaxVarintLen64) - if err == io.EOF { - break - } - x.Check(err) - sz, n := binary.Uvarint(buf) - if n <= 0 { - log.Fatalf("Could not read uvarint: %d", n) - } - x.Check2(r.Discard(n)) +type mapIterator struct { + fd *os.File + reader *bufio.Reader + tmpBuf []byte +} - for cap(unmarshalBuf) < int(sz) { - unmarshalBuf = make([]byte, sz) - } - x.Check2(io.ReadFull(r, unmarshalBuf[:sz])) +func (mi *mapIterator) Close() error { + return mi.fd.Close() +} + +func (mi *mapIterator) Next() *pb.MapEntry { + r := mi.reader + buf, err := r.Peek(binary.MaxVarintLen64) + if err == io.EOF { + return nil + } + x.Check(err) + sz, n := binary.Uvarint(buf) + if n <= 0 { + log.Fatalf("Could not read uvarint: %d", n) + } + x.Check2(r.Discard(n)) - me := new(pb.MapEntry) - x.Check(proto.Unmarshal(unmarshalBuf[:sz], me)) - mapEntryCh <- me + for cap(mi.tmpBuf) < int(sz) { + mi.tmpBuf = make([]byte, sz) } + x.Check2(io.ReadFull(r, mi.tmpBuf[:sz])) + + me := new(pb.MapEntry) + x.Check(proto.Unmarshal(mi.tmpBuf[:sz], me)) + return me +} + +func newMapIterator(filename string) *mapIterator { + fd, err := os.Open(filename) + x.Check(err) - close(mapEntryCh) + return &mapIterator{fd: fd, reader: bufio.NewReaderSize(fd, 16<<10)} } type entryBatch struct { @@ -134,11 +148,25 @@ func (r *reducer) encodeAndWrite( writer *badger.StreamWriter, entryCh chan []*pb.MapEntry, closer *y.Closer) { defer closer.Done() + preds := make(map[string]uint32) + var listSize int list := &bpb.KVList{} for batch := range entryCh { listSize += r.toList(batch, list) if listSize > 4<<20 { + for _, kv := range list.Kv { + pk := x.Parse(kv.Key) + if len(pk.Attr) == 0 { + continue + } + streamId := preds[pk.Attr] + if streamId == 0 { + streamId = atomic.AddUint32(&r.streamId, 1) + preds[pk.Attr] = streamId + } + kv.StreamId = streamId + } x.Check(writer.Write(list)) list = &bpb.KVList{} listSize = 0 @@ -149,31 +177,35 @@ func (r *reducer) encodeAndWrite( } } -func (r *reducer) reduce(mapEntryChs []chan *pb.MapEntry, ci *countIndexer) { +func (r *reducer) reduce(mapItrs []*mapIterator, ci *countIndexer) { entryCh := make(chan []*pb.MapEntry, 100) closer := y.NewCloser(1) defer closer.SignalAndWait() - defer close(entryCh) var ph postingHeap - for _, ch := range mapEntryChs { - heap.Push(&ph, heapNode{mapEntry: <-ch, ch: ch}) + for _, itr := range mapItrs { + me := itr.Next() + if me != nil { + heap.Push(&ph, heapNode{mapEntry: me, itr: itr}) + } else { + fmt.Printf("INVALID first map entry for %s", itr.fd.Name()) + } } writer := ci.writer go r.encodeAndWrite(writer, entryCh, closer) - const batchSize = 100000 + const batchSize = 10000 const batchAlloc = batchSize * 11 / 10 batch := make([]*pb.MapEntry, 0, batchAlloc) var prevKey []byte var plistLen int for len(ph.nodes) > 0 { - me := ph.nodes[0].mapEntry - var ok bool - ph.nodes[0].mapEntry, ok = <-ph.nodes[0].ch - if ok { + node0 := &ph.nodes[0] + me := node0.mapEntry + node0.mapEntry = node0.itr.Next() + if node0.mapEntry != nil { heap.Fix(&ph, 0) } else { heap.Pop(&ph) @@ -199,11 +231,12 @@ func (r *reducer) reduce(mapEntryChs []chan *pb.MapEntry, ci *countIndexer) { if plistLen > 0 { 
ci.addUid(prevKey, plistLen) } + close(entryCh) } type heapNode struct { mapEntry *pb.MapEntry - ch <-chan *pb.MapEntry + itr *mapIterator } type postingHeap struct { diff --git a/vendor/github.com/dgraph-io/badger/y/y.go b/vendor/github.com/dgraph-io/badger/y/y.go index 161d3761b6d..4948315a9ae 100644 --- a/vendor/github.com/dgraph-io/badger/y/y.go +++ b/vendor/github.com/dgraph-io/badger/y/y.go @@ -19,7 +19,6 @@ package y import ( "bytes" "encoding/binary" - "encoding/hex" "fmt" "hash/crc32" "math" @@ -127,7 +126,7 @@ func ParseTs(key []byte) uint64 { // a would be sorted higher than aa if we use bytes.compare // All keys should have timestamp. func CompareKeys(key1, key2 []byte) int { - AssertTruef(len(key1) > 8 && len(key2) > 8, "key1: %s . key2: %s", hex.Dump(key1), hex.Dump(key2)) + AssertTrue(len(key1) > 8 && len(key2) > 8) if cmp := bytes.Compare(key1[:len(key1)-8], key2[:len(key2)-8]); cmp != 0 { return cmp } From ca81a07e0c397d5f2f0f6ca3d82119bb56bb2362 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Fri, 7 Jun 2019 15:59:25 -0700 Subject: [PATCH 04/10] Self review --- dgraph/cmd/bulk/loader.go | 2 +- dgraph/cmd/bulk/reduce.go | 26 ++++---------------------- protos/pb.proto | 2 -- 3 files changed, 5 insertions(+), 25 deletions(-) diff --git a/dgraph/cmd/bulk/loader.go b/dgraph/cmd/bulk/loader.go index 3ffc2c86c9d..b90b9204b2a 100644 --- a/dgraph/cmd/bulk/loader.go +++ b/dgraph/cmd/bulk/loader.go @@ -228,7 +228,7 @@ func (ld *loader) reduceStage() { ld.prog.setPhase(reducePhase) r := reducer{state: ld.state} - r.run() + x.Check(r.run()) } func (ld *loader) writeSchema() { diff --git a/dgraph/cmd/bulk/reduce.go b/dgraph/cmd/bulk/reduce.go index bab3fb4783b..389bf50d518 100644 --- a/dgraph/cmd/bulk/reduce.go +++ b/dgraph/cmd/bulk/reduce.go @@ -86,17 +86,17 @@ func (r *reducer) run() error { return thr.Finish() } -func (s *reducer) createBadger(i int) *badger.DB { +func (r *reducer) createBadger(i int) *badger.DB { opt := badger.DefaultOptions opt.SyncWrites = false opt.TableLoadingMode = bo.MemoryMap opt.ValueThreshold = 1 << 10 // 1 KB. - opt.Dir = s.opt.shardOutputDirs[i] + opt.Dir = r.opt.shardOutputDirs[i] opt.ValueDir = opt.Dir opt.Logger = nil db, err := badger.OpenManaged(opt) x.Check(err) - s.dbs = append(s.dbs, db) + r.dbs = append(r.dbs, db) return db } @@ -140,10 +140,6 @@ func newMapIterator(filename string) *mapIterator { return &mapIterator{fd: fd, reader: bufio.NewReaderSize(fd, 16<<10)} } -type entryBatch struct { - entries []*pb.MapEntry -} - func (r *reducer) encodeAndWrite( writer *badger.StreamWriter, entryCh chan []*pb.MapEntry, closer *y.Closer) { defer closer.Done() @@ -285,7 +281,7 @@ func (r *reducer) toList(mapEntries []*pb.MapEntry, list *bpb.KVList) int { Key: y.Copy(currentKey), Value: val, UserMeta: []byte{posting.BitCompletePosting}, - Version: 1, // Should probably be writeTs TODO + Version: r.state.writeTs, } size += kv.Size() list.Kv = append(list.Kv, kv) @@ -315,18 +311,4 @@ func (r *reducer) toList(mapEntries []*pb.MapEntry, list *bpb.KVList) int { } appendToList() return size - - // NumBadgerWrites.Add(1) - - // TODO: Bring this back. 
- // for _, kv := range list.Kv { - // pk := x.Parse(kv.Key) - // fmt.Printf("pk: %+v\n", pk) - // } - // x.Check(job.writer.Write(list)) - - // x.Check(txn.CommitAt(r.state.writeTs, func(err error) { - // x.Check(err) - // NumBadgerWrites.Add(-1) - // })) } diff --git a/protos/pb.proto b/protos/pb.proto index 104d3d042aa..90193cb319a 100644 --- a/protos/pb.proto +++ b/protos/pb.proto @@ -291,8 +291,6 @@ message PostingList { uint64 commit_ts = 3; // More inclination towards smaller values. repeated uint64 splits = 4; - - repeated uint64 longterm_uids = 21; } message FacetParam { From 6402ee1237b21a30f25ce975d8eb35fee74c7086 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Fri, 7 Jun 2019 16:15:24 -0700 Subject: [PATCH 05/10] Vendor Badger in, because it contains fixes to StreamWriter. --- vendor/github.com/dgraph-io/badger/README.md | 95 +++++++++++++++---- vendor/github.com/dgraph-io/badger/batch.go | 14 +-- vendor/github.com/dgraph-io/badger/db.go | 2 +- .../github.com/dgraph-io/badger/dir_unix.go | 3 +- vendor/github.com/dgraph-io/badger/errors.go | 7 +- .../github.com/dgraph-io/badger/iterator.go | 6 +- vendor/github.com/dgraph-io/badger/levels.go | 3 +- .../github.com/dgraph-io/badger/manifest.go | 7 +- vendor/github.com/dgraph-io/badger/merge.go | 21 ++-- .../dgraph-io/badger/stream_writer.go | 29 ++---- vendor/github.com/dgraph-io/badger/structs.go | 3 +- .../dgraph-io/badger/table/builder.go | 6 +- .../dgraph-io/badger/table/table.go | 2 +- .../dgraph-io/badger/y/watermark.go | 4 +- vendor/vendor.json | 30 +++--- 15 files changed, 142 insertions(+), 90 deletions(-) diff --git a/vendor/github.com/dgraph-io/badger/README.md b/vendor/github.com/dgraph-io/badger/README.md index a25d8e1ff57..873316025c7 100644 --- a/vendor/github.com/dgraph-io/badger/README.md +++ b/vendor/github.com/dgraph-io/badger/README.md @@ -209,6 +209,18 @@ err := db.Update(func(txn *badger.Txn) error { }) ``` +Key/Value pair can also be saved by first creating `Entry`, then setting this +`Entry` using `Txn.SetEntry()`. `Entry` also exposes methods to set properties +on it. + +```go +err := db.Update(func(txn *badger.Txn) error { + e := NewEntry([]byte("answer"), []byte("42")) + err := txn.SetEntry(e) + return err +}) +``` + This will set the value of the `"answer"` key to `"42"`. To retrieve this value, we can use the `Txn.Get()` method: @@ -278,11 +290,41 @@ for { ``` ### Merge Operations -Badger provides support for unordered merge operations. You can define a func +Badger provides support for ordered merge operations. You can define a func of type `MergeFunc` which takes in an existing value, and a value to be _merged_ with it. It returns a new value which is the result of the _merge_ operation. All values are specified in byte arrays. For e.g., here is a merge -function (`add`) which adds a `uint64` value to an existing `uint64` value. +function (`add`) which appends a `[]byte` value to an existing `[]byte` value. + +```Go +// Merge function to append one byte slice to another +func add(originalValue, newValue []byte) []byte { + return append(originalValue, newValue...) +} +``` + +This function can then be passed to the `DB.GetMergeOperator()` method, along +with a key, and a duration value. The duration specifies how often the merge +function is run on values that have been added using the `MergeOperator.Add()` +method. 
+ +`MergeOperator.Get()` method can be used to retrieve the cumulative value of the key +associated with the merge operation. + +```Go +key := []byte("merge") + +m := db.GetMergeOperator(key, add, 200*time.Millisecond) +defer m.Stop() + +m.Add([]byte("A")) +m.Add([]byte("B")) +m.Add([]byte("C")) + +res, _ := m.Get() // res should have value ABC encoded +``` + +Example: Merge operator which increments a counter ```Go func uint64ToBytes(i uint64) []byte { @@ -300,17 +342,10 @@ func add(existing, new []byte) []byte { return uint64ToBytes(bytesToUint64(existing) + bytesToUint64(new)) } ``` - -This function can then be passed to the `DB.GetMergeOperator()` method, along -with a key, and a duration value. The duration specifies how often the merge -function is run on values that have been added using the `MergeOperator.Add()` -method. - -`MergeOperator.Get()` method can be used to retrieve the cumulative value of the key -associated with the merge operation. - +It can be used as ```Go key := []byte("merge") + m := db.GetMergeOperator(key, add, 200*time.Millisecond) defer m.Stop() @@ -318,23 +353,46 @@ m.Add(uint64ToBytes(1)) m.Add(uint64ToBytes(2)) m.Add(uint64ToBytes(3)) -res, err := m.Get() // res should have value 6 encoded -fmt.Println(bytesToUint64(res)) +res, _ := m.Get() // res should have value 6 encoded ``` ### Setting Time To Live(TTL) and User Metadata on Keys Badger allows setting an optional Time to Live (TTL) value on keys. Once the TTL has elapsed, the key will no longer be retrievable and will be eligible for garbage -collection. A TTL can be set as a `time.Duration` value using the `Txn.SetWithTTL()` -API method. +collection. A TTL can be set as a `time.Duration` value using the `Entry.WithTTL()` +and `Txn.SetEntry()` API methods. + +```go +err := db.Update(func(txn *badger.Txn) error { + e := NewEntry([]byte("answer"), []byte("42")).WithTTL(time.Hour) + err := txn.SetEntry(e) + return err +}) +``` An optional user metadata value can be set on each key. A user metadata value is represented by a single byte. It can be used to set certain bits along with the key to aid in interpreting or decoding the key-value pair. User -metadata can be set using the `Txn.SetWithMeta()` API method. +metadata can be set using `Entry.WithMeta()` and `Txn.SetEntry()` API methods. + +```go +err := db.Update(func(txn *badger.Txn) error { + e := NewEntry([]byte("answer"), []byte("42")).WithMeta(byte(1)) + err := txn.SetEntry(e) + return err +}) +``` + +`Entry` APIs can be used to add the user metadata and TTL for same key. This `Entry` +then can be set using `Txn.SetEntry()`. -`Txn.SetEntry()` can be used to set the key, value, user metatadata and TTL, -all at once. +```go +err := db.Update(func(txn *badger.Txn) error { + e := NewEntry([]byte("answer"), []byte("42")).WithMeta(byte(1)).WithTTL(time.Hour) + err := txn.SetEntry(e) + return err +}) +``` ### Iterating over keys To iterate over keys, we can use an `Iterator`, which can be obtained using the @@ -676,6 +734,7 @@ Below is a list of known projects that use Badger: * [Goblero](https://github.com/didil/goblero) - Pure Go embedded persistent job queue backed by BadgerDB * [Surfline](https://www.surfline.com) - Serving global wave and weather forecast data with Badger. * [Cete](https://github.com/mosuka/cete) - Simple and highly available distributed key-value store built on Badger. Makes it easy bringing up a cluster of Badger with Raft consensus algorithm by hashicorp/raft. 
+* [Volument](https://volument.com/) - A new take on website analytics backed by Badger. If you are using Badger in a project please send a pull request to add it to the list. diff --git a/vendor/github.com/dgraph-io/badger/batch.go b/vendor/github.com/dgraph-io/badger/batch.go index bfbc239b0b0..c94e0fed472 100644 --- a/vendor/github.com/dgraph-io/badger/batch.go +++ b/vendor/github.com/dgraph-io/badger/batch.go @@ -18,7 +18,6 @@ package badger import ( "sync" - "time" "github.com/dgraph-io/badger/y" ) @@ -102,16 +101,9 @@ func (wb *WriteBatch) SetEntry(e *Entry) error { return nil } -// Set is equivalent of Txn.SetWithMeta. -func (wb *WriteBatch) Set(k, v []byte, meta byte) error { - e := &Entry{Key: k, Value: v, UserMeta: meta} - return wb.SetEntry(e) -} - -// SetWithTTL is equivalent of Txn.SetWithTTL. -func (wb *WriteBatch) SetWithTTL(key, val []byte, dur time.Duration) error { - expire := time.Now().Add(dur).Unix() - e := &Entry{Key: key, Value: val, ExpiresAt: uint64(expire)} +// Set is equivalent of Txn.Set(). +func (wb *WriteBatch) Set(k, v []byte) error { + e := &Entry{Key: k, Value: v} return wb.SetEntry(e) } diff --git a/vendor/github.com/dgraph-io/badger/db.go b/vendor/github.com/dgraph-io/badger/db.go index 012ef22e061..c0fce4e5b18 100644 --- a/vendor/github.com/dgraph-io/badger/db.go +++ b/vendor/github.com/dgraph-io/badger/db.go @@ -210,7 +210,7 @@ func Open(opt Options) (db *DB, err error) { } if !dirExists { if opt.ReadOnly { - return nil, y.Wrapf(err, "Cannot find Dir for read-only open: %q", path) + return nil, errors.Errorf("Cannot find directory %q for read-only open", path) } // Try to create the directory err = os.Mkdir(path, 0700) diff --git a/vendor/github.com/dgraph-io/badger/dir_unix.go b/vendor/github.com/dgraph-io/badger/dir_unix.go index a5e0fa33c55..146713a3601 100644 --- a/vendor/github.com/dgraph-io/badger/dir_unix.go +++ b/vendor/github.com/dgraph-io/badger/dir_unix.go @@ -42,7 +42,8 @@ type directoryLockGuard struct { // acquireDirectoryLock gets a lock on the directory (using flock). If // this is not read-only, it will also write our pid to // dirPath/pidFileName for convenience. -func acquireDirectoryLock(dirPath string, pidFileName string, readOnly bool) (*directoryLockGuard, error) { +func acquireDirectoryLock(dirPath string, pidFileName string, readOnly bool) ( + *directoryLockGuard, error) { // Convert to absolute path so that Release still works even if we do an unbalanced // chdir in the meantime. absPidFilePath, err := filepath.Abs(filepath.Join(dirPath, pidFileName)) diff --git a/vendor/github.com/dgraph-io/badger/errors.go b/vendor/github.com/dgraph-io/badger/errors.go index cad66cb16ef..b8c0d9c2942 100644 --- a/vendor/github.com/dgraph-io/badger/errors.go +++ b/vendor/github.com/dgraph-io/badger/errors.go @@ -35,8 +35,8 @@ var ( // ErrTxnTooBig is returned if too many writes are fit into a single transaction. ErrTxnTooBig = errors.New("Txn is too big to fit into one request") - // ErrConflict is returned when a transaction conflicts with another transaction. This can happen if - // the read rows had been updated concurrently by another transaction. + // ErrConflict is returned when a transaction conflicts with another transaction. This can + // happen if the read rows had been updated concurrently by another transaction. ErrConflict = errors.New("Transaction Conflict. 
Please retry") // ErrReadOnlyTxn is returned if an update function is called on a read-only transaction. @@ -97,7 +97,8 @@ var ( // ErrTruncateNeeded is returned when the value log gets corrupt, and requires truncation of // corrupt data to allow Badger to run properly. - ErrTruncateNeeded = errors.New("Value log truncate required to run DB. This might result in data loss") + ErrTruncateNeeded = errors.New( + "Value log truncate required to run DB. This might result in data loss") // ErrBlockedWrites is returned if the user called DropAll. During the process of dropping all // data from Badger, we stop accepting new writes, by returning this error. diff --git a/vendor/github.com/dgraph-io/badger/iterator.go b/vendor/github.com/dgraph-io/badger/iterator.go index c071324d8b3..d1fa05794b5 100644 --- a/vendor/github.com/dgraph-io/badger/iterator.go +++ b/vendor/github.com/dgraph-io/badger/iterator.go @@ -648,9 +648,9 @@ func (it *Iterator) prefetch() { } } -// Seek would seek to the provided key if present. If absent, it would seek to the next smallest key -// greater than the provided key if iterating in the forward direction. Behavior would be reversed if -// iterating backwards. +// Seek would seek to the provided key if present. If absent, it would seek to the next +// smallest key greater than the provided key if iterating in the forward direction. +// Behavior would be reversed if iterating backwards. func (it *Iterator) Seek(key []byte) { for i := it.data.pop(); i != nil; i = it.data.pop() { i.wg.Wait() diff --git a/vendor/github.com/dgraph-io/badger/levels.go b/vendor/github.com/dgraph-io/badger/levels.go index df90164c4d3..bfd7e62feba 100644 --- a/vendor/github.com/dgraph-io/badger/levels.go +++ b/vendor/github.com/dgraph-io/badger/levels.go @@ -526,7 +526,8 @@ func (s *levelsController) compactBuildTables( vs := it.Value() version := y.ParseTs(it.Key()) - // Do not discard entries inserted by merge operator. These entries will be discarded once they're merged + // Do not discard entries inserted by merge operator. These entries will be + // discarded once they're merged if version <= discardTs && vs.Meta&bitMergeEntry == 0 { // Keep track of the number of versions encountered for this key. Only consider the // versions which are below the minReadTs, otherwise, we might end up discarding the diff --git a/vendor/github.com/dgraph-io/badger/manifest.go b/vendor/github.com/dgraph-io/badger/manifest.go index 34ce1217243..4d2d83fa47e 100644 --- a/vendor/github.com/dgraph-io/badger/manifest.go +++ b/vendor/github.com/dgraph-io/badger/manifest.go @@ -113,11 +113,14 @@ func (m *Manifest) clone() Manifest { // openOrCreateManifestFile opens a Badger manifest file if it exists, or creates on if // one doesn’t. 
-func openOrCreateManifestFile(dir string, readOnly bool) (ret *manifestFile, result Manifest, err error) { +func openOrCreateManifestFile(dir string, readOnly bool) ( + ret *manifestFile, result Manifest, err error) { return helpOpenOrCreateManifestFile(dir, readOnly, manifestDeletionsRewriteThreshold) } -func helpOpenOrCreateManifestFile(dir string, readOnly bool, deletionsThreshold int) (ret *manifestFile, result Manifest, err error) { +func helpOpenOrCreateManifestFile(dir string, readOnly bool, deletionsThreshold int) ( + ret *manifestFile, result Manifest, err error) { + path := filepath.Join(dir, ManifestFilename) var flags uint32 if readOnly { diff --git a/vendor/github.com/dgraph-io/badger/merge.go b/vendor/github.com/dgraph-io/badger/merge.go index db3d33c9a6b..02ad4bcde44 100644 --- a/vendor/github.com/dgraph-io/badger/merge.go +++ b/vendor/github.com/dgraph-io/badger/merge.go @@ -37,10 +37,8 @@ type MergeOperator struct { // another representing a new value that needs to be ‘merged’ into it. MergeFunc // contains the logic to perform the ‘merge’ and return an updated value. // MergeFunc could perform operations like integer addition, list appends etc. -// Note that the ordering of the operands is unspecified, so the merge func -// should either be agnostic to ordering or do additional handling if ordering -// is required. -type MergeFunc func(existing, val []byte) []byte +// Note that the ordering of the operands is maintained. +type MergeFunc func(existingVal, newVal []byte) []byte // GetMergeOperator creates a new MergeOperator for a given key and returns a // pointer to it. It also fires off a goroutine that performs a compaction using @@ -60,7 +58,7 @@ func (db *DB) GetMergeOperator(key []byte, var errNoMerge = errors.New("No need for merge") -func (op *MergeOperator) iterateAndMerge() (val []byte, latest uint64, err error) { +func (op *MergeOperator) iterateAndMerge() (newVal []byte, latest uint64, err error) { txn := op.db.NewTransaction(false) defer txn.Discard() opt := DefaultIteratorOptions @@ -73,14 +71,17 @@ func (op *MergeOperator) iterateAndMerge() (val []byte, latest uint64, err error item := it.Item() numVersions++ if numVersions == 1 { - val, err = item.ValueCopy(val) + // This should be the newVal, considering this is the latest version. + newVal, err = item.ValueCopy(newVal) if err != nil { return nil, 0, err } latest = item.Version() } else { - if err := item.Value(func(newVal []byte) error { - val = op.f(val, newVal) + if err := item.Value(func(oldVal []byte) error { + // The merge should always be on the newVal considering it has the merge result of + // the latest version. The value read should be the oldVal. 
+ newVal = op.f(oldVal, newVal) return nil }); err != nil { return nil, 0, err @@ -93,9 +94,9 @@ func (op *MergeOperator) iterateAndMerge() (val []byte, latest uint64, err error if numVersions == 0 { return nil, latest, ErrKeyNotFound } else if numVersions == 1 { - return val, latest, errNoMerge + return newVal, latest, errNoMerge } - return val, latest, nil + return newVal, latest, nil } func (op *MergeOperator) compact() error { diff --git a/vendor/github.com/dgraph-io/badger/stream_writer.go b/vendor/github.com/dgraph-io/badger/stream_writer.go index d046ee5e203..b134f87cd2a 100644 --- a/vendor/github.com/dgraph-io/badger/stream_writer.go +++ b/vendor/github.com/dgraph-io/badger/stream_writer.go @@ -17,15 +17,11 @@ package badger import ( - "bytes" - "encoding/hex" - "fmt" "math" "github.com/dgraph-io/badger/pb" "github.com/dgraph-io/badger/table" "github.com/dgraph-io/badger/y" - "github.com/dgraph-io/dgraph/x" humanize "github.com/dustin/go-humanize" "github.com/pkg/errors" ) @@ -179,7 +175,10 @@ func (sw *StreamWriter) Flush() error { return err } } - return syncDir(sw.db.opt.Dir) + if err := syncDir(sw.db.opt.Dir); err != nil { + return err + } + return sw.db.lc.validate() } type sortedWriter struct { @@ -260,27 +259,19 @@ func (w *sortedWriter) handleRequests(closer *y.Closer) { // Add adds key and vs to sortedWriter. func (w *sortedWriter) Add(key []byte, vs y.ValueStruct) error { if len(w.lastKey) > 0 && y.CompareKeys(key, w.lastKey) <= 0 { - if bytes.Equal(key, w.lastKey) { - fmt.Println("EQUAL") - } - pk1 := x.Parse(key) - pk2 := x.Parse(w.lastKey) - fmt.Printf("Cur: %+v\n", pk1) - fmt.Printf("Lastkey: %+v\n", pk2) - - return errors.Wrapf(ErrUnsortedKey, "key: %s lastKey: %s", hex.Dump(key), hex.Dump(w.lastKey)) + return ErrUnsortedKey } + sameKey := y.SameKey(key, w.lastKey) // Same keys should go into the same SSTable. if !sameKey && w.builder.ReachedCapacity(w.db.opt.MaxTableSize) { - return w.send() + if err := w.send(); err != nil { + return err + } } w.lastKey = y.SafeCopy(w.lastKey, key) - if err := w.builder.Add(key, vs); err != nil { - return err - } - return nil + return w.builder.Add(key, vs) } func (w *sortedWriter) send() error { diff --git a/vendor/github.com/dgraph-io/badger/structs.go b/vendor/github.com/dgraph-io/badger/structs.go index b88c41a2043..ffab4ad82e4 100644 --- a/vendor/github.com/dgraph-io/badger/structs.go +++ b/vendor/github.com/dgraph-io/badger/structs.go @@ -77,7 +77,8 @@ func (h *header) Decode(buf []byte) { h.userMeta = buf[17] } -// Entry provides Key, Value, UserMeta and ExpiresAt. This struct can be used by the user to set data. +// Entry provides Key, Value, UserMeta and ExpiresAt. This struct can be used by +// the user to set data. type Entry struct { Key []byte Value []byte diff --git a/vendor/github.com/dgraph-io/badger/table/builder.go b/vendor/github.com/dgraph-io/badger/table/builder.go index 43e6562239c..0657cbca182 100644 --- a/vendor/github.com/dgraph-io/badger/table/builder.go +++ b/vendor/github.com/dgraph-io/badger/table/builder.go @@ -171,13 +171,15 @@ func (b *Builder) Add(key []byte, value y.ValueStruct) error { } // TODO: vvv this was the comment on ReachedCapacity. -// FinalSize returns the *rough* final size of the array, counting the header which is not yet written. +// FinalSize returns the *rough* final size of the array, counting the header which is +// not yet written. 
// TODO: Look into why there is a discrepancy. I suspect it is because of Write(empty, empty) // at the end. The diff can vary. // ReachedCapacity returns true if we... roughly (?) reached capacity? func (b *Builder) ReachedCapacity(cap int64) bool { - estimateSz := b.buf.Len() + 8 /* empty header */ + 4*len(b.restarts) + 8 // 8 = end of buf offset + len(restarts). + estimateSz := b.buf.Len() + 8 /* empty header */ + 4*len(b.restarts) + + 8 /* 8 = end of buf offset + len(restarts) */ return int64(estimateSz) > cap } diff --git a/vendor/github.com/dgraph-io/badger/table/table.go b/vendor/github.com/dgraph-io/badger/table/table.go index 9650c08ec67..147b167b7b6 100644 --- a/vendor/github.com/dgraph-io/badger/table/table.go +++ b/vendor/github.com/dgraph-io/badger/table/table.go @@ -178,7 +178,7 @@ func OpenTable(fd *os.File, mode options.FileLoadingMode, cksum []byte) (*Table, t.mmap, err = y.Mmap(fd, false, fileInfo.Size()) if err != nil { _ = fd.Close() - return nil, y.Wrapf(err, "Unable to map file") + return nil, y.Wrapf(err, "Unable to map file: %q", fileInfo.Name()) } case options.FileIO: t.mmap = nil diff --git a/vendor/github.com/dgraph-io/badger/y/watermark.go b/vendor/github.com/dgraph-io/badger/y/watermark.go index c0bbb1940db..10ca00e7e38 100644 --- a/vendor/github.com/dgraph-io/badger/y/watermark.go +++ b/vendor/github.com/dgraph-io/badger/y/watermark.go @@ -164,8 +164,8 @@ func (w *WaterMark) process(closer *Closer) { loop++ if len(indices) > 0 && loop%10000 == 0 { min := indices[0] - w.elog.Printf("WaterMark %s: Done entry %4d. Size: %4d Watermark: %-4d Looking for: %-4d. Value: %d\n", - w.Name, index, len(indices), w.DoneUntil(), min, pending[min]) + w.elog.Printf("WaterMark %s: Done entry %4d. Size: %4d Watermark: %-4d Looking for: "+ + "%-4d. 
Value: %d\n", w.Name, index, len(indices), w.DoneUntil(), min, pending[min]) } // Update mark by going through all indices in order; and checking if they have diff --git a/vendor/vendor.json b/vendor/vendor.json index 500e81460d9..b0e450feb49 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -459,22 +459,22 @@ "revisionTime": "2016-09-07T16:21:46Z" }, { - "checksumSHA1": "hbxI6Fi4cqMEDRqRiQg5A5DFYCY=", + "checksumSHA1": "LmxXlda5frwORaWcfAZW0OWyERI=", "path": "github.com/dgraph-io/badger", - "revision": "011593322a56ec3f5341efb3e03ec04f616b420a", - "revisionTime": "2019-05-31T16:18:31Z" + "revision": "3f5a4cb1b0b2f1c11d28943247216f8b87866a4c", + "revisionTime": "2019-06-07T23:13:53Z" }, { "checksumSHA1": "oOuT7ebEiZ1ViHLKdFxKFOvobAQ=", "path": "github.com/dgraph-io/badger/options", - "revision": "7116e163e500f67a5a3c9fd80a170d365c8d8135", - "revisionTime": "2019-05-16T23:35:11Z" + "revision": "3f5a4cb1b0b2f1c11d28943247216f8b87866a4c", + "revisionTime": "2019-06-07T23:13:53Z" }, { "checksumSHA1": "SV7o4+eEK7/XNWC7H7Z5vWCoHP0=", "path": "github.com/dgraph-io/badger/pb", - "revision": "7116e163e500f67a5a3c9fd80a170d365c8d8135", - "revisionTime": "2019-05-16T23:35:11Z" + "revision": "3f5a4cb1b0b2f1c11d28943247216f8b87866a4c", + "revisionTime": "2019-06-07T23:13:53Z" }, { "checksumSHA1": "d8wE18ae6lOhmJqh0jwwhmQCkII=", @@ -485,20 +485,20 @@ { "checksumSHA1": "00T6XbLV4d95J7hm6kTXDReaQHM=", "path": "github.com/dgraph-io/badger/skl", - "revision": "7116e163e500f67a5a3c9fd80a170d365c8d8135", - "revisionTime": "2019-05-16T23:35:11Z" + "revision": "3f5a4cb1b0b2f1c11d28943247216f8b87866a4c", + "revisionTime": "2019-06-07T23:13:53Z" }, { - "checksumSHA1": "ovAuDsGfn83KcTieKJjvx93TyUU=", + "checksumSHA1": "6b+M6HOosiA2HlPwO0DGN8jDn4w=", "path": "github.com/dgraph-io/badger/table", - "revision": "7116e163e500f67a5a3c9fd80a170d365c8d8135", - "revisionTime": "2019-05-16T23:35:11Z" + "revision": "3f5a4cb1b0b2f1c11d28943247216f8b87866a4c", + "revisionTime": "2019-06-07T23:13:53Z" }, { - "checksumSHA1": "KI48+d+XHzrlAenqQh/Re7swIWk=", + "checksumSHA1": "ozi+AzOCsxkBoTXDQiwS4a7M0mU=", "path": "github.com/dgraph-io/badger/y", - "revision": "7116e163e500f67a5a3c9fd80a170d365c8d8135", - "revisionTime": "2019-05-16T23:35:11Z" + "revision": "3f5a4cb1b0b2f1c11d28943247216f8b87866a4c", + "revisionTime": "2019-06-07T23:13:53Z" }, { "checksumSHA1": "0hrzVs4TUnFfydHG0QidJYURqfg=", From d57dedddcd2f71d9bcf87fe451386b3bbabe0ae6 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Fri, 7 Jun 2019 16:19:33 -0700 Subject: [PATCH 06/10] Remove defer --- dgraph/cmd/bulk/reduce.go | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/dgraph/cmd/bulk/reduce.go b/dgraph/cmd/bulk/reduce.go index 389bf50d518..0a9ec410add 100644 --- a/dgraph/cmd/bulk/reduce.go +++ b/dgraph/cmd/bulk/reduce.go @@ -67,20 +67,19 @@ func (r *reducer) run() error { if err := writer.Prepare(); err != nil { panic(err) } - defer func() { - if err := writer.Flush(); err != nil { - panic(err) - } - for _, itr := range mapItrs { - if err := itr.Close(); err != nil { - fmt.Printf("Error while closing iterator: %v", err) - } - } - }() ci := &countIndexer{reducer: r, writer: writer} r.reduce(mapItrs, ci) ci.wait() + + if err := writer.Flush(); err != nil { + panic(err) + } + for _, itr := range mapItrs { + if err := itr.Close(); err != nil { + fmt.Printf("Error while closing iterator: %v", err) + } + } }(i, r.createBadger(i)) } return thr.Finish() From 
069095c990531cac7fe214cccb4d3623515ba705 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Fri, 7 Jun 2019 16:27:57 -0700 Subject: [PATCH 07/10] Error wording fix --- dgraph/cmd/bulk/reduce.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dgraph/cmd/bulk/reduce.go b/dgraph/cmd/bulk/reduce.go index 0a9ec410add..11ae92896c2 100644 --- a/dgraph/cmd/bulk/reduce.go +++ b/dgraph/cmd/bulk/reduce.go @@ -183,7 +183,7 @@ func (r *reducer) reduce(mapItrs []*mapIterator, ci *countIndexer) { if me != nil { heap.Push(&ph, heapNode{mapEntry: me, itr: itr}) } else { - fmt.Printf("INVALID first map entry for %s", itr.fd.Name()) + fmt.Printf("NIL first map entry for %s", itr.fd.Name()) } } From c39a3518444a9ab783e8074e3704850dbe090d2a Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Fri, 7 Jun 2019 16:42:39 -0700 Subject: [PATCH 08/10] Fix build breakages caused by importing Badger. --- dgraph/cmd/debug/wal.go | 2 +- raftwal/storage.go | 10 +++++----- xidmap/xidmap.go | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/dgraph/cmd/debug/wal.go b/dgraph/cmd/debug/wal.go index 0c55daddefe..1627dc81e29 100644 --- a/dgraph/cmd/debug/wal.go +++ b/dgraph/cmd/debug/wal.go @@ -130,7 +130,7 @@ func printRaft(db *badger.DB, store *raftwal.DiskStorage) { if err != nil { log.Fatalf("Unable to marshal entry: %+v. Error: %v", ent, err) } - if err := batch.Set(k, data, 0); err != nil { + if err := batch.Set(k, data); err != nil { log.Fatalf("Unable to set data: %+v", err) } default: diff --git a/raftwal/storage.go b/raftwal/storage.go index 5dba764e1fe..c17f307844f 100644 --- a/raftwal/storage.go +++ b/raftwal/storage.go @@ -342,7 +342,7 @@ func (w *DiskStorage) setSnapshot(batch *badger.WriteBatch, s raftpb.Snapshot) e if err != nil { return errors.Wrapf(err, "wal.Store: While marshal snapshot") } - if err := batch.Set(w.snapshotKey(), data, 0); err != nil { + if err := batch.Set(w.snapshotKey(), data); err != nil { return err } @@ -351,7 +351,7 @@ func (w *DiskStorage) setSnapshot(batch *badger.WriteBatch, s raftpb.Snapshot) e if err != nil { return err } - if err := batch.Set(w.EntryKey(e.Index), data, 0); err != nil { + if err := batch.Set(w.EntryKey(e.Index), data); err != nil { return err } @@ -378,7 +378,7 @@ func (w *DiskStorage) setHardState(batch *badger.WriteBatch, st raftpb.HardState if err != nil { return errors.Wrapf(err, "wal.Store: While marshal hardstate") } - return batch.Set(w.HardStateKey(), data, 0) + return batch.Set(w.HardStateKey(), data) } // reset resets the entries. Used for testing. 
@@ -399,7 +399,7 @@ func (w *DiskStorage) reset(es []raftpb.Entry) error { return errors.Wrapf(err, "wal.Store: While marshal entry") } k := w.EntryKey(e.Index) - if err := batch.Set(k, data, 0); err != nil { + if err := batch.Set(k, data); err != nil { return err } } @@ -662,7 +662,7 @@ func (w *DiskStorage) addEntries(batch *badger.WriteBatch, entries []raftpb.Entr if err != nil { return errors.Wrapf(err, "wal.Append: While marshal entry") } - if err := batch.Set(k, data, 0); err != nil { + if err := batch.Set(k, data); err != nil { return err } } diff --git a/xidmap/xidmap.go b/xidmap/xidmap.go index 4df9ca4400f..7a20b8ca80a 100644 --- a/xidmap/xidmap.go +++ b/xidmap/xidmap.go @@ -171,7 +171,7 @@ func (m *XidMap) AssignUid(xid string) uint64 { if m.writer != nil { var uidBuf [8]byte binary.BigEndian.PutUint64(uidBuf[:], newUid) - if err := m.writer.Set([]byte(xid), uidBuf[:], 0); err != nil { + if err := m.writer.Set([]byte(xid), uidBuf[:]); err != nil { panic(err) } } From c044d2685477de77b80a481b2251406267c745cb Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Fri, 7 Jun 2019 16:48:41 -0700 Subject: [PATCH 09/10] Add a TODO --- dgraph/cmd/bulk/reduce.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/dgraph/cmd/bulk/reduce.go b/dgraph/cmd/bulk/reduce.go index 11ae92896c2..708b1c5f1d1 100644 --- a/dgraph/cmd/bulk/reduce.go +++ b/dgraph/cmd/bulk/reduce.go @@ -160,6 +160,9 @@ func (r *reducer) encodeAndWrite( streamId = atomic.AddUint32(&r.streamId, 1) preds[pk.Attr] = streamId } + // TODO: Having many stream ids can cause memory issues with StreamWriter. So, we + // should build a way in StreamWriter to indicate that the stream is over, so the + // table for that stream can be flushed and memory released. kv.StreamId = streamId } x.Check(writer.Write(list)) From eb808a919c2839cb426e77da250c2d86582f6cd5 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Mon, 10 Jun 2019 16:31:24 -0700 Subject: [PATCH 10/10] Address Martin's review --- dgraph/cmd/bulk/count_index.go | 2 +- dgraph/cmd/bulk/reduce.go | 13 ++++++++----- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/dgraph/cmd/bulk/count_index.go b/dgraph/cmd/bulk/count_index.go index 2fdeb7b78bc..98572818db3 100644 --- a/dgraph/cmd/bulk/count_index.go +++ b/dgraph/cmd/bulk/count_index.go @@ -98,7 +98,7 @@ func (c *countIndexer) writeIndex(pred string, rev bool, counts map[int][]uint64 return bytes.Compare(list.Kv[i].Key, list.Kv[j].Key) < 0 }) if err := c.writer.Write(list); err != nil { - panic(err) + x.Check(err) } } diff --git a/dgraph/cmd/bulk/reduce.go b/dgraph/cmd/bulk/reduce.go index 708b1c5f1d1..e822865b1d9 100644 --- a/dgraph/cmd/bulk/reduce.go +++ b/dgraph/cmd/bulk/reduce.go @@ -44,8 +44,8 @@ type reducer struct { } func (r *reducer) run() error { - shardDirs := shardDirs(r.opt.TmpDir) - x.AssertTrue(len(shardDirs) == r.opt.ReduceShards) + dirs := shardDirs(r.opt.TmpDir) + x.AssertTrue(len(dirs) == r.opt.ReduceShards) x.AssertTrue(len(r.opt.shardOutputDirs) == r.opt.ReduceShards) thr := y.NewThrottle(r.opt.NumReducers) @@ -56,7 +56,7 @@ func (r *reducer) run() error { go func(shardId int, db *badger.DB) { defer thr.Done(nil) - mapFiles := filenamesInTree(shardDirs[shardId]) + mapFiles := filenamesInTree(dirs[shardId]) var mapItrs []*mapIterator for _, mapFile := range mapFiles { itr := newMapIterator(mapFile) @@ -65,7 +65,7 @@ func (r *reducer) run() error { writer := db.NewStreamWriter() if err := writer.Prepare(); err != nil { - panic(err) + x.Check(err) } ci := &countIndexer{reducer: r, writer: writer} @@ 
-73,7 +73,7 @@ func (r *reducer) run() error { ci.wait() if err := writer.Flush(); err != nil { - panic(err) + x.Check(err) } for _, itr := range mapItrs { if err := itr.Close(); err != nil { @@ -210,6 +210,9 @@ func (r *reducer) reduce(mapItrs []*mapIterator, ci *countIndexer) { } keyChanged := !bytes.Equal(prevKey, me.Key) + // Note that the keys are coming in sorted order from the heap. So, if + // we see a new key, we should push out the number of entries we got + // for the current key, so the count index can register that. if keyChanged && plistLen > 0 { ci.addUid(prevKey, plistLen) plistLen = 0
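
The comment added in the hunk just above describes the reducer's overall control flow: map entries arrive as several individually sorted streams, a heap merges them into one globally sorted stream, and whenever the key changes the accumulated entry count is pushed to the count indexer. The snippet below is a small, self-contained sketch of that pattern over plain strings; mergeSorted and its emit callback are illustrative stand-ins, not the bulk loader's actual mapIterator and countIndexer types.

    package main

    import (
        "container/heap"
        "fmt"
    )

    // node is the head of one input stream plus the index of that stream.
    type node struct {
        key    string
        stream int
    }

    type minHeap []node

    func (h minHeap) Len() int            { return len(h) }
    func (h minHeap) Less(i, j int) bool  { return h[i].key < h[j].key }
    func (h minHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
    func (h *minHeap) Push(x interface{}) { *h = append(*h, x.(node)) }
    func (h *minHeap) Pop() interface{} {
        old := *h
        n := old[len(old)-1]
        *h = old[:len(old)-1]
        return n
    }

    // mergeSorted merges already-sorted streams and calls emit(key, count)
    // each time the key changes, mirroring how the reducer feeds plistLen
    // into ci.addUid when keyChanged is true.
    func mergeSorted(streams [][]string, emit func(key string, count int)) {
        h := &minHeap{}
        next := make([]int, len(streams))
        for i, s := range streams {
            if len(s) > 0 {
                heap.Push(h, node{key: s[0], stream: i})
                next[i] = 1
            }
        }
        var prev string
        count := 0
        for h.Len() > 0 {
            n := heap.Pop(h).(node)
            if n.key != prev && count > 0 {
                emit(prev, count)
                count = 0
            }
            prev = n.key
            count++
            if next[n.stream] < len(streams[n.stream]) {
                heap.Push(h, node{key: streams[n.stream][next[n.stream]], stream: n.stream})
                next[n.stream]++
            }
        }
        if count > 0 {
            emit(prev, count)
        }
    }

    func main() {
        mergeSorted([][]string{{"a", "c"}, {"a", "b"}}, func(k string, c int) {
            fmt.Println(k, c) // prints: a 2, then b 1, then c 1
        })
    }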
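
Taken together, the StreamWriter changes in this series settle on a simple contract for callers such as the reducer: Prepare once, Write batches whose keys are already in ascending order, then Flush, which now also validates the resulting level structure; out-of-order keys are reported as a plain ErrUnsortedKey instead of a debug dump. A rough sketch of that call sequence, assuming an already-open DB and a pre-sorted KV slice, could look like this:

    package sketch

    import (
        "github.com/dgraph-io/badger"
        bpb "github.com/dgraph-io/badger/pb"
    )

    // writeSorted streams pre-sorted key-value pairs into db. The kvs slice
    // must already be ordered by key; the writer rejects regressions with
    // ErrUnsortedKey.
    func writeSorted(db *badger.DB, kvs []*bpb.KV) error {
        writer := db.NewStreamWriter()
        if err := writer.Prepare(); err != nil {
            return err
        }
        if err := writer.Write(&bpb.KVList{Kv: kvs}); err != nil {
            return err
        }
        // Flush syncs the directory and, after this series, validates levels.
        return writer.Flush()
    }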
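
The build-fix commit earlier in the series is mechanical: every call site drops the third user-meta argument from WriteBatch.Set, which the newer vendored Badger no longer accepts. A hedged sketch of the updated call pattern follows; NewWriteBatch, Cancel and Flush are assumed from Badger's write-batch API and do not appear in this patch.

    package sketch

    import "github.com/dgraph-io/badger"

    // setAll writes a map of keys and values through a WriteBatch using the
    // two-argument Set signature adopted in this series.
    func setAll(db *badger.DB, kvs map[string][]byte) error {
        wb := db.NewWriteBatch() // assumed constructor, not shown in this patch
        defer wb.Cancel()
        for k, v := range kvs {
            if err := wb.Set([]byte(k), v); err != nil {
                return err
            }
        }
        return wb.Flush()
    }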
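
Finally, the MergeFunc rework in the vendored merge.go pins down operand ordering: iterateAndMerge seeds the accumulator from the newest version and folds each older version in as the first argument, via newVal = op.f(oldVal, newVal). Below is a minimal sketch of a merge function written against that convention; the addU64 name and the 8-byte big-endian encoding are illustrative assumptions, not code from this patch.

    package main

    import (
        "encoding/binary"
        "fmt"
    )

    // addU64 is a hypothetical MergeFunc: both operands are assumed to be
    // 8-byte big-endian uint64 counters. existingVal carries the older value,
    // newVal the newer accumulated one. Addition is commutative, but the fixed
    // ordering matters for non-commutative merges such as list appends.
    func addU64(existingVal, newVal []byte) []byte {
        out := make([]byte, 8)
        binary.BigEndian.PutUint64(out,
            binary.BigEndian.Uint64(existingVal)+binary.BigEndian.Uint64(newVal))
        return out
    }

    func main() {
        older, newer := make([]byte, 8), make([]byte, 8)
        binary.BigEndian.PutUint64(older, 40)
        binary.BigEndian.PutUint64(newer, 2)
        // Mirrors newVal = op.f(oldVal, newVal) in iterateAndMerge.
        fmt.Println(binary.BigEndian.Uint64(addU64(older, newer))) // 42
    }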