diff --git a/dgraph/cmd/bulk/loader.go b/dgraph/cmd/bulk/loader.go
index 054b9259583..ab6394abbf5 100644
--- a/dgraph/cmd/bulk/loader.go
+++ b/dgraph/cmd/bulk/loader.go
@@ -202,15 +202,6 @@ func (ld *loader) mapStage() {
 		LRUSize: 1 << 19,
 	})
 
-	var mapperWg sync.WaitGroup
-	mapperWg.Add(len(ld.mappers))
-	for _, m := range ld.mappers {
-		go func(m *mapper) {
-			m.run()
-			mapperWg.Done()
-		}(m)
-	}
-
 	var readers []*bufio.Reader
 	for _, rdfFile := range findRDFFiles(ld.opt.RDFDir) {
 		f, err := os.Open(rdfFile)
@@ -230,6 +221,16 @@ func (ld *loader) mapStage() {
 		os.Exit(1)
 	}
 
+	var mapperWg sync.WaitGroup
+	mapperWg.Add(len(ld.mappers))
+	for _, m := range ld.mappers {
+		go func(m *mapper) {
+			m.run()
+			mapperWg.Done()
+		}(m)
+	}
+
+	// This is the main map loop.
 	thr := x.NewThrottle(ld.opt.NumGoroutines)
 	for _, r := range readers {
 		thr.Start()
diff --git a/dgraph/cmd/bulk/mapper.go b/dgraph/cmd/bulk/mapper.go
index 62e09797cc0..a045c267280 100644
--- a/dgraph/cmd/bulk/mapper.go
+++ b/dgraph/cmd/bulk/mapper.go
@@ -70,6 +70,8 @@ func less(lhs, rhs *intern.MapEntry) bool {
 }
 
 func (m *mapper) writeMapEntriesToFile(entriesBuf []byte, shardIdx int) {
+	defer m.shards[shardIdx].mu.Unlock() // Locked by caller.
+
 	buf := entriesBuf
 	var entries []*intern.MapEntry
 	for len(buf) > 0 {
@@ -105,7 +107,6 @@ func (m *mapper) writeMapEntriesToFile(entriesBuf []byte, shardIdx int) {
 	)
 	x.Check(os.MkdirAll(filepath.Dir(filename), 0755))
 	x.Check(x.WriteFileSync(filename, entriesBuf, 0644))
-	m.shards[shardIdx].mu.Unlock() // Locked by caller.
 }
 
 func (m *mapper) run() {
@@ -121,7 +122,7 @@ func (m *mapper) run() {
 			}
 			rdf = strings.TrimSpace(rdf)
 
-			x.Check(m.parseRDF(rdf))
+			x.Check(m.processRDF(rdf))
 			atomic.AddInt64(&m.prog.rdfCount, 1)
 			for i := range m.shards {
 				sh := &m.shards[i]
@@ -162,7 +163,7 @@ func (m *mapper) addMapEntry(key []byte, p *intern.Posting, shard int) {
 	x.Check(err)
 }
 
-func (m *mapper) parseRDF(rdfLine string) error {
+func (m *mapper) processRDF(rdfLine string) error {
 	nq, err := parseNQuad(rdfLine)
 	if err != nil {
 		if err == rdf.ErrEmpty {
@@ -199,14 +200,14 @@ func (m *mapper) processNQuad(nq gql.NQuad) {
 		key = x.ReverseKey(nq.Predicate, oid)
 		m.addMapEntry(key, rev, shard)
 	}
+	m.addIndexMapEntries(nq, de)
 
 	if m.opt.ExpandEdges {
+		shard := m.state.shards.shardFor("_predicate_")
 		key = x.DataKey("_predicate_", sid)
 		pp := m.createPredicatePosting(nq.Predicate)
 		m.addMapEntry(key, pp, shard)
 	}
-
-	m.addIndexMapEntries(nq, de)
 }
 
 func (m *mapper) lookupUid(xid string) uint64 {
@@ -287,9 +288,7 @@ func (m *mapper) addIndexMapEntries(nq gql.NQuad, de *intern.DirectedEdge) {
 	}
 
 	sch := m.schema.getSchema(nq.GetPredicate())
-
 	for _, tokerName := range sch.GetTokenizer() {
-		// Find tokeniser.
 		toker, ok := tok.GetTokenizer(tokerName)
 		if !ok {
diff --git a/dgraph/cmd/bulk/merge_shards.go b/dgraph/cmd/bulk/merge_shards.go
index fc9670f383a..7129538db8b 100644
--- a/dgraph/cmd/bulk/merge_shards.go
+++ b/dgraph/cmd/bulk/merge_shards.go
@@ -31,8 +31,10 @@ func mergeMapShardsIntoReduceShards(opt options) {
 	// until there are no more map shards left. Should be a good approximation.
 	for _, shard := range mapShards {
 		sortBySize(reduceShards)
-		x.Check(os.Rename(shard, filepath.Join(
-			reduceShards[len(reduceShards)-1], filepath.Base(shard))))
+		reduceShard := filepath.Join(
+			reduceShards[len(reduceShards)-1], filepath.Base(shard))
+		x.Printf("Shard %s -> Reduce %s\n", shard, reduceShard)
+		x.Check(os.Rename(shard, reduceShard))
 	}
 }
diff --git a/dgraph/cmd/bulk/schema.go b/dgraph/cmd/bulk/schema.go
index 7af839f9da4..58a8c5db371 100644
--- a/dgraph/cmd/bulk/schema.go
+++ b/dgraph/cmd/bulk/schema.go
@@ -10,6 +10,7 @@ package bulk
 import (
 	"fmt"
 	"log"
+	"math"
 	"sync"
 
 	"github.com/dgraph-io/badger"
@@ -82,11 +83,43 @@ func (s *schemaStore) validateType(de *intern.DirectedEdge, objectIsUID bool) {
 	}
 }
 
+func (s *schemaStore) getPredicates(db *badger.ManagedDB) []string {
+	txn := db.NewTransactionAt(math.MaxUint64, false)
+	defer txn.Discard()
+
+	opts := badger.DefaultIteratorOptions
+	opts.PrefetchValues = false
+	itr := txn.NewIterator(opts)
+	defer itr.Close()
+
+	m := make(map[string]struct{})
+	for itr.Rewind(); itr.Valid(); {
+		item := itr.Item()
+		pk := x.Parse(item.Key())
+		m[pk.Attr] = struct{}{}
+		itr.Seek(pk.SkipPredicate())
+		continue
+	}
+
+	var preds []string
+	for pred := range m {
+		preds = append(preds, pred)
+	}
+	return preds
+}
+
 func (s *schemaStore) write(db *badger.ManagedDB) {
 	// Write schema always at timestamp 1, s.state.writeTs may not be equal to 1
 	// if bulk loader was restarted or other similar scenarios.
-	txn := db.NewTransactionAt(1, true)
-	for pred, sch := range s.m {
+	preds := s.getPredicates(db)
+
+	txn := db.NewTransactionAt(math.MaxUint64, true)
+	defer txn.Discard()
+	for _, pred := range preds {
+		sch, ok := s.m[pred]
+		if !ok {
+			continue
+		}
 		k := x.SchemaKey(pred)
 		v, err := sch.Marshal()
 		x.Check(err)
diff --git a/dgraph/cmd/bulk/shuffle.go b/dgraph/cmd/bulk/shuffle.go
index 0e13ae4a977..f248e1fa29b 100644
--- a/dgraph/cmd/bulk/shuffle.go
+++ b/dgraph/cmd/bulk/shuffle.go
@@ -36,8 +36,8 @@ func (s *shuffler) run() {
 	thr := x.NewThrottle(s.opt.NumShufflers)
 	for i := 0; i < s.opt.ReduceShards; i++ {
 		thr.Start()
-		go func(i int, db *badger.ManagedDB) {
-			mapFiles := filenamesInTree(shardDirs[i])
+		go func(shardId int, db *badger.ManagedDB) {
+			mapFiles := filenamesInTree(shardDirs[shardId])
 			shuffleInputChs := make([]chan *intern.MapEntry, len(mapFiles))
 			for i, mapFile := range mapFiles {
 				shuffleInputChs[i] = make(chan *intern.MapEntry, 1000)
@@ -98,7 +98,6 @@ func readMapOutput(filename string, mapEntryCh chan<- *intern.MapEntry) {
 }
 
 func (s *shuffler) shufflePostings(mapEntryChs []chan *intern.MapEntry, ci *countIndexer) {
-
 	var ph postingHeap
 	for _, ch := range mapEntryChs {
 		heap.Push(&ph, heapNode{mapEntry: <-ch, ch: ch})
diff --git a/dgraph/cmd/debug/run.go b/dgraph/cmd/debug/run.go
new file mode 100644
index 00000000000..905569041e7
--- /dev/null
+++ b/dgraph/cmd/debug/run.go
@@ -0,0 +1,123 @@
+/*
+ * Copyright 2017-2018 Dgraph Labs, Inc.
+ *
+ * This file is available under the Apache License, Version 2.0,
+ * with the Commons Clause restriction.
+ */
+
+package debug
+
+import (
+	"fmt"
+	"log"
+	"math"
+	"sort"
+
+	"github.com/dgraph-io/badger"
+	"github.com/dgraph-io/badger/options"
+	"github.com/dgraph-io/dgraph/x"
+	"github.com/spf13/cobra"
+)
+
+var Debug x.SubCommand
+
+func init() {
+	Debug.Cmd = &cobra.Command{
+		Use:   "debug",
+		Short: "Debug Dgraph instance",
+		Run: func(cmd *cobra.Command, args []string) {
+			run()
+		},
+	}
+
+	flag := Debug.Cmd.Flags()
+	flag.StringP("postings", "p", "", "Directory where posting lists are stored.")
+	flag.BoolP("predicates", "s", false, "List all the predicates.")
+	flag.BoolP("readonly", "o", true, "Open in read only mode.")
+}
+
+type Stats struct {
+	Data    int
+	Index   int
+	Schema  int
+	Reverse int
+	Count   int
+	Total   int
+}
+
+func run() {
+	opts := badger.DefaultOptions
+	opts.Dir = Debug.Conf.GetString("postings")
+	opts.ValueDir = Debug.Conf.GetString("postings")
+	opts.TableLoadingMode = options.MemoryMap
+	opts.ReadOnly = Debug.Conf.GetBool("readonly")
+
+	x.AssertTruef(len(opts.Dir) > 0, "No posting dir specified.")
+	fmt.Printf("Opening DB: %s\n", opts.Dir)
+	db, err := badger.OpenManaged(opts)
+	x.Check(err)
+	defer db.Close()
+
+	if Debug.Conf.GetBool("predicates") {
+		txn := db.NewTransactionAt(math.MaxUint64, false)
+		defer txn.Discard()
+
+		iopts := badger.DefaultIteratorOptions
+		iopts.PrefetchValues = false
+		itr := txn.NewIterator(iopts)
+		defer itr.Close()
+
+		var loop int
+		m := make(map[string]*Stats)
+		for itr.Rewind(); itr.Valid(); itr.Next() {
+			item := itr.Item()
+			pk := x.Parse(item.Key())
+			stats, ok := m[pk.Attr]
+			if !ok {
+				stats = new(Stats)
+				m[pk.Attr] = stats
+			}
+			stats.Total += 1
+			// Don't use a switch case here. Because multiple of these can be true. In particular,
+			// IsSchema can be true alongside IsData.
+			if pk.IsData() {
+				stats.Data += 1
+			}
+			if pk.IsIndex() {
+				stats.Index += 1
+			}
+			if pk.IsCount() {
+				stats.Count += 1
+			}
+			if pk.IsSchema() {
+				stats.Schema += 1
+			}
+			if pk.IsReverse() {
+				stats.Reverse += 1
+			}
+			loop++
+		}
+
+		type C struct {
+			pred  string
+			stats *Stats
+		}
+
+		var counts []C
+		for pred, stats := range m {
+			counts = append(counts, C{pred, stats})
+		}
+		sort.Slice(counts, func(i, j int) bool {
+			return counts[i].stats.Total > counts[j].stats.Total
+		})
+		for _, c := range counts {
+			st := c.stats
+			fmt.Printf("Total: %-8d. Predicate: %-20s\n", st.Total, c.pred)
+			fmt.Printf("  Data: %d Index: %d Reverse: %d Schema: %d Count: %d Predicate: %s\n\n",
+				st.Data, st.Index, st.Reverse, st.Schema, st.Count, c.pred)
+		}
+		fmt.Printf("Found %d keys\n", loop)
+		return
+	}
+	log.Fatalln("Please provide a valid option for diagnosis.")
+}
diff --git a/dgraph/cmd/root.go b/dgraph/cmd/root.go
index c052ad2aaa2..0b5f06728f8 100644
--- a/dgraph/cmd/root.go
+++ b/dgraph/cmd/root.go
@@ -12,6 +12,7 @@ import (
 	"os"
 
 	"github.com/dgraph-io/dgraph/dgraph/cmd/bulk"
+	"github.com/dgraph-io/dgraph/dgraph/cmd/debug"
 	"github.com/dgraph-io/dgraph/dgraph/cmd/live"
 	"github.com/dgraph-io/dgraph/dgraph/cmd/server"
 	"github.com/dgraph-io/dgraph/dgraph/cmd/version"
@@ -58,7 +59,7 @@ func init() {
 	rootConf.BindPFlags(RootCmd.PersistentFlags())
 
 	var subcommands = []*x.SubCommand{
-		&bulk.Bulk, &live.Live, &server.Server, &zero.Zero, &version.Version,
+		&bulk.Bulk, &live.Live, &server.Server, &zero.Zero, &version.Version, &debug.Debug,
 	}
 	for _, sc := range subcommands {
 		RootCmd.AddCommand(sc.Cmd)