Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use StreamWriter in bulk loader #3542

Merged
merged 11 commits into from
Jun 10, 2019
29 changes: 21 additions & 8 deletions dgraph/cmd/bulk/count_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@
package bulk

import (
"bytes"
"sort"
"sync"
"sync/atomic"

"github.com/dgraph-io/badger"
bpb "github.com/dgraph-io/badger/pb"
"github.com/dgraph-io/dgraph/codec"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/protos/pb"
Expand All @@ -34,8 +37,8 @@ type current struct {
}

type countIndexer struct {
*state
db *badger.DB
*reducer
writer *badger.StreamWriter
cur current
counts map[int][]uint64
wg sync.WaitGroup
Expand Down Expand Up @@ -72,21 +75,31 @@ func (c *countIndexer) addUid(rawKey []byte, count int) {
}

func (c *countIndexer) writeIndex(pred string, rev bool, counts map[int][]uint64) {
writer := posting.NewTxnWriter(c.db)
defer c.wg.Done()

streamId := atomic.AddUint32(&c.streamId, 1)
manishrjain marked this conversation as resolved.
Show resolved Hide resolved
list := &bpb.KVList{}
for count, uids := range counts {
sort.Slice(uids, func(i, j int) bool { return uids[i] < uids[j] })

var pl pb.PostingList
pl.Pack = codec.Encode(uids, 256)
data, err := pl.Marshal()
x.Check(err)
x.Check(writer.SetAt(
x.CountKey(pred, uint32(count), rev),
data, posting.BitCompletePosting, c.state.writeTs))
list.Kv = append(list.Kv, &bpb.KV{
Key: x.CountKey(pred, uint32(count), rev),
Value: data,
Meta: []byte{posting.BitCompletePosting},
Version: c.state.writeTs,
StreamId: streamId,
})
}
sort.Slice(list.Kv, func(i, j int) bool {
return bytes.Compare(list.Kv[i].Key, list.Kv[j].Key) < 0
})
if err := c.writer.Write(list); err != nil {
panic(err)
}
x.Check(writer.Flush())
c.wg.Done()
}

func (c *countIndexer) wait() {
Expand Down
18 changes: 4 additions & 14 deletions dgraph/cmd/bulk/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type options struct {
MapBufSize int64
SkipMapPhase bool
CleanupTmp bool
NumShufflers int
NumReducers int
Version bool
StoreXids bool
ZeroAddr string
Expand Down Expand Up @@ -220,25 +220,15 @@ func (ld *loader) mapStage() {
}

// shuffleOutput carries one batch of shuffled map entries to the reducer.
// NOTE(review): this span shows both a db *badger.DB field and a
// writer *badger.StreamWriter field; it looks like diff residue where the
// old db field was replaced by writer — confirm against the merged file
// which single field (if either) should remain.
type shuffleOutput struct {
	db         *badger.DB
	writer     *badger.StreamWriter
	mapEntries []*pb.MapEntry
}

// reduceStage runs the reduce phase of the bulk load: a single reducer
// consumes the map-phase output and writes the final posting lists.
// The former shuffler goroutine and shuffleOutput channel were folded
// into the reducer itself.
func (ld *loader) reduceStage() {
	ld.prog.setPhase(reducePhase)

	r := reducer{state: ld.state}
	// r.run returns an error; treat any failure as fatal via x.Check,
	// consistent with the rest of this package.
	x.Check(r.run())
}

func (ld *loader) writeSchema() {
Expand Down
25 changes: 0 additions & 25 deletions dgraph/cmd/bulk/metrics.go

This file was deleted.

Loading