Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bugs in bulk loader. Add Dgraph debug tool. #2449

Merged
merged 7 commits into from
Jun 20, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions dgraph/cmd/bulk/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,15 +202,6 @@ func (ld *loader) mapStage() {
LRUSize: 1 << 19,
})

var mapperWg sync.WaitGroup
mapperWg.Add(len(ld.mappers))
for _, m := range ld.mappers {
go func(m *mapper) {
m.run()
mapperWg.Done()
}(m)
}

var readers []*bufio.Reader
for _, rdfFile := range findRDFFiles(ld.opt.RDFDir) {
f, err := os.Open(rdfFile)
Expand All @@ -230,6 +221,16 @@ func (ld *loader) mapStage() {
os.Exit(1)
}

var mapperWg sync.WaitGroup
mapperWg.Add(len(ld.mappers))
for _, m := range ld.mappers {
go func(m *mapper) {
m.run()
mapperWg.Done()
}(m)
}

// This is the main map loop.
thr := x.NewThrottle(ld.opt.NumGoroutines)
for _, r := range readers {
thr.Start()
Expand Down
13 changes: 6 additions & 7 deletions dgraph/cmd/bulk/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ func less(lhs, rhs *intern.MapEntry) bool {
}

func (m *mapper) writeMapEntriesToFile(entriesBuf []byte, shardIdx int) {
defer m.shards[shardIdx].mu.Unlock() // Locked by caller.

buf := entriesBuf
var entries []*intern.MapEntry
for len(buf) > 0 {
Expand Down Expand Up @@ -105,7 +107,6 @@ func (m *mapper) writeMapEntriesToFile(entriesBuf []byte, shardIdx int) {
)
x.Check(os.MkdirAll(filepath.Dir(filename), 0755))
x.Check(x.WriteFileSync(filename, entriesBuf, 0644))
m.shards[shardIdx].mu.Unlock() // Locked by caller.
}

func (m *mapper) run() {
Expand All @@ -121,7 +122,7 @@ func (m *mapper) run() {
}
rdf = strings.TrimSpace(rdf)

x.Check(m.parseRDF(rdf))
x.Check(m.processRDF(rdf))
atomic.AddInt64(&m.prog.rdfCount, 1)
for i := range m.shards {
sh := &m.shards[i]
Expand Down Expand Up @@ -162,7 +163,7 @@ func (m *mapper) addMapEntry(key []byte, p *intern.Posting, shard int) {
x.Check(err)
}

func (m *mapper) parseRDF(rdfLine string) error {
func (m *mapper) processRDF(rdfLine string) error {
nq, err := parseNQuad(rdfLine)
if err != nil {
if err == rdf.ErrEmpty {
Expand Down Expand Up @@ -199,14 +200,14 @@ func (m *mapper) processNQuad(nq gql.NQuad) {
key = x.ReverseKey(nq.Predicate, oid)
m.addMapEntry(key, rev, shard)
}
m.addIndexMapEntries(nq, de)

if m.opt.ExpandEdges {
shard := m.state.shards.shardFor("_predicate_")
key = x.DataKey("_predicate_", sid)
pp := m.createPredicatePosting(nq.Predicate)
m.addMapEntry(key, pp, shard)
}

m.addIndexMapEntries(nq, de)
}

func (m *mapper) lookupUid(xid string) uint64 {
Expand Down Expand Up @@ -287,9 +288,7 @@ func (m *mapper) addIndexMapEntries(nq gql.NQuad, de *intern.DirectedEdge) {
}

sch := m.schema.getSchema(nq.GetPredicate())

for _, tokerName := range sch.GetTokenizer() {

// Find tokeniser.
toker, ok := tok.GetTokenizer(tokerName)
if !ok {
Expand Down
6 changes: 4 additions & 2 deletions dgraph/cmd/bulk/merge_shards.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ func mergeMapShardsIntoReduceShards(opt options) {
// until there are no more map shards left. Should be a good approximation.
for _, shard := range mapShards {
sortBySize(reduceShards)
x.Check(os.Rename(shard, filepath.Join(
reduceShards[len(reduceShards)-1], filepath.Base(shard))))
reduceShard := filepath.Join(
reduceShards[len(reduceShards)-1], filepath.Base(shard))
x.Printf("Shard %s -> Reduce %s\n", shard, reduceShard)
x.Check(os.Rename(shard, reduceShard))
}
}

Expand Down
37 changes: 35 additions & 2 deletions dgraph/cmd/bulk/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package bulk
import (
"fmt"
"log"
"math"
"sync"

"github.com/dgraph-io/badger"
Expand Down Expand Up @@ -82,11 +83,43 @@ func (s *schemaStore) validateType(de *intern.DirectedEdge, objectIsUID bool) {
}
}

// getPredicates returns the distinct predicate names that have keys in db.
// It reads at the maximum timestamp so every version is visible. Rather than
// visiting every key, after seeing the first key of a predicate it seeks
// directly past the remainder of that predicate's key range.
func (s *schemaStore) getPredicates(db *badger.ManagedDB) []string {
	txn := db.NewTransactionAt(math.MaxUint64, false)
	defer txn.Discard()

	opts := badger.DefaultIteratorOptions
	opts.PrefetchValues = false // only keys are needed; skip value fetches
	itr := txn.NewIterator(opts)
	defer itr.Close()

	m := make(map[string]struct{})
	for itr.Rewind(); itr.Valid(); {
		pk := x.Parse(itr.Item().Key())
		m[pk.Attr] = struct{}{}
		// Advance by seeking, not Next(): jump to the first key after this
		// predicate's range. The loop header deliberately has no post step.
		itr.Seek(pk.SkipPredicate())
	}

	preds := make([]string, 0, len(m))
	for pred := range m {
		preds = append(preds, pred)
	}
	return preds
}

func (s *schemaStore) write(db *badger.ManagedDB) {
// Write schema always at timestamp 1, s.state.writeTs may not be equal to 1
// if bulk loader was restarted or other similar scenarios.
txn := db.NewTransactionAt(1, true)
for pred, sch := range s.m {
preds := s.getPredicates(db)

txn := db.NewTransactionAt(math.MaxUint64, true)
defer txn.Discard()
for _, pred := range preds {
sch, ok := s.m[pred]
if !ok {
continue
}
k := x.SchemaKey(pred)
v, err := sch.Marshal()
x.Check(err)
Expand Down
5 changes: 2 additions & 3 deletions dgraph/cmd/bulk/shuffle.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ func (s *shuffler) run() {
thr := x.NewThrottle(s.opt.NumShufflers)
for i := 0; i < s.opt.ReduceShards; i++ {
thr.Start()
go func(i int, db *badger.ManagedDB) {
mapFiles := filenamesInTree(shardDirs[i])
go func(shardId int, db *badger.ManagedDB) {
mapFiles := filenamesInTree(shardDirs[shardId])
shuffleInputChs := make([]chan *intern.MapEntry, len(mapFiles))
for i, mapFile := range mapFiles {
shuffleInputChs[i] = make(chan *intern.MapEntry, 1000)
Expand Down Expand Up @@ -98,7 +98,6 @@ func readMapOutput(filename string, mapEntryCh chan<- *intern.MapEntry) {
}

func (s *shuffler) shufflePostings(mapEntryChs []chan *intern.MapEntry, ci *countIndexer) {

var ph postingHeap
for _, ch := range mapEntryChs {
heap.Push(&ph, heapNode{mapEntry: <-ch, ch: ch})
Expand Down
123 changes: 123 additions & 0 deletions dgraph/cmd/debug/run.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Copyright 2017-2018 Dgraph Labs, Inc.
*
* This file is available under the Apache License, Version 2.0,
* with the Commons Clause restriction.
*/

package debug

import (
"fmt"
"log"
"math"
"sort"

"github.com/dgraph-io/badger"
"github.com/dgraph-io/badger/options"
"github.com/dgraph-io/dgraph/x"
"github.com/spf13/cobra"
)

var Debug x.SubCommand

// init wires up the "dgraph debug" sub-command and registers its flags.
func init() {
	Debug.Cmd = &cobra.Command{
		Use:   "debug",
		Short: "Debug Dgraph instance",
		Run: func(cmd *cobra.Command, args []string) {
			run()
		},
	}

	flags := Debug.Cmd.Flags()
	flags.StringP("postings", "p", "", "Directory where posting lists are stored.")
	flags.BoolP("predicates", "s", false, "List all the predicates.")
	flags.BoolP("readonly", "o", true, "Open in read only mode.")
}

// Stats accumulates per-predicate key counts while scanning the posting
// store. A single key can fall into more than one category at once (for
// example, a key can be both Schema and Data), so the category fields may
// sum to more than Total.
type Stats struct {
	Data    int // keys for which pk.IsData() is true
	Index   int // keys for which pk.IsIndex() is true
	Schema  int // keys for which pk.IsSchema() is true
	Reverse int // keys for which pk.IsReverse() is true
	Count   int // keys for which pk.IsCount() is true
	Total   int // all keys seen for this predicate
}

// run opens the posting store named by the --postings flag (optionally in
// read-only mode) and executes the requested diagnostic. Currently the only
// diagnostic is --predicates, which prints per-predicate key statistics;
// with no valid option selected the process exits fatally.
func run() {
	opts := badger.DefaultOptions
	opts.Dir = Debug.Conf.GetString("postings")
	opts.ValueDir = Debug.Conf.GetString("postings")
	opts.TableLoadingMode = options.MemoryMap
	opts.ReadOnly = Debug.Conf.GetBool("readonly")

	x.AssertTruef(len(opts.Dir) > 0, "No posting dir specified.")
	fmt.Printf("Opening DB: %s\n", opts.Dir)
	db, err := badger.OpenManaged(opts)
	x.Check(err)
	defer db.Close()

	if Debug.Conf.GetBool("predicates") {
		printPredicateStats(db)
		return
	}
	log.Fatalln("Please provide a valid option for diagnosis.")
}

// printPredicateStats scans every key in db at the maximum read timestamp
// and prints, per predicate, how many data/index/reverse/schema/count keys
// exist, ordered by descending total, followed by the overall key count.
func printPredicateStats(db *badger.ManagedDB) {
	txn := db.NewTransactionAt(math.MaxUint64, false)
	defer txn.Discard()

	iopts := badger.DefaultIteratorOptions
	iopts.PrefetchValues = false // only keys are inspected; skip value fetches
	itr := txn.NewIterator(iopts)
	defer itr.Close()

	var total int
	m := make(map[string]*Stats)
	for itr.Rewind(); itr.Valid(); itr.Next() {
		pk := x.Parse(itr.Item().Key())
		stats, ok := m[pk.Attr]
		if !ok {
			stats = new(Stats)
			m[pk.Attr] = stats
		}
		stats.Total++
		// Don't use a switch case here. Because multiple of these can be
		// true. In particular, IsSchema can be true alongside IsData.
		if pk.IsData() {
			stats.Data++
		}
		if pk.IsIndex() {
			stats.Index++
		}
		if pk.IsCount() {
			stats.Count++
		}
		if pk.IsSchema() {
			stats.Schema++
		}
		if pk.IsReverse() {
			stats.Reverse++
		}
		total++
	}

	type predCount struct {
		pred  string
		stats *Stats
	}
	counts := make([]predCount, 0, len(m))
	for pred, stats := range m {
		counts = append(counts, predCount{pred, stats})
	}
	// Largest predicates first.
	sort.Slice(counts, func(i, j int) bool {
		return counts[i].stats.Total > counts[j].stats.Total
	})
	for _, c := range counts {
		st := c.stats
		fmt.Printf("Total: %-8d. Predicate: %-20s\n", st.Total, c.pred)
		fmt.Printf("  Data: %d Index: %d Reverse: %d Schema: %d Count: %d Predicate: %s\n\n",
			st.Data, st.Index, st.Reverse, st.Schema, st.Count, c.pred)
	}
	fmt.Printf("Found %d keys\n", total)
}
3 changes: 2 additions & 1 deletion dgraph/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"os"

"github.com/dgraph-io/dgraph/dgraph/cmd/bulk"
"github.com/dgraph-io/dgraph/dgraph/cmd/debug"
"github.com/dgraph-io/dgraph/dgraph/cmd/live"
"github.com/dgraph-io/dgraph/dgraph/cmd/server"
"github.com/dgraph-io/dgraph/dgraph/cmd/version"
Expand Down Expand Up @@ -58,7 +59,7 @@ func init() {
rootConf.BindPFlags(RootCmd.PersistentFlags())

var subcommands = []*x.SubCommand{
&bulk.Bulk, &live.Live, &server.Server, &zero.Zero, &version.Version,
&bulk.Bulk, &live.Live, &server.Server, &zero.Zero, &version.Version, &debug.Debug,
}
for _, sc := range subcommands {
RootCmd.AddCommand(sc.Cmd)
Expand Down