Use StreamWriter in bulk loader (hypermodeinc#3542)
This PR switches the transaction-based writes to Badger over to StreamWriter, and brings Badger master into vendor.

This PR also refactors bulk loader code as follows:

- Simplify the shuffler and reducer code and merge them into a single component, the reducer.
- Remove shuffler.go file.
- Remove metrics.go file.
- Replace the expensive channel-based heap merge with a simple map entries iterator (see the sketch below).
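
As a rough illustration of the last point (hypothetical names, not the PR's reducer code), the new approach amounts to walking a slice of already-sorted map entries with a plain iterator rather than funnelling every entry through a channel-fed heap:

package example

import "github.com/dgraph-io/dgraph/protos/pb"

// mapEntryIterator is an illustrative sketch: it walks entries that are
// already sorted in memory, instead of pushing each one through a heap
// feeding a channel.
type mapEntryIterator struct {
	entries []*pb.MapEntry // assumed to be sorted by key
	idx     int
}

// Next returns the next entry, or nil once the iterator is exhausted.
func (it *mapEntryIterator) Next() *pb.MapEntry {
	if it.idx >= len(it.entries) {
		return nil
	}
	e := it.entries[it.idx]
	it.idx++
	return e
}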

With these changes, the 21M dataset now loads in 2 minutes, down from the original 3 minutes.
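
For context, a minimal sketch of the StreamWriter ingestion pattern this PR moves to. It assumes an already-opened *badger.DB; streamAll, db, and writeTs are illustrative names, and the calls shown are the Badger v1-era StreamWriter API (NewStreamWriter, Prepare, Write, Flush), not code taken from this PR:

package example

import (
	"github.com/dgraph-io/badger"
	bpb "github.com/dgraph-io/badger/pb"
)

// streamAll is an illustrative sketch of bulk-ingesting sorted KVs through
// Badger's StreamWriter instead of individual transactions.
func streamAll(db *badger.DB, writeTs uint64) error {
	sw := db.NewStreamWriter()
	if err := sw.Prepare(); err != nil { // readies the DB for a bulk load
		return err
	}
	// Within a stream, keys must be handed over in sorted order, which is
	// why the reducer sorts each batch before calling Write.
	list := &bpb.KVList{Kv: []*bpb.KV{
		{Key: []byte("a"), Value: []byte("1"), Version: writeTs, StreamId: 1},
		{Key: []byte("b"), Value: []byte("2"), Version: writeTs, StreamId: 1},
	}}
	if err := sw.Write(list); err != nil {
		return err
	}
	// Flush finalizes all streams and makes the written data visible.
	return sw.Flush()
}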


Changes:
* Simplified shuffler and reducer code, but encountered an issue where the sorted key order changes due to the version Badger appends to keys.
* Working code after StreamWriter integration.
* Vendor in Badger master, because it contains fixes to StreamWriter.
* Fix build breakages caused by importing Badger.
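
On the sorted-order point: StreamWriter expects the keys within each stream to arrive in ascending byte order, which is why the new writeIndex in count_index.go (diff below) sorts every KV batch before writing. A stripped-down sketch of that sort, with sortKVs as a hypothetical helper name:

package example

import (
	"bytes"
	"sort"

	bpb "github.com/dgraph-io/badger/pb"
)

// sortKVs orders a batch by raw key bytes so StreamWriter receives the
// stream's keys in ascending order. Illustrative sketch, not the PR's code.
func sortKVs(list *bpb.KVList) {
	sort.Slice(list.Kv, func(i, j int) bool {
		return bytes.Compare(list.Kv[i].Key, list.Kv[j].Key) < 0
	})
}
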
manishrjain authored and dna2github committed Jul 19, 2019
1 parent 38650a6 commit 594df67
Showing 24 changed files with 428 additions and 358 deletions.
29 changes: 21 additions & 8 deletions dgraph/cmd/bulk/count_index.go
@@ -17,10 +17,13 @@
 package bulk

 import (
+	"bytes"
 	"sort"
 	"sync"
+	"sync/atomic"

 	"github.com/dgraph-io/badger"
+	bpb "github.com/dgraph-io/badger/pb"
 	"github.com/dgraph-io/dgraph/codec"
 	"github.com/dgraph-io/dgraph/posting"
 	"github.com/dgraph-io/dgraph/protos/pb"
@@ -34,8 +37,8 @@ type current struct {
 }

 type countIndexer struct {
-	*state
-	db     *badger.DB
+	*reducer
+	writer *badger.StreamWriter
 	cur    current
 	counts map[int][]uint64
 	wg     sync.WaitGroup
@@ -72,21 +75,31 @@ func (c *countIndexer) addUid(rawKey []byte, count int) {
 }

 func (c *countIndexer) writeIndex(pred string, rev bool, counts map[int][]uint64) {
-	writer := posting.NewTxnWriter(c.db)
+	defer c.wg.Done()

+	streamId := atomic.AddUint32(&c.streamId, 1)
+	list := &bpb.KVList{}
 	for count, uids := range counts {
 		sort.Slice(uids, func(i, j int) bool { return uids[i] < uids[j] })

 		var pl pb.PostingList
 		pl.Pack = codec.Encode(uids, 256)
 		data, err := pl.Marshal()
 		x.Check(err)
-		x.Check(writer.SetAt(
-			x.CountKey(pred, uint32(count), rev),
-			data, posting.BitCompletePosting, c.state.writeTs))
+		list.Kv = append(list.Kv, &bpb.KV{
+			Key:      x.CountKey(pred, uint32(count), rev),
+			Value:    data,
+			Meta:     []byte{posting.BitCompletePosting},
+			Version:  c.state.writeTs,
+			StreamId: streamId,
+		})
 	}
+	sort.Slice(list.Kv, func(i, j int) bool {
+		return bytes.Compare(list.Kv[i].Key, list.Kv[j].Key) < 0
+	})
+	if err := c.writer.Write(list); err != nil {
+		x.Check(err)
+	}
-	x.Check(writer.Flush())
-	c.wg.Done()
 }

 func (c *countIndexer) wait() {
18 changes: 4 additions & 14 deletions dgraph/cmd/bulk/loader.go
@@ -52,7 +52,7 @@ type options struct {
 	MapBufSize   int64
 	SkipMapPhase bool
 	CleanupTmp   bool
-	NumShufflers int
+	NumReducers  int
 	Version      bool
 	StoreXids    bool
 	ZeroAddr     string
@@ -220,25 +220,15 @@ func (ld *loader) mapStage() {
 }

 type shuffleOutput struct {
-	db         *badger.DB
+	writer     *badger.StreamWriter
 	mapEntries []*pb.MapEntry
 }

 func (ld *loader) reduceStage() {
 	ld.prog.setPhase(reducePhase)

-	shuffleOutputCh := make(chan shuffleOutput, 100)
-	go func() {
-		shuf := shuffler{state: ld.state, output: shuffleOutputCh}
-		shuf.run()
-	}()
-
-	redu := reducer{
-		state:     ld.state,
-		input:     shuffleOutputCh,
-		writesThr: x.NewThrottle(100),
-	}
-	redu.run()
+	r := reducer{state: ld.state}
+	x.Check(r.run())
 }

 func (ld *loader) writeSchema() {
25 changes: 0 additions & 25 deletions dgraph/cmd/bulk/metrics.go

This file was deleted.
