From d6504159992923db1667741ab86902a3261fa96e Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Fri, 8 Jun 2018 20:16:34 -0700 Subject: [PATCH 1/6] Wrote a debug tool for Dgraph. Writing schema is the issue with bulk loader. --- dgraph/cmd/bulk/loader.go | 19 ++++++++++--------- dgraph/cmd/bulk/mapper.go | 3 ++- dgraph/cmd/bulk/merge_shards.go | 6 ++++-- dgraph/cmd/bulk/progress.go | 2 +- dgraph/cmd/bulk/run.go | 3 ++- dgraph/cmd/bulk/shuffle.go | 9 ++++----- dgraph/cmd/root.go | 3 ++- 7 files changed, 25 insertions(+), 20 deletions(-) diff --git a/dgraph/cmd/bulk/loader.go b/dgraph/cmd/bulk/loader.go index 054b9259583..ab6394abbf5 100644 --- a/dgraph/cmd/bulk/loader.go +++ b/dgraph/cmd/bulk/loader.go @@ -202,15 +202,6 @@ func (ld *loader) mapStage() { LRUSize: 1 << 19, }) - var mapperWg sync.WaitGroup - mapperWg.Add(len(ld.mappers)) - for _, m := range ld.mappers { - go func(m *mapper) { - m.run() - mapperWg.Done() - }(m) - } - var readers []*bufio.Reader for _, rdfFile := range findRDFFiles(ld.opt.RDFDir) { f, err := os.Open(rdfFile) @@ -230,6 +221,16 @@ func (ld *loader) mapStage() { os.Exit(1) } + var mapperWg sync.WaitGroup + mapperWg.Add(len(ld.mappers)) + for _, m := range ld.mappers { + go func(m *mapper) { + m.run() + mapperWg.Done() + }(m) + } + + // This is the main map loop. thr := x.NewThrottle(ld.opt.NumGoroutines) for _, r := range readers { thr.Start() diff --git a/dgraph/cmd/bulk/mapper.go b/dgraph/cmd/bulk/mapper.go index 62e09797cc0..f6f2ec1ab02 100644 --- a/dgraph/cmd/bulk/mapper.go +++ b/dgraph/cmd/bulk/mapper.go @@ -70,6 +70,8 @@ func less(lhs, rhs *intern.MapEntry) bool { } func (m *mapper) writeMapEntriesToFile(entriesBuf []byte, shardIdx int) { + defer m.shards[shardIdx].mu.Unlock() // Locked by caller. + buf := entriesBuf var entries []*intern.MapEntry for len(buf) > 0 { @@ -105,7 +107,6 @@ func (m *mapper) writeMapEntriesToFile(entriesBuf []byte, shardIdx int) { ) x.Check(os.MkdirAll(filepath.Dir(filename), 0755)) x.Check(x.WriteFileSync(filename, entriesBuf, 0644)) - m.shards[shardIdx].mu.Unlock() // Locked by caller. } func (m *mapper) run() { diff --git a/dgraph/cmd/bulk/merge_shards.go b/dgraph/cmd/bulk/merge_shards.go index fc9670f383a..7129538db8b 100644 --- a/dgraph/cmd/bulk/merge_shards.go +++ b/dgraph/cmd/bulk/merge_shards.go @@ -31,8 +31,10 @@ func mergeMapShardsIntoReduceShards(opt options) { // until there are no more map shards left. Should be a good approximation. for _, shard := range mapShards { sortBySize(reduceShards) - x.Check(os.Rename(shard, filepath.Join( - reduceShards[len(reduceShards)-1], filepath.Base(shard)))) + reduceShard := filepath.Join( + reduceShards[len(reduceShards)-1], filepath.Base(shard)) + x.Printf("Shard %s -> Reduce %s\n", shard, reduceShard) + x.Check(os.Rename(shard, reduceShard)) } } diff --git a/dgraph/cmd/bulk/progress.go b/dgraph/cmd/bulk/progress.go index 26b1a49e0ed..2bacb434308 100644 --- a/dgraph/cmd/bulk/progress.go +++ b/dgraph/cmd/bulk/progress.go @@ -55,7 +55,7 @@ func (p *progress) setPhase(ph phase) { func (p *progress) report() { for { select { - case <-time.After(time.Second): + case <-time.After(10 * time.Second): p.reportOnce() case <-p.shutdown: p.shutdown <- struct{}{} diff --git a/dgraph/cmd/bulk/run.go b/dgraph/cmd/bulk/run.go index 565652c2196..2fc533b6483 100644 --- a/dgraph/cmd/bulk/run.go +++ b/dgraph/cmd/bulk/run.go @@ -151,7 +151,8 @@ func run() { mergeMapShardsIntoReduceShards(opt) } loader.reduceStage() - loader.writeSchema() + // loader.writeSchema() // This is the issue. 
We need to find the predicates and only write + // schemas for those in the corresponding DB. loader.cleanup() } diff --git a/dgraph/cmd/bulk/shuffle.go b/dgraph/cmd/bulk/shuffle.go index 0e13ae4a977..98027603111 100644 --- a/dgraph/cmd/bulk/shuffle.go +++ b/dgraph/cmd/bulk/shuffle.go @@ -36,12 +36,12 @@ func (s *shuffler) run() { thr := x.NewThrottle(s.opt.NumShufflers) for i := 0; i < s.opt.ReduceShards; i++ { thr.Start() - go func(i int, db *badger.ManagedDB) { - mapFiles := filenamesInTree(shardDirs[i]) + go func(shardId int, db *badger.ManagedDB) { + mapFiles := filenamesInTree(shardDirs[shardId]) shuffleInputChs := make([]chan *intern.MapEntry, len(mapFiles)) for i, mapFile := range mapFiles { shuffleInputChs[i] = make(chan *intern.MapEntry, 1000) - go readMapOutput(mapFile, shuffleInputChs[i]) + go readMapOutput(shardId, mapFile, shuffleInputChs[i]) } ci := &countIndexer{state: s.state, db: db} @@ -66,7 +66,7 @@ func (s *shuffler) createBadger(i int) *badger.ManagedDB { return db } -func readMapOutput(filename string, mapEntryCh chan<- *intern.MapEntry) { +func readMapOutput(shard int, filename string, mapEntryCh chan<- *intern.MapEntry) { fd, err := os.Open(filename) x.Check(err) defer fd.Close() @@ -98,7 +98,6 @@ func readMapOutput(filename string, mapEntryCh chan<- *intern.MapEntry) { } func (s *shuffler) shufflePostings(mapEntryChs []chan *intern.MapEntry, ci *countIndexer) { - var ph postingHeap for _, ch := range mapEntryChs { heap.Push(&ph, heapNode{mapEntry: <-ch, ch: ch}) diff --git a/dgraph/cmd/root.go b/dgraph/cmd/root.go index c052ad2aaa2..0b5f06728f8 100644 --- a/dgraph/cmd/root.go +++ b/dgraph/cmd/root.go @@ -12,6 +12,7 @@ import ( "os" "github.com/dgraph-io/dgraph/dgraph/cmd/bulk" + "github.com/dgraph-io/dgraph/dgraph/cmd/debug" "github.com/dgraph-io/dgraph/dgraph/cmd/live" "github.com/dgraph-io/dgraph/dgraph/cmd/server" "github.com/dgraph-io/dgraph/dgraph/cmd/version" @@ -58,7 +59,7 @@ func init() { rootConf.BindPFlags(RootCmd.PersistentFlags()) var subcommands = []*x.SubCommand{ - &bulk.Bulk, &live.Live, &server.Server, &zero.Zero, &version.Version, + &bulk.Bulk, &live.Live, &server.Server, &zero.Zero, &version.Version, &debug.Debug, } for _, sc := range subcommands { RootCmd.AddCommand(sc.Cmd) From 29ae16aa2f619eac3f1a43bc9c7330dead34413b Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Tue, 12 Jun 2018 15:17:00 -0700 Subject: [PATCH 2/6] Add debug tool. --- dgraph/cmd/debug/run.go | 89 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 89 insertions(+) create mode 100644 dgraph/cmd/debug/run.go diff --git a/dgraph/cmd/debug/run.go b/dgraph/cmd/debug/run.go new file mode 100644 index 00000000000..486bc48c105 --- /dev/null +++ b/dgraph/cmd/debug/run.go @@ -0,0 +1,89 @@ +/* + * Copyright 2017-2018 Dgraph Labs, Inc. + * + * This file is available under the Apache License, Version 2.0, + * with the Commons Clause restriction. 
+ */ + +package debug + +import ( + "fmt" + "log" + "math" + "sort" + + "github.com/dgraph-io/badger" + "github.com/dgraph-io/badger/options" + "github.com/dgraph-io/dgraph/x" + "github.com/spf13/cobra" +) + +var Debug x.SubCommand + +func init() { + Debug.Cmd = &cobra.Command{ + Use: "debug", + Short: "Debug Dgraph instance", + Run: func(cmd *cobra.Command, args []string) { + run() + }, + } + + flag := Debug.Cmd.Flags() + flag.StringP("postings", "p", "", "Directory where posting lists are stored.") + flag.BoolP("predicates", "s", false, "List all the predicates.") + flag.BoolP("readonly", "o", true, "Open in read only mode.") +} + +func run() { + opts := badger.DefaultOptions + opts.Dir = Debug.Conf.GetString("postings") + opts.ValueDir = Debug.Conf.GetString("postings") + opts.TableLoadingMode = options.MemoryMap + opts.ReadOnly = Debug.Conf.GetBool("readonly") + + x.AssertTruef(len(opts.Dir) > 0, "No posting dir specified.") + fmt.Printf("Opening DB: %s\n", opts.Dir) + db, err := badger.OpenManaged(opts) + x.Check(err) + defer db.Close() + + if Debug.Conf.GetBool("predicates") { + txn := db.NewTransactionAt(math.MaxUint64, false) + defer txn.Discard() + + iopts := badger.DefaultIteratorOptions + iopts.PrefetchValues = false + itr := txn.NewIterator(iopts) + defer itr.Close() + + var loop int + m := make(map[string]int) + for itr.Rewind(); itr.Valid(); itr.Next() { + item := itr.Item() + pk := x.Parse(item.Key()) + m[pk.Attr] += 1 + loop++ + } + + type C struct { + pred string + count int + } + + var counts []C + for pred, count := range m { + counts = append(counts, C{pred, count}) + } + sort.Slice(counts, func(i, j int) bool { + return counts[i].count > counts[j].count + }) + for _, c := range counts { + fmt.Printf("Count: %10d Predicate: %-20s\n", c.count, c.pred) + } + fmt.Printf("Found %d keys\n", loop) + return + } + log.Fatalln("Please provide a valid option for diagnosis.") +} From 9c0bae62d657cf6c7e2b4646e43732f6bf460758 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Tue, 19 Jun 2018 17:52:35 -0700 Subject: [PATCH 3/6] Working solution for writing schema only for the predicates that the DB holds. --- dgraph/cmd/bulk/run.go | 3 +-- dgraph/cmd/bulk/schema.go | 37 ++++++++++++++++++++++++++++-- dgraph/cmd/debug/run.go | 48 +++++++++++++++++++++++++++++++++------ 3 files changed, 77 insertions(+), 11 deletions(-) diff --git a/dgraph/cmd/bulk/run.go b/dgraph/cmd/bulk/run.go index 2fc533b6483..565652c2196 100644 --- a/dgraph/cmd/bulk/run.go +++ b/dgraph/cmd/bulk/run.go @@ -151,8 +151,7 @@ func run() { mergeMapShardsIntoReduceShards(opt) } loader.reduceStage() - // loader.writeSchema() // This is the issue. We need to find the predicates and only write - // schemas for those in the corresponding DB. 
+ loader.writeSchema() loader.cleanup() } diff --git a/dgraph/cmd/bulk/schema.go b/dgraph/cmd/bulk/schema.go index 7af839f9da4..58a8c5db371 100644 --- a/dgraph/cmd/bulk/schema.go +++ b/dgraph/cmd/bulk/schema.go @@ -10,6 +10,7 @@ package bulk import ( "fmt" "log" + "math" "sync" "github.com/dgraph-io/badger" @@ -82,11 +83,43 @@ func (s *schemaStore) validateType(de *intern.DirectedEdge, objectIsUID bool) { } } +func (s *schemaStore) getPredicates(db *badger.ManagedDB) []string { + txn := db.NewTransactionAt(math.MaxUint64, false) + defer txn.Discard() + + opts := badger.DefaultIteratorOptions + opts.PrefetchValues = false + itr := txn.NewIterator(opts) + defer itr.Close() + + m := make(map[string]struct{}) + for itr.Rewind(); itr.Valid(); { + item := itr.Item() + pk := x.Parse(item.Key()) + m[pk.Attr] = struct{}{} + itr.Seek(pk.SkipPredicate()) + continue + } + + var preds []string + for pred := range m { + preds = append(preds, pred) + } + return preds +} + func (s *schemaStore) write(db *badger.ManagedDB) { // Write schema always at timestamp 1, s.state.writeTs may not be equal to 1 // if bulk loader was restarted or other similar scenarios. - txn := db.NewTransactionAt(1, true) - for pred, sch := range s.m { + preds := s.getPredicates(db) + + txn := db.NewTransactionAt(math.MaxUint64, true) + defer txn.Discard() + for _, pred := range preds { + sch, ok := s.m[pred] + if !ok { + continue + } k := x.SchemaKey(pred) v, err := sch.Marshal() x.Check(err) diff --git a/dgraph/cmd/debug/run.go b/dgraph/cmd/debug/run.go index 486bc48c105..905569041e7 100644 --- a/dgraph/cmd/debug/run.go +++ b/dgraph/cmd/debug/run.go @@ -36,6 +36,15 @@ func init() { flag.BoolP("readonly", "o", true, "Open in read only mode.") } +type Stats struct { + Data int + Index int + Schema int + Reverse int + Count int + Total int +} + func run() { opts := badger.DefaultOptions opts.Dir = Debug.Conf.GetString("postings") @@ -59,28 +68,53 @@ func run() { defer itr.Close() var loop int - m := make(map[string]int) + m := make(map[string]*Stats) for itr.Rewind(); itr.Valid(); itr.Next() { item := itr.Item() pk := x.Parse(item.Key()) - m[pk.Attr] += 1 + stats, ok := m[pk.Attr] + if !ok { + stats = new(Stats) + m[pk.Attr] = stats + } + stats.Total += 1 + // Don't use a switch case here. Because multiple of these can be true. In particular, + // IsSchema can be true alongside IsData. + if pk.IsData() { + stats.Data += 1 + } + if pk.IsIndex() { + stats.Index += 1 + } + if pk.IsCount() { + stats.Count += 1 + } + if pk.IsSchema() { + stats.Schema += 1 + } + if pk.IsReverse() { + stats.Reverse += 1 + } loop++ } type C struct { pred string - count int + stats *Stats } var counts []C - for pred, count := range m { - counts = append(counts, C{pred, count}) + for pred, stats := range m { + counts = append(counts, C{pred, stats}) } sort.Slice(counts, func(i, j int) bool { - return counts[i].count > counts[j].count + return counts[i].stats.Total > counts[j].stats.Total }) for _, c := range counts { - fmt.Printf("Count: %10d Predicate: %-20s\n", c.count, c.pred) + st := c.stats + fmt.Printf("Total: %-8d. Predicate: %-20s\n", st.Total, c.pred) + fmt.Printf(" Data: %d Index: %d Reverse: %d Schema: %d Count: %d Predicate: %s\n\n", + st.Data, st.Index, st.Reverse, st.Schema, st.Count, c.pred) } fmt.Printf("Found %d keys\n", loop) return From 891009b1f368c712b2f7229177f12ca738a0216d Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Tue, 19 Jun 2018 19:12:38 -0700 Subject: [PATCH 4/6] Put _predicate_ in the right shard. 
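
With --expand_edges enabled, the mapper emits a synthetic _predicate_
posting per subject, but it was reusing the shard computed for the
triple's own predicate, so _predicate_ postings were scattered across
map shards instead of landing in the one shard that owns _predicate_.
The change below routes them through m.state.shards.shardFor("_predicate_")
instead. As a rough illustration of the kind of lookup involved (the
shardMap type here is a hypothetical simplification, not the bulk
loader's actual implementation), a first-seen assignment could look
like this:

    package bulk // illustrative sketch only, not dgraph/cmd/bulk code

    import "sync"

    // shardMap is a hypothetical, simplified predicate -> map-shard
    // assignment. The point of this patch: synthetic _predicate_
    // postings must be routed through shardFor("_predicate_") like any
    // other predicate, rather than inheriting the shard of whatever
    // predicate the current NQuad happens to carry.
    type shardMap struct {
        mu        sync.Mutex
        numShards int
        next      int
        pred      map[string]int
    }

    func newShardMap(numShards int) *shardMap {
        return &shardMap{numShards: numShards, pred: make(map[string]int)}
    }

    // shardFor returns the shard that owns pred, assigning one
    // round-robin the first time a predicate is seen so the mapping
    // stays stable for the rest of the run.
    func (s *shardMap) shardFor(pred string) int {
        s.mu.Lock()
        defer s.mu.Unlock()
        shard, ok := s.pred[pred]
        if !ok {
            shard = s.next % s.numShards
            s.pred[pred] = shard
            s.next++
        }
        return shard
    }

The exact assignment policy is secondary (the real loader may balance by
size); what matters is that every map entry for a predicate, including
_predicate_ itself, goes through the same lookup, so each reduce shard
ends up holding complete posting lists.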
--- dgraph/cmd/bulk/mapper.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/dgraph/cmd/bulk/mapper.go b/dgraph/cmd/bulk/mapper.go index f6f2ec1ab02..a045c267280 100644 --- a/dgraph/cmd/bulk/mapper.go +++ b/dgraph/cmd/bulk/mapper.go @@ -122,7 +122,7 @@ func (m *mapper) run() { } rdf = strings.TrimSpace(rdf) - x.Check(m.parseRDF(rdf)) + x.Check(m.processRDF(rdf)) atomic.AddInt64(&m.prog.rdfCount, 1) for i := range m.shards { sh := &m.shards[i] @@ -163,7 +163,7 @@ func (m *mapper) addMapEntry(key []byte, p *intern.Posting, shard int) { x.Check(err) } -func (m *mapper) parseRDF(rdfLine string) error { +func (m *mapper) processRDF(rdfLine string) error { nq, err := parseNQuad(rdfLine) if err != nil { if err == rdf.ErrEmpty { @@ -200,14 +200,14 @@ func (m *mapper) processNQuad(nq gql.NQuad) { key = x.ReverseKey(nq.Predicate, oid) m.addMapEntry(key, rev, shard) } + m.addIndexMapEntries(nq, de) if m.opt.ExpandEdges { + shard := m.state.shards.shardFor("_predicate_") key = x.DataKey("_predicate_", sid) pp := m.createPredicatePosting(nq.Predicate) m.addMapEntry(key, pp, shard) } - - m.addIndexMapEntries(nq, de) } func (m *mapper) lookupUid(xid string) uint64 { @@ -288,9 +288,7 @@ func (m *mapper) addIndexMapEntries(nq gql.NQuad, de *intern.DirectedEdge) { } sch := m.schema.getSchema(nq.GetPredicate()) - for _, tokerName := range sch.GetTokenizer() { - // Find tokeniser. toker, ok := tok.GetTokenizer(tokerName) if !ok { From eaf335d616608053bd97f12c4e03f64dfa0f13b4 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Tue, 19 Jun 2018 19:20:24 -0700 Subject: [PATCH 5/6] Revert back change. --- dgraph/cmd/bulk/shuffle.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dgraph/cmd/bulk/shuffle.go b/dgraph/cmd/bulk/shuffle.go index 98027603111..f248e1fa29b 100644 --- a/dgraph/cmd/bulk/shuffle.go +++ b/dgraph/cmd/bulk/shuffle.go @@ -41,7 +41,7 @@ func (s *shuffler) run() { shuffleInputChs := make([]chan *intern.MapEntry, len(mapFiles)) for i, mapFile := range mapFiles { shuffleInputChs[i] = make(chan *intern.MapEntry, 1000) - go readMapOutput(shardId, mapFile, shuffleInputChs[i]) + go readMapOutput(mapFile, shuffleInputChs[i]) } ci := &countIndexer{state: s.state, db: db} @@ -66,7 +66,7 @@ func (s *shuffler) createBadger(i int) *badger.ManagedDB { return db } -func readMapOutput(shard int, filename string, mapEntryCh chan<- *intern.MapEntry) { +func readMapOutput(filename string, mapEntryCh chan<- *intern.MapEntry) { fd, err := os.Open(filename) x.Check(err) defer fd.Close() From 86177198a6a3e23458f6376a0a0a28ffc0154cc1 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Tue, 19 Jun 2018 19:26:58 -0700 Subject: [PATCH 6/6] Revert change --- dgraph/cmd/bulk/progress.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dgraph/cmd/bulk/progress.go b/dgraph/cmd/bulk/progress.go index 2bacb434308..26b1a49e0ed 100644 --- a/dgraph/cmd/bulk/progress.go +++ b/dgraph/cmd/bulk/progress.go @@ -55,7 +55,7 @@ func (p *progress) setPhase(ph phase) { func (p *progress) report() { for { select { - case <-time.After(10 * time.Second): + case <-time.After(time.Second): p.reportOnce() case <-p.shutdown: p.shutdown <- struct{}{}
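
A closing note on the key-scan technique that both the debug tool
([PATCH 2/6]) and schemaStore.getPredicates ([PATCH 3/6]) rely on: open
Badger in managed mode, read at math.MaxUint64 so keys committed at any
timestamp are visible, disable value prefetching because only keys are
inspected, and seek past each predicate's key range instead of stepping
through every key. A minimal standalone sketch, assuming the badger and
dgraph/x APIs exactly as they appear in the diffs above (x.Parse,
pk.Attr, pk.SkipPredicate, badger.OpenManaged):

    package main

    import (
        "fmt"
        "math"
        "sort"

        "github.com/dgraph-io/badger"
        "github.com/dgraph-io/dgraph/x"
    )

    // listPredicates opens a posting directory read-only and returns
    // every predicate stored in it, deduplicated and sorted.
    func listPredicates(dir string) ([]string, error) {
        opts := badger.DefaultOptions
        opts.Dir = dir
        opts.ValueDir = dir
        opts.ReadOnly = true
        db, err := badger.OpenManaged(opts)
        if err != nil {
            return nil, err
        }
        defer db.Close()

        // Read at MaxUint64 so every committed version is visible.
        txn := db.NewTransactionAt(math.MaxUint64, false)
        defer txn.Discard()

        iopts := badger.DefaultIteratorOptions
        iopts.PrefetchValues = false // keys only; skip the value log
        itr := txn.NewIterator(iopts)
        defer itr.Close()

        m := make(map[string]struct{})
        for itr.Rewind(); itr.Valid(); {
            pk := x.Parse(itr.Item().Key())
            m[pk.Attr] = struct{}{}
            // Jump past this predicate's key range rather than
            // visiting each of its keys.
            itr.Seek(pk.SkipPredicate())
        }

        preds := make([]string, 0, len(m))
        for pred := range m {
            preds = append(preds, pred)
        }
        sort.Strings(preds)
        return preds, nil
    }

    func main() {
        preds, err := listPredicates("./p") // "./p" is a placeholder path
        if err != nil {
            panic(err)
        }
        for _, pred := range preds {
            fmt.Println(pred)
        }
    }

The same seek-forward loop is what lets writeSchema restrict itself to
the predicates a given reduce shard actually holds, without paying the
cost of iterating over every key in the DB.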