Live & bulk loader changes #2961

Merged
merged 62 commits into master from javier/issue2889_live_load_json
Feb 6, 2019
Changes from 50 commits
Commits
62 commits
6145526
Accept directory as argument to --rdfs and look for .rdf and .json fi…
Jan 12, 2019
6e395be
Merge branch 'master' into javier/issue2889_live_load_json
Jan 12, 2019
98e0ea1
Add x.FileReader() function to return a reader that decompresses if n…
Jan 14, 2019
3ca188a
Forward file to the RDF or JSON processor as appropriate.
Jan 14, 2019
de3da8f
Working version?
Jan 15, 2019
c4291db
Add live test stub.
Jan 17, 2019
5866bd7
Ensure pending nquads are submitted after EOF.
Jan 17, 2019
86c421f
Change references to RDFs to N-Quads to be more correct.
Jan 17, 2019
9cb6dca
Merge master.
Jan 17, 2019
bdfcd85
Export TMPDIR in test.sh so that hopefully tests will use it. Change …
Jan 17, 2019
1e9c530
Change test data schema to reflect changes merged from master.
Jan 17, 2019
c495a01
Live JSON load test.
Jan 17, 2019
f499943
Auto-detect JSON in load files with no .rdf or .json extension in the…
Jan 18, 2019
7b5ad3b
Fix bug reading streamed JSON. Add test of loading streamed JSON.
Jan 18, 2019
405c60a
Live load testing improvements.
Jan 18, 2019
37c3437
First attempt at autogenerating uid from key fields.
Jan 19, 2019
afe44d4
Replace RDFs in message with N-Quads.
Jan 19, 2019
9da9d8e
Add debugging code.
Jan 19, 2019
fce4ff6
Fix bug assigning blank ids. Move more testing code to testing package.
Jan 21, 2019
fbad4cf
Add test of live loading multiple JSON files.
Jan 21, 2019
8c073d5
Add test of live loading JSON without UID field. Testing improvements.
Jan 21, 2019
f828d57
Add tests of live JSON load with auto-uid.
Jan 22, 2019
44b1a73
Merge branch 'master' into javier/issue2889_live_load_json
Jan 22, 2019
cc59eed
Small restartCluster fix.
Jan 22, 2019
f6b25ca
Merge master.
Jan 22, 2019
06fd748
Rename fields to be more correct or specific.
Jan 22, 2019
a23a169
Improved comments.
Jan 22, 2019
4495702
Remove test.sh change that should be part of another branch.
Jan 22, 2019
2a94576
Fix bugs catching errors.
Jan 23, 2019
c9833fd
Make auto-added blank uid fields per file.
Jan 23, 2019
cee8c8d
Don't bother hashing the concatenated key values to generate a blank …
Jan 24, 2019
3b7b648
Refactor processRdfFile
Jan 25, 2019
ad641a2
Minor changes.
Jan 28, 2019
5b4c423
Add JsonToNquads to replace NquadsFromJson later.
Jan 28, 2019
4747690
Rename some functions. Don't dump stack trace for input error.
Jan 28, 2019
6f01e18
Move chunk test from bulk package to x package.
Jan 28, 2019
0cb8030
Change processRdf to use a chunker as well.
Jan 28, 2019
ce62a5a
Move chunker from x package to a new loadfile package. Remove bulk ch…
Jan 29, 2019
7286176
Merge master.
Jan 30, 2019
1a409cf
Refactor more bulk/live code.
Jan 31, 2019
0454e92
Minor testing changes.
Jan 31, 2019
77d029b
Merge master.
Jan 31, 2019
50f5744
Merge master.
Jan 31, 2019
500fa12
Add a small visual indicator of when a long-running test starts.
Jan 31, 2019
05ff572
Fix rdfChunker.Parse to parse all RDF lines in chunk instead of only …
Jan 31, 2019
487dcc5
Add check for EOF in RDF chunk parser.
Jan 31, 2019
17125d9
Add --key option to bulk loader for parity with live loader.
Jan 31, 2019
8e7b581
Fix batching, which was broken by live/bulk refactoring.
Feb 1, 2019
1c2c9ef
Parse key fields earlier in the process.
Feb 1, 2019
116a794
Cleanup suggestions from PR.
Feb 1, 2019
8f16e04
PR review fixes.
Feb 5, 2019
7685150
Fix warning.
Feb 5, 2019
dd237b1
Remove --key support since it requires more thought.
Feb 5, 2019
db7f34f
Remove two-line functions processJsonFile and processRdfFile.
Feb 6, 2019
915eef8
Inline nextNquads() and finalNquads() into processLoadFile().
Feb 6, 2019
d09e925
Rename package loadfile to chunker.
Feb 6, 2019
94f355b
Move package rdf to chunker/rdf.
Feb 6, 2019
9f39ac9
WIP...
Feb 6, 2019
6872833
Move json and rdf nquad parsing under chunker.
Feb 6, 2019
ec990e2
Move FileReader() and IsJSONData() from x package to chunker.
Feb 6, 2019
ac41b58
Rename live --rdfs option to --files since it can load more than *.rd…
Feb 6, 2019
6143636
Fix a bug where batch slice was being modified after sending to a cha…
manishrjain Feb 6, 2019
2 changes: 1 addition & 1 deletion contrib/scripts/functions.sh
@@ -16,7 +16,7 @@ function restartCluster {
echo "Rebuilding dgraph ..."
make install
docker ps -a --filter label="cluster=test" --format "{{.Names}}" | xargs -r docker rm -f
docker-compose -p dgraph -f $compose_file up --force-recreate --remove-orphans --detach
docker-compose -p dgraph -f $compose_file up --force-recreate --remove-orphans --detach || exit 1
popd >/dev/null

$basedir/contrib/wait-for-it.sh -t 60 localhost:6080 || exit 1
65 changes: 20 additions & 45 deletions dgraph/cmd/bulk/loader.go
@@ -17,7 +17,6 @@
package bulk

import (
"bufio"
"bytes"
"compress/gzip"
"context"
@@ -27,16 +26,17 @@ import (
"os"
"path/filepath"
"runtime"
"strings"
"sync"
"time"

"github.com/dgraph-io/badger"
bo "github.com/dgraph-io/badger/options"
"github.com/dgraph-io/dgraph/loadfile"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/schema"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/dgraph/xidmap"

"google.golang.org/grpc"
)

@@ -58,11 +58,13 @@ type options struct {
HttpAddr string
IgnoreErrors bool
CustomTokenizers string
KeyFields string

MapShards int
ReduceShards int

shardOutputDirs []string
parsedKeyFields []string
}

type state struct {
@@ -144,25 +146,6 @@ func readSchema(filename string) []*pb.SchemaUpdate {
return initialSchema
}

func findDataFiles(dir string, ext string) []string {
var files []string
x.Check(filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if strings.HasSuffix(path, ext) || strings.HasSuffix(path, ext+".gz") {
files = append(files, path)
}
return nil
}))
return files
}

type uidRangeResponse struct {
uids *pb.AssignedIds
err error
}

func (ld *loader) mapStage() {
ld.prog.setPhase(mapPhase)

@@ -181,29 +164,29 @@ func (ld *loader) mapStage() {
LRUSize: 1 << 19,
})

var files []string
var ext string
var dir, ext string
var loaderType int
if ld.opt.RDFDir != "" {
loaderType = rdfInput
loaderType = loadfile.RdfInput
dir = ld.opt.RDFDir
ext = ".rdf"
files = findDataFiles(ld.opt.RDFDir, ext)
} else {
loaderType = jsonInput
loaderType = loadfile.JsonInput
dir = ld.opt.JSONDir
ext = ".json"
files = findDataFiles(ld.opt.JSONDir, ext)
}

}
files := x.FindDataFiles(dir, []string{ext, ext + ".gz"})
if len(files) == 0 {
fmt.Printf("No *%s files found.\n", ext)
fmt.Printf("No *%s files found under %s.\n", ext, dir)
os.Exit(1)
}

var mapperWg sync.WaitGroup
mapperWg.Add(len(ld.mappers))
for _, m := range ld.mappers {
go func(m *mapper) {
m.run(loaderType)
m.run(loaderType, &ld.opt.parsedKeyFields)
mapperWg.Done()
}(m)
}
@@ -213,25 +196,17 @@ func (ld *loader) mapStage() {
for i, file := range files {
thr.Start()
fmt.Printf("Processing file (%d out of %d): %s\n", i+1, len(files), file)
chunker := newChunker(loaderType)
chunker := loadfile.NewChunker(loaderType)

go func(file string) {
defer thr.Done()

f, err := os.Open(file)
x.Check(err)
defer f.Close()
r, cleanup_fn := x.FileReader(file)
defer cleanup_fn()

var r *bufio.Reader
if !strings.HasSuffix(file, ".gz") {
r = bufio.NewReaderSize(f, 1<<20)
} else {
gzr, err := gzip.NewReader(f)
x.Checkf(err, "Could not create gzip reader for file %q.", file)
r = bufio.NewReaderSize(gzr, 1<<20)
}
x.Check(chunker.begin(r))
x.Check(chunker.Begin(r))
for {
chunkBuf, err := chunker.chunk(r)
chunkBuf, err := chunker.Chunk(r)
if chunkBuf != nil && chunkBuf.Len() > 0 {
ld.readerChunkCh <- chunkBuf
}
@@ -241,7 +216,7 @@ func (ld *loader) mapStage() {
x.Check(err)
}
}
x.Check(chunker.end(r))
x.Check(chunker.End(r))
}(file)
}
thr.Wait()
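
The per-file gzip/bufio setup removed above is now hidden behind x.FileReader, which the new code calls as r, cleanup_fn := x.FileReader(file). The helper's body is not part of this hunk; the following is only a minimal sketch of what it could look like, assuming it simply folds the deleted logic (buffered reads plus optional gzip decompression) into one place. The name fileReader and the sample path are illustrative, not Dgraph's actual code.

package main

import (
	"bufio"
	"compress/gzip"
	"io"
	"log"
	"os"
	"strings"
)

// fileReader mimics what x.FileReader appears to do at its call site: it
// returns a buffered reader for path, transparently decompressing *.gz
// input, plus a cleanup function that closes whatever was opened.
func fileReader(path string) (*bufio.Reader, func()) {
	f, err := os.Open(path)
	if err != nil {
		log.Fatalf("cannot open %q: %v", path, err)
	}
	var rd io.Reader = f
	cleanup := func() { f.Close() }
	if strings.HasSuffix(path, ".gz") {
		gzr, err := gzip.NewReader(f)
		if err != nil {
			log.Fatalf("cannot create gzip reader for %q: %v", path, err)
		}
		rd = gzr
		cleanup = func() { gzr.Close(); f.Close() }
	}
	return bufio.NewReaderSize(rd, 1<<20), cleanup
}

func main() {
	r, cleanup := fileReader("data.json.gz") // hypothetical input file
	defer cleanup()
	line, err := r.ReadString('\n')
	if err != nil && err != io.EOF {
		log.Fatal(err)
	}
	log.Printf("first line: %s", line)
}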
14 changes: 10 additions & 4 deletions dgraph/cmd/bulk/mapper.go
@@ -32,6 +32,7 @@ import (

"github.com/dgraph-io/dgo/protos/api"
"github.com/dgraph-io/dgraph/gql"
"github.com/dgraph-io/dgraph/loadfile"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/tok"
@@ -116,12 +117,12 @@ func (m *mapper) writeMapEntriesToFile(entriesBuf []byte, shardIdx int) {
x.Check(x.WriteFileSync(filename, entriesBuf, 0644))
}

func (m *mapper) run(inputFormat int) {
chunker := newChunker(inputFormat)
func (m *mapper) run(inputFormat int, keyFields *[]string) {
chunker := loadfile.NewChunker(inputFormat)
for chunkBuf := range m.readerChunkCh {
done := false
for !done {
nqs, err := chunker.parse(chunkBuf)
nqs, err := chunker.Parse(chunkBuf, keyFields)
if err == io.EOF {
done = true
} else if err != nil {
@@ -131,7 +132,12 @@ func (m *mapper) run(inputFormat int, keyFields *[]string) {
}
}

for _, nq := range nqs {
gqlNqs := make([]gql.NQuad, len(nqs))
for i, nq := range nqs {
gqlNqs[i] = gql.NQuad{NQuad: nq}
}

for _, nq := range gqlNqs {
if err := facets.SortAndValidate(nq.Facets); err != nil {
atomic.AddInt64(&m.prog.errCount, 1)
if !m.opt.IgnoreErrors {
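
Between loader.go and mapper.go, the call sites imply the shape of the new loadfile chunker API (NewChunker, Begin, Chunk, End, Parse); later commits in this PR rename the package to chunker. The interface below is only a reconstruction inferred from those call sites, including the guess that Parse returns []*api.NQuad (the mapper wraps each result in gql.NQuad); the real package may differ.

package chunkersketch

import (
	"bufio"
	"bytes"

	"github.com/dgraph-io/dgo/protos/api"
)

// Input formats accepted by the loaders (names as used in the hunks above).
const (
	RdfInput = iota
	JsonInput
)

// Chunker is a reconstruction of the loadfile API implied by the call sites
// above; the actual interface in the PR may differ.
type Chunker interface {
	// Begin consumes any stream prefix, e.g. the opening '[' of a JSON array.
	Begin(r *bufio.Reader) error
	// Chunk reads the next batch of complete records; io.EOF signals the end.
	Chunk(r *bufio.Reader) (*bytes.Buffer, error)
	// End consumes any stream suffix, e.g. the closing ']' of a JSON array.
	End(r *bufio.Reader) error
	// Parse converts a chunk into N-Quads; keyFields is presumably used by the
	// JSON chunker to derive blank UIDs for records that carry no uid field.
	Parse(chunkBuf *bytes.Buffer, keyFields *[]string) ([]*api.NQuad, error)
}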
12 changes: 9 additions & 3 deletions dgraph/cmd/bulk/run.go
@@ -48,10 +48,10 @@ func init() {

flag := Bulk.Cmd.Flags()
flag.StringP("rdfs", "r", "",
"Directory containing *.rdf or *.rdf.gz files to load.")
"Location of RDF data to load.")
// would be nice to use -j to match -r, but already used by --num_go_routines
flag.String("jsons", "",
"Directory containing *.json or *.json.gz files to load.")
"Location of JSON data to load.")
flag.StringP("schema_file", "s", "",
"Location of schema file to load.")
flag.String("out", "out",
@@ -91,6 +91,7 @@
"more parallelism, but increases memory usage.")
flag.String("custom_tokenizers", "",
"Comma separated list of tokenizer plugins")
flag.StringP("key", "k", "", "Comma-separated list of JSON fields to identify a uid")
}

func run() {
@@ -114,14 +115,15 @@ func run() {
MapShards: Bulk.Conf.GetInt("map_shards"),
ReduceShards: Bulk.Conf.GetInt("reduce_shards"),
CustomTokenizers: Bulk.Conf.GetString("custom_tokenizers"),
KeyFields: Bulk.Conf.GetString("key"),
}

x.PrintVersion()
if opt.Version {
os.Exit(0)
}
if opt.SchemaFile == "" {
fmt.Fprint(os.Stderr, "schema file must be specified.\n")
fmt.Fprint(os.Stderr, "Schema file must be specified.\n")
os.Exit(1)
}
if opt.RDFDir == "" && opt.JSONDir == "" {
@@ -178,6 +180,10 @@ func run() {
defer os.RemoveAll(opt.TmpDir)
}

for _, f := range strings.Split(opt.KeyFields, ",") {
opt.parsedKeyFields = append(opt.parsedKeyFields, strings.TrimSpace(f))
}

loader := newLoader(opt)
if !opt.SkipMapPhase {
loader.mapStage()
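
Note that the --key flag added here is removed again by a later commit in this PR (dd237b1). Also, because strings.Split("", ",") returns a slice containing a single empty string, the loop above leaves parsedKeyFields as [""] when --key is unset. A guarded variant, offered as a suggestion rather than as the PR's code, is sketched below.

package main

import (
	"fmt"
	"strings"
)

// parseKeyFields splits a comma-separated --key value into trimmed field
// names. An empty input returns nil, avoiding the single-empty-string slice
// that ranging over strings.Split("", ",") would otherwise produce.
func parseKeyFields(keyFields string) []string {
	if keyFields == "" {
		return nil
	}
	var out []string
	for _, f := range strings.Split(keyFields, ",") {
		out = append(out, strings.TrimSpace(f))
	}
	return out
}

func main() {
	fmt.Println(parseKeyFields("email, phone")) // [email phone]
	fmt.Println(parseKeyFields(""))             // []
}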
22 changes: 11 additions & 11 deletions dgraph/cmd/live/batch.go
@@ -69,13 +69,13 @@ type loader struct {
retryRequestsWg sync.WaitGroup

// Miscellaneous information to print counters.
// Num of RDF's sent
rdfs uint64
// Num of N-Quads sent
nquads uint64
// Num of txns sent
txns uint64
// Num of aborts
aborts uint64
// To get time elapsel.
// To get time elapsed
start time.Time

reqs chan api.Mutation
@@ -109,8 +109,8 @@ func (p *uidProvider) ReserveUidRange() (uint64, uint64, error) {
// Counter keeps a track of various parameters about a batch mutation. Running totals are printed
// if BatchMutationOptions PrintCounters is set to true.
type Counter struct {
// Number of RDF's processed by server.
Rdfs uint64
// Number of N-Quads processed by server.
Nquads uint64
// Number of mutations processed by the server.
TxnsDone uint64
// Number of Aborts
@@ -148,7 +148,7 @@ func (l *loader) infinitelyRetry(req api.Mutation) {
req.CommitNow = true
_, err := txn.Mutate(l.opts.Ctx, &req)
if err == nil {
atomic.AddUint64(&l.rdfs, uint64(len(req.Set)))
atomic.AddUint64(&l.nquads, uint64(len(req.Set)))
atomic.AddUint64(&l.txns, 1)
return
}
@@ -167,7 +167,7 @@ func (l *loader) request(req api.Mutation) {
_, err := txn.Mutate(l.opts.Ctx, &req)

if err == nil {
atomic.AddUint64(&l.rdfs, uint64(len(req.Set)))
atomic.AddUint64(&l.nquads, uint64(len(req.Set)))
atomic.AddUint64(&l.txns, 1)
return
}
@@ -193,17 +193,17 @@ func (l *loader) printCounters() {

for range l.ticker.C {
counter := l.Counter()
rate := float64(counter.Rdfs) / counter.Elapsed.Seconds()
rate := float64(counter.Nquads) / counter.Elapsed.Seconds()
elapsed := time.Since(start).Round(time.Second)
fmt.Printf("[%6s] Txns: %d RDFs: %d RDFs/sec: %5.0f Aborts: %d\n",
elapsed, counter.TxnsDone, counter.Rdfs, rate, counter.Aborts)
fmt.Printf("[%6s] Txns: %d N-Quads: %d N-Quads/sec: %5.0f Aborts: %d\n",
elapsed, counter.TxnsDone, counter.Nquads, rate, counter.Aborts)
}
}

// Counter returns the current state of the BatchMutation.
func (l *loader) Counter() Counter {
return Counter{
Rdfs: atomic.LoadUint64(&l.rdfs),
Nquads: atomic.LoadUint64(&l.nquads),
TxnsDone: atomic.LoadUint64(&l.txns),
Elapsed: time.Since(l.start),
Aborts: atomic.LoadUint64(&l.aborts),
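
The rename from Rdfs to Nquads keeps the same lock-free counter pattern: mutation goroutines bump totals with atomic adds while printCounters reads snapshots with atomic loads to derive an N-Quads/sec rate. The standalone sketch below illustrates that pattern; it is not Dgraph code.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// counter mirrors the pattern in batch.go above: worker goroutines record
// totals with atomic adds, and a reporter reads them with atomic loads.
type counter struct {
	nquads uint64
	txns   uint64
	start  time.Time
}

func (c *counter) record(nquadsInTxn int) {
	atomic.AddUint64(&c.nquads, uint64(nquadsInTxn))
	atomic.AddUint64(&c.txns, 1)
}

func (c *counter) snapshot() (nquads, txns uint64, rate float64) {
	nquads = atomic.LoadUint64(&c.nquads)
	txns = atomic.LoadUint64(&c.txns)
	rate = float64(nquads) / time.Since(c.start).Seconds()
	return
}

func main() {
	c := &counter{start: time.Now()}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 1000; j++ {
				c.record(10) // pretend each txn set 10 N-Quads
			}
		}()
	}
	wg.Wait()
	nq, txns, rate := c.snapshot()
	fmt.Printf("Txns: %d N-Quads: %d N-Quads/sec: %5.0f\n", txns, nq, rate)
}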