Skip to content

Commit

Permalink
Fix bugs in bulk loader. Add Dgraph debug tool. (#2449)
Browse files Browse the repository at this point in the history
When building multiple shards, we want each predicate to lie in only one shard, and nowhere else. Bulk loader was doing that, but then adding the schema for all predicates on all shards, which causes confusion about ownership of the predicates. This changes that to only add schema for the predicates that the shard holds.

Similarly, when outputting _predicate_ edge, the shard being used was the one corresponding to the original predicate. Instead, we should use the shard corresponding to _predicate_.

This PR fixes #2129.

Added a new debug tool which can iterate over posting store and spit out stats per predicate. Useful for debugging.
  • Loading branch information
manishrjain authored Jun 20, 2018
1 parent 1a2fa20 commit 1c602b0
Show file tree
Hide file tree
Showing 7 changed files with 182 additions and 24 deletions.
19 changes: 10 additions & 9 deletions dgraph/cmd/bulk/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,15 +202,6 @@ func (ld *loader) mapStage() {
LRUSize: 1 << 19,
})

var mapperWg sync.WaitGroup
mapperWg.Add(len(ld.mappers))
for _, m := range ld.mappers {
go func(m *mapper) {
m.run()
mapperWg.Done()
}(m)
}

var readers []*bufio.Reader
for _, rdfFile := range findRDFFiles(ld.opt.RDFDir) {
f, err := os.Open(rdfFile)
Expand All @@ -230,6 +221,16 @@ func (ld *loader) mapStage() {
os.Exit(1)
}

var mapperWg sync.WaitGroup
mapperWg.Add(len(ld.mappers))
for _, m := range ld.mappers {
go func(m *mapper) {
m.run()
mapperWg.Done()
}(m)
}

// This is the main map loop.
thr := x.NewThrottle(ld.opt.NumGoroutines)
for _, r := range readers {
thr.Start()
Expand Down
13 changes: 6 additions & 7 deletions dgraph/cmd/bulk/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ func less(lhs, rhs *intern.MapEntry) bool {
}

func (m *mapper) writeMapEntriesToFile(entriesBuf []byte, shardIdx int) {
defer m.shards[shardIdx].mu.Unlock() // Locked by caller.

buf := entriesBuf
var entries []*intern.MapEntry
for len(buf) > 0 {
Expand Down Expand Up @@ -105,7 +107,6 @@ func (m *mapper) writeMapEntriesToFile(entriesBuf []byte, shardIdx int) {
)
x.Check(os.MkdirAll(filepath.Dir(filename), 0755))
x.Check(x.WriteFileSync(filename, entriesBuf, 0644))
m.shards[shardIdx].mu.Unlock() // Locked by caller.
}

func (m *mapper) run() {
Expand All @@ -121,7 +122,7 @@ func (m *mapper) run() {
}
rdf = strings.TrimSpace(rdf)

x.Check(m.parseRDF(rdf))
x.Check(m.processRDF(rdf))
atomic.AddInt64(&m.prog.rdfCount, 1)
for i := range m.shards {
sh := &m.shards[i]
Expand Down Expand Up @@ -162,7 +163,7 @@ func (m *mapper) addMapEntry(key []byte, p *intern.Posting, shard int) {
x.Check(err)
}

func (m *mapper) parseRDF(rdfLine string) error {
func (m *mapper) processRDF(rdfLine string) error {
nq, err := parseNQuad(rdfLine)
if err != nil {
if err == rdf.ErrEmpty {
Expand Down Expand Up @@ -199,14 +200,14 @@ func (m *mapper) processNQuad(nq gql.NQuad) {
key = x.ReverseKey(nq.Predicate, oid)
m.addMapEntry(key, rev, shard)
}
m.addIndexMapEntries(nq, de)

if m.opt.ExpandEdges {
shard := m.state.shards.shardFor("_predicate_")
key = x.DataKey("_predicate_", sid)
pp := m.createPredicatePosting(nq.Predicate)
m.addMapEntry(key, pp, shard)
}

m.addIndexMapEntries(nq, de)
}

func (m *mapper) lookupUid(xid string) uint64 {
Expand Down Expand Up @@ -287,9 +288,7 @@ func (m *mapper) addIndexMapEntries(nq gql.NQuad, de *intern.DirectedEdge) {
}

sch := m.schema.getSchema(nq.GetPredicate())

for _, tokerName := range sch.GetTokenizer() {

// Find tokeniser.
toker, ok := tok.GetTokenizer(tokerName)
if !ok {
Expand Down
6 changes: 4 additions & 2 deletions dgraph/cmd/bulk/merge_shards.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ func mergeMapShardsIntoReduceShards(opt options) {
// until there are no more map shards left. Should be a good approximation.
for _, shard := range mapShards {
sortBySize(reduceShards)
x.Check(os.Rename(shard, filepath.Join(
reduceShards[len(reduceShards)-1], filepath.Base(shard))))
reduceShard := filepath.Join(
reduceShards[len(reduceShards)-1], filepath.Base(shard))
x.Printf("Shard %s -> Reduce %s\n", shard, reduceShard)
x.Check(os.Rename(shard, reduceShard))
}
}

Expand Down
37 changes: 35 additions & 2 deletions dgraph/cmd/bulk/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package bulk
import (
"fmt"
"log"
"math"
"sync"

"github.com/dgraph-io/badger"
Expand Down Expand Up @@ -82,11 +83,43 @@ func (s *schemaStore) validateType(de *intern.DirectedEdge, objectIsUID bool) {
}
}

// getPredicates returns the distinct set of predicates present in the posting
// store. It iterates keys only (no value prefetch) and uses SkipPredicate to
// jump from one predicate's key range to the next, so the scan cost is
// proportional to the number of predicates, not the number of keys.
func (s *schemaStore) getPredicates(db *badger.ManagedDB) []string {
	// Read-only transaction at MaxUint64 so every committed key is visible.
	txn := db.NewTransactionAt(math.MaxUint64, false)
	defer txn.Discard()

	opts := badger.DefaultIteratorOptions
	opts.PrefetchValues = false // Keys suffice; fetching values would waste I/O.
	itr := txn.NewIterator(opts)
	defer itr.Close()

	m := make(map[string]struct{})
	for itr.Rewind(); itr.Valid(); {
		pk := x.Parse(itr.Item().Key())
		m[pk.Attr] = struct{}{}
		// Seek past all remaining keys of this predicate; this is the loop's
		// only advance step (the original had a redundant trailing continue).
		itr.Seek(pk.SkipPredicate())
	}

	preds := make([]string, 0, len(m))
	for pred := range m {
		preds = append(preds, pred)
	}
	return preds
}

func (s *schemaStore) write(db *badger.ManagedDB) {
// Write schema always at timestamp 1, s.state.writeTs may not be equal to 1
// if bulk loader was restarted or other similar scenarios.
txn := db.NewTransactionAt(1, true)
for pred, sch := range s.m {
preds := s.getPredicates(db)

txn := db.NewTransactionAt(math.MaxUint64, true)
defer txn.Discard()
for _, pred := range preds {
sch, ok := s.m[pred]
if !ok {
continue
}
k := x.SchemaKey(pred)
v, err := sch.Marshal()
x.Check(err)
Expand Down
5 changes: 2 additions & 3 deletions dgraph/cmd/bulk/shuffle.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ func (s *shuffler) run() {
thr := x.NewThrottle(s.opt.NumShufflers)
for i := 0; i < s.opt.ReduceShards; i++ {
thr.Start()
go func(i int, db *badger.ManagedDB) {
mapFiles := filenamesInTree(shardDirs[i])
go func(shardId int, db *badger.ManagedDB) {
mapFiles := filenamesInTree(shardDirs[shardId])
shuffleInputChs := make([]chan *intern.MapEntry, len(mapFiles))
for i, mapFile := range mapFiles {
shuffleInputChs[i] = make(chan *intern.MapEntry, 1000)
Expand Down Expand Up @@ -98,7 +98,6 @@ func readMapOutput(filename string, mapEntryCh chan<- *intern.MapEntry) {
}

func (s *shuffler) shufflePostings(mapEntryChs []chan *intern.MapEntry, ci *countIndexer) {

var ph postingHeap
for _, ch := range mapEntryChs {
heap.Push(&ph, heapNode{mapEntry: <-ch, ch: ch})
Expand Down
123 changes: 123 additions & 0 deletions dgraph/cmd/debug/run.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Copyright 2017-2018 Dgraph Labs, Inc.
*
* This file is available under the Apache License, Version 2.0,
* with the Commons Clause restriction.
*/

package debug

import (
"fmt"
"log"
"math"
"sort"

"github.com/dgraph-io/badger"
"github.com/dgraph-io/badger/options"
"github.com/dgraph-io/dgraph/x"
"github.com/spf13/cobra"
)

var Debug x.SubCommand

// init registers the "dgraph debug" subcommand and its command-line flags.
func init() {
	Debug.Cmd = &cobra.Command{
		Use:   "debug",
		Short: "Debug Dgraph instance",
		Run: func(cmd *cobra.Command, args []string) {
			run()
		},
	}

	flag := Debug.Cmd.Flags()
	// -p: location of the Badger posting store to inspect.
	flag.StringP("postings", "p", "", "Directory where posting lists are stored.")
	// -s: when set, run() prints per-predicate key statistics.
	flag.BoolP("predicates", "s", false, "List all the predicates.")
	// -o: defaults to true so the tool does not mutate the store by accident.
	flag.BoolP("readonly", "o", true, "Open in read only mode.")
}

// Stats accumulates key counts for a single predicate, bucketed by key type.
// Buckets are not mutually exclusive: a key may increment several of them
// (e.g. per the note in run(), IsSchema can be true alongside IsData).
type Stats struct {
	Data    int // Keys for which pk.IsData() holds.
	Index   int // Keys for which pk.IsIndex() holds.
	Schema  int // Keys for which pk.IsSchema() holds.
	Reverse int // Keys for which pk.IsReverse() holds.
	Count   int // Keys for which pk.IsCount() holds.
	Total   int // Every key seen for this predicate.
}

// run opens the posting store named by the --postings flag (read-only by
// default) and dispatches on the requested diagnosis. Currently only
// --predicates is supported; anything else is a fatal usage error.
func run() {
	opts := badger.DefaultOptions
	opts.Dir = Debug.Conf.GetString("postings")
	opts.ValueDir = Debug.Conf.GetString("postings")
	opts.TableLoadingMode = options.MemoryMap
	opts.ReadOnly = Debug.Conf.GetBool("readonly")

	x.AssertTruef(len(opts.Dir) > 0, "No posting dir specified.")
	fmt.Printf("Opening DB: %s\n", opts.Dir)
	db, err := badger.OpenManaged(opts)
	x.Check(err)
	defer db.Close()

	if Debug.Conf.GetBool("predicates") {
		printPredicateStats(db)
		return
	}
	log.Fatalln("Please provide a valid option for diagnosis.")
}

// printPredicateStats walks every key in the store, aggregates per-predicate
// Stats, and prints them sorted by descending total key count.
func printPredicateStats(db *badger.ManagedDB) {
	// Read-only view at MaxUint64 so all committed versions are visible.
	txn := db.NewTransactionAt(math.MaxUint64, false)
	defer txn.Discard()

	iopts := badger.DefaultIteratorOptions
	iopts.PrefetchValues = false // Only keys are examined.
	itr := txn.NewIterator(iopts)
	defer itr.Close()

	var loop int // Total number of keys visited, across all predicates.
	m := make(map[string]*Stats)
	for itr.Rewind(); itr.Valid(); itr.Next() {
		pk := x.Parse(itr.Item().Key())
		stats, ok := m[pk.Attr]
		if !ok {
			stats = new(Stats)
			m[pk.Attr] = stats
		}
		stats.Total++
		// Don't use a switch case here. Because multiple of these can be true. In particular,
		// IsSchema can be true alongside IsData.
		if pk.IsData() {
			stats.Data++
		}
		if pk.IsIndex() {
			stats.Index++
		}
		if pk.IsCount() {
			stats.Count++
		}
		if pk.IsSchema() {
			stats.Schema++
		}
		if pk.IsReverse() {
			stats.Reverse++
		}
		loop++
	}

	// Pair each predicate with its stats so they can be sorted together.
	type C struct {
		pred  string
		stats *Stats
	}
	counts := make([]C, 0, len(m))
	for pred, stats := range m {
		counts = append(counts, C{pred, stats})
	}
	sort.Slice(counts, func(i, j int) bool {
		return counts[i].stats.Total > counts[j].stats.Total
	})
	for _, c := range counts {
		st := c.stats
		fmt.Printf("Total: %-8d. Predicate: %-20s\n", st.Total, c.pred)
		fmt.Printf(" Data: %d Index: %d Reverse: %d Schema: %d Count: %d Predicate: %s\n\n",
			st.Data, st.Index, st.Reverse, st.Schema, st.Count, c.pred)
	}
	fmt.Printf("Found %d keys\n", loop)
}
3 changes: 2 additions & 1 deletion dgraph/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"os"

"github.com/dgraph-io/dgraph/dgraph/cmd/bulk"
"github.com/dgraph-io/dgraph/dgraph/cmd/debug"
"github.com/dgraph-io/dgraph/dgraph/cmd/live"
"github.com/dgraph-io/dgraph/dgraph/cmd/server"
"github.com/dgraph-io/dgraph/dgraph/cmd/version"
Expand Down Expand Up @@ -58,7 +59,7 @@ func init() {
rootConf.BindPFlags(RootCmd.PersistentFlags())

var subcommands = []*x.SubCommand{
&bulk.Bulk, &live.Live, &server.Server, &zero.Zero, &version.Version,
&bulk.Bulk, &live.Live, &server.Server, &zero.Zero, &version.Version, &debug.Debug,
}
for _, sc := range subcommands {
RootCmd.AddCommand(sc.Cmd)
Expand Down

0 comments on commit 1c602b0

Please sign in to comment.