Support bulk load of JSON data. #2799

Merged: 37 commits, Dec 17, 2018

Changes from 27 commits

Commits (37)
463fa65
Based on bulk schema issue discussion.
manishrjain Nov 15, 2018
0533fea
Merge remote-tracking branch 'origin/master' into codexnull/bulk-schema
Nov 16, 2018
d0f2217
Don't skip predicates with value type of default when loading the sch…
Nov 16, 2018
4b7094d
Allow running test.sh from another directory.
Nov 16, 2018
568face
Remove debugging comments.
Nov 16, 2018
f364e15
Remove debug comment.
Nov 16, 2018
b4bd7c4
Keep all predicates from bulk import schema, not just the ones used.
Nov 18, 2018
1b36ce5
Merge branch 'master' into codexnull/bulk-schema
Nov 19, 2018
358921d
Merge branch 'master' into codexnull/bulk-schema
Nov 19, 2018
424b71b
Merge in changes from master.
Nov 19, 2018
043460c
Make set of predicates the union of predicates in the schema and rdf.
Nov 20, 2018
7173a1b
Merge branch 'master' into codexnull/bulk-schema
Nov 26, 2018
995ca9a
Allow or disallow schema and rdf files depending on whether map phase…
Nov 27, 2018
d4225bc
Add test for schema after export/bulk load.
Nov 27, 2018
0382370
Add more schema test cases.
Nov 27, 2018
2915889
Revert disallowing schema and rdf files with --skip_map_phase; loader…
Nov 27, 2018
002a94b
In-progress work for bulk loading JSON data.
Nov 29, 2018
d7334cb
Merge branch 'master' into codexnull/bulk-json
Nov 29, 2018
41a259e
Work in progress...
Nov 29, 2018
6251c80
Only output open file limit info if there is a problem.
Nov 29, 2018
9314707
in progress...
Nov 30, 2018
e8f3f3a
First complete(?) feature implementation.
Nov 30, 2018
0957b92
Merge branch 'master' into codexnull/bulk-json
Nov 30, 2018
e2dd97d
Remove debug options from BUILD_FLAGS.
Nov 30, 2018
17236f1
Merge branch 'master' into codexnull/bulk-json
codexnull Nov 30, 2018
2d715bc
Fix some warnings about ignoring error return value.
Nov 30, 2018
22f1219
Add, improve comments.
Nov 30, 2018
56728ce
Refactor RDF-to-NQuad and JSON-to-NQuad code to allow testing each in…
Dec 6, 2018
8213ed1
Add some bulk loading tests... will add more.
Dec 8, 2018
1fc85b9
Add more test cases.
Dec 10, 2018
32f646e
Refactor bulk loader chunking to use interfaces. Add more tests.
Dec 11, 2018
bbdd686
Refactor bulk loader mapping to use interfaces.
Dec 11, 2018
b975cf8
Run goimports on bulk/{mapper,run}.go.
Dec 11, 2018
5dbc835
Simplify chunker code.
manishrjain Dec 14, 2018
09984ad
Consolidate all chunker code into one interface, one file and corresp…
manishrjain Dec 14, 2018
a96df13
Manish final review
manishrjain Dec 14, 2018
7560ab5
Merge branch 'master' into codexnull/bulk-json
Dec 17, 2018
3 changes: 3 additions & 0 deletions .gitignore
@@ -12,6 +12,9 @@ gql/fuzz-data/suppressions
.idea
dgraph.iml

# vim session backups
.*.swp

# Binaries for programs and plugins
*.exe
*.exe~
190 changes: 161 additions & 29 deletions dgraph/cmd/bulk/loader.go
@@ -22,6 +22,7 @@ import (
"compress/gzip"
"context"
"fmt"
"github.com/pkg/errors"

File is not goimports-ed (from goimports)

"io"
"io/ioutil"
"os"
@@ -30,6 +31,7 @@ import (
"strings"
"sync"
"time"
"unicode"

"github.com/dgraph-io/badger"
bo "github.com/dgraph-io/badger/options"
@@ -42,6 +44,7 @@

type options struct {
RDFDir string
JSONDir string
SchemaFile string
DgraphsDir string
TmpDir string
@@ -63,16 +66,21 @@
shardOutputDirs []string
}

const (
rdfLoader int = iota
jsonLoader
)

type state struct {
opt options
prog *progress
xids *xidmap.XidMap
schema *schemaStore
shards *shardMap
rdfChunkCh chan *bytes.Buffer
mapFileId uint32 // Used atomically to name the output files of the mappers.
dbs []*badger.DB
writeTs uint64 // All badger writes use this timestamp
opt options
prog *progress
xids *xidmap.XidMap
schema *schemaStore
shards *shardMap
readerChunkCh chan *bytes.Buffer
mapFileId uint32 // Used atomically to name the output files of the mappers.
dbs []*badger.DB
writeTs uint64 // All badger writes use this timestamp
}

type loader struct {
@@ -94,8 +102,8 @@ func newLoader(opt options) *loader {
prog: newProgress(),
shards: newShardMap(opt.MapShards),
// Lots of gz readers, so not much channel buffer needed.
rdfChunkCh: make(chan *bytes.Buffer, opt.NumGoroutines),
writeTs: getWriteTimestamp(zero),
readerChunkCh: make(chan *bytes.Buffer, opt.NumGoroutines),
writeTs: getWriteTimestamp(zero),
}
st.schema = newSchemaStore(readSchema(opt.SchemaFile), opt, st)
ld := &loader{
@@ -142,9 +150,9 @@ func readSchema(filename string) []*pb.SchemaUpdate {
return initialSchema
}

func readChunk(r *bufio.Reader) (*bytes.Buffer, error) {
func readRDFChunk(r *bufio.Reader) (*bytes.Buffer, error) {
batch := new(bytes.Buffer)
batch.Grow(10 << 20)
batch.Grow(1 << 20)
for lineCount := 0; lineCount < 1e5; lineCount++ {
slc, err := r.ReadSlice('\n')
if err == io.EOF {
@@ -174,13 +182,114 @@
return batch, nil
}

func findRDFFiles(dir string) []string {
func skipSpace(r *bufio.Reader) error {
ch, _, err := r.ReadRune()
for unicode.IsSpace(ch) {
ch, _, err = r.ReadRune()
}

if err == nil {
err = r.UnreadRune()
}

return nil
}

func readJSONPreChunk(r *bufio.Reader) error {
// The JSON file to load must be an array of maps (that is, '[ { ... }, { ... }, ... ]').
// This function must be called before calling readJSONChunk for the first time to advance
// the Reader past the array start token ('[') so that calls to readJSONChunk can read
// one array element at a time instead of having to read the entire array into memory.
if err := skipSpace(r); err != nil {
return err
}

ch, _, err := r.ReadRune()
if err != nil && err != io.EOF {
return err
} else if ch != '[' {
return errors.New("json file must contain array")
}

return nil
}

func readJSONChunk(r *bufio.Reader) (*bytes.Buffer, error) {
batch := new(bytes.Buffer)
batch.Grow(1 << 20)

// For RDF, the loader just reads the input and the mapper parses it into nquads,
// so do the same for JSON. But since JSON is not line-oriented like RDF, it's a little
// more complicated to ensure a complete JSON structure is read.

if err := skipSpace(r); err != nil {
return nil, err
}

ch, _, err := r.ReadRune()
if err == io.EOF {
return batch, err
} else if ch != '{' {
return nil, errors.New("expected json map start")
}

// Just find the matching closing brace. Let the JSON-to-nquad parser in the mapper worry
// about whether everything in between is valid JSON or not.
depth := 0
quoted := false
done := false
var pch rune
for !done {
batch.WriteRune(ch)
pch = ch

ch, _, err = r.ReadRune()
if err != nil {
// any error at this point, even EOF, is fatal
return nil, errors.New("malformed json")
}

switch ch {
case '{':
if !quoted {
depth++
}
case '}':
if !quoted {
if depth == 0 {
batch.WriteRune(ch)
done = true
} else {
depth--
}
}
case '"':
if !quoted || (quoted && pch != '\\') {
quoted = !quoted
}
}
}

// The map should be followed by either the ',' between array elements, or the ']'
// at the end of the array.
_ = skipSpace(r)
ch, _, err = r.ReadRune()
if ch == ']' {
err = io.EOF
} else if ch != ',' {
// Let the next call to this function report the error.
_ = r.UnreadRune()
}

return batch, err
}
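
To make the chunking contract above concrete, here is a minimal driver sketch. It is not part of this PR: the sample input, the exampleChunking name, and the loop are illustrative only, and it assumes readJSONPreChunk and readJSONChunk are in scope. readJSONPreChunk swallows the opening '[', after which each readJSONChunk call returns exactly one top-level map until the closing ']' is reached. The loop mirrors the one in mapStage, with the send on readerChunkCh replaced by a print.

```go
// Sketch only, not part of this PR.
package bulk

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

func exampleChunking() {
	input := `[ {"name": "alice"}, {"name": "bob", "friend": {"name": "carol"}} ]`
	r := bufio.NewReader(strings.NewReader(input))

	// Advance past the leading '[' so each readJSONChunk call yields one map.
	if err := readJSONPreChunk(r); err != nil {
		panic(err)
	}
	for {
		chunk, err := readJSONChunk(r)
		if err != nil && err != io.EOF {
			panic(err)
		}
		if chunk != nil && chunk.Len() > 0 {
			fmt.Println(chunk.String()) // one complete JSON map per chunk
		}
		if err == io.EOF {
			break
		}
	}
}
```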

func findDataFiles(dir string, ext string) []string {
var files []string
x.Check(filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if strings.HasSuffix(path, ".rdf") || strings.HasSuffix(path, ".rdf.gz") {
if strings.HasSuffix(path, ext) || strings.HasSuffix(path, ext+".gz") {
files = append(files, path)
}
return nil
@@ -211,59 +320,82 @@ func (ld *loader) mapStage() {
LRUSize: 1 << 19,
})

var files []string
var ext string
var loaderType int
var chunker func(r *bufio.Reader) (*bytes.Buffer, error)
if ld.opt.RDFDir != "" {
loaderType = rdfLoader
ext = ".rdf"
files = findDataFiles(ld.opt.RDFDir, ext)
chunker = readRDFChunk
} else {
loaderType = jsonLoader
ext = ".json"
files = findDataFiles(ld.opt.JSONDir, ext)
chunker = readJSONChunk
}

readers := make(map[string]*bufio.Reader)
for _, rdfFile := range findRDFFiles(ld.opt.RDFDir) {
f, err := os.Open(rdfFile)
for _, file := range files {
f, err := os.Open(file)
x.Check(err)
defer f.Close()
if !strings.HasSuffix(rdfFile, ".gz") {
readers[rdfFile] = bufio.NewReaderSize(f, 1<<20)
// TODO detect compressed input instead of relying on filename
// so data can be streamed in
if !strings.HasSuffix(file, ".gz") {
readers[file] = bufio.NewReaderSize(f, 1<<20)
} else {
gzr, err := gzip.NewReader(f)
x.Checkf(err, "Could not create gzip reader for RDF file %q.", rdfFile)
readers[rdfFile] = bufio.NewReader(gzr)
x.Checkf(err, "Could not create gzip reader for file %q.", file)
readers[file] = bufio.NewReader(gzr)
}
}

if len(readers) == 0 {
fmt.Println("No rdf files found.")
fmt.Printf("No *%s files found.\n", ext)
os.Exit(1)
}

var mapperWg sync.WaitGroup
mapperWg.Add(len(ld.mappers))
for _, m := range ld.mappers {
go func(m *mapper) {
m.run()
m.run(loaderType)
mapperWg.Done()
}(m)
}

// This is the main map loop.
thr := x.NewThrottle(ld.opt.NumGoroutines)
var fileCount int
for rdfFile, r := range readers {
for file, r := range readers {
thr.Start()
fileCount++
fmt.Printf("Processing file (%d out of %d): %s\n", fileCount, len(readers), rdfFile)
fmt.Printf("Processing file (%d out of %d): %s\n", fileCount, len(readers), file)
go func(r *bufio.Reader) {
defer thr.Done()
if loaderType == jsonLoader {
if err := readJSONPreChunk(r); err != nil {
x.Check(err)
}
}
for {
chunkBuf, err := readChunk(r)
chunkBuf, err := chunker(r)
if err == io.EOF {
if chunkBuf.Len() != 0 {
ld.rdfChunkCh <- chunkBuf
ld.readerChunkCh <- chunkBuf
}
break
}
x.Check(err)
ld.rdfChunkCh <- chunkBuf
ld.readerChunkCh <- chunkBuf
}
}(r)
}
thr.Wait()

close(ld.rdfChunkCh)
close(ld.readerChunkCh)
mapperWg.Wait()

// Allow memory to GC before the reduce phase.
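
The TODO in mapStage above notes that compressed input is currently recognized only by the ".gz" filename suffix. One possible direction, sketched below and not part of this PR (the newDataReader name is made up), is to sniff the gzip magic bytes instead, which would also let data be streamed in:

```go
// Sketch only, not part of this PR.
package bulk

import (
	"bufio"
	"compress/gzip"
	"io"
)

// newDataReader wraps any input stream and transparently decodes gzip if the
// stream starts with the gzip magic bytes 0x1f 0x8b.
func newDataReader(rd io.Reader) (*bufio.Reader, error) {
	br := bufio.NewReaderSize(rd, 1<<20)
	magic, err := br.Peek(2) // Peek does not consume, so br stays usable either way.
	if err != nil && err != io.EOF {
		return nil, err
	}
	if len(magic) == 2 && magic[0] == 0x1f && magic[1] == 0x8b {
		gzr, err := gzip.NewReader(br)
		if err != nil {
			return nil, err
		}
		return bufio.NewReader(gzr), nil
	}
	return br, nil
}
```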
57 changes: 42 additions & 15 deletions dgraph/cmd/bulk/mapper.go
@@ -20,6 +20,7 @@ import (
"bytes"
"encoding/binary"
"fmt"
"github.com/dgraph-io/dgraph/edgraph"

File is not goimports-ed (from goimports)

"io"
"log"
"math"
@@ -118,28 +119,54 @@ func (m *mapper) writeMapEntriesToFile(entriesBuf []byte, shardIdx int) {
x.Check(x.WriteFileSync(filename, entriesBuf, 0644))
}

func (m *mapper) run() {
for chunkBuf := range m.rdfChunkCh {
func (m *mapper) run(loaderType int) {
for chunkBuf := range m.readerChunkCh {
done := false
for !done {
rdf, err := chunkBuf.ReadString('\n')
if err == io.EOF {
// Process the last RDF rather than breaking immediately.
done = true
if loaderType == rdfLoader {
str, err := chunkBuf.ReadString('\n')
if err == io.EOF {
// Process the last chunk rather than breaking immediately.
done = true
} else {
x.Check(err)
}
str = strings.TrimSpace(str)

// process RDF line
if err := m.processRDF(str); err != nil {
atomic.AddInt64(&m.prog.errCount, 1)
if !m.opt.IgnoreErrors {
x.Check(err)
}
}

atomic.AddInt64(&m.prog.rdfCount, 1)
} else {
x.Check(err)
}
rdf = strings.TrimSpace(rdf)
// process JSON chunk; readJSONChunk produces one complete JSON map per chunk,
// so draining the buffer (io.EOF from ReadBytes) means this chunk is done.
str, readErr := chunkBuf.ReadBytes(0)
if readErr == io.EOF {
done = true
} else if readErr != nil {
x.Check(readErr)
}

nquads, err := edgraph.NquadsFromJson(str)
if err != nil {
atomic.AddInt64(&m.prog.errCount, 1)
if !m.opt.IgnoreErrors {
x.Check(err)
}
}

// process RDF line
if err := m.processRDF(rdf); err != nil {
atomic.AddInt64(&m.prog.errCount, 1)
if !m.opt.IgnoreErrors {
x.Check(err)
for _, nq := range nquads {
if err := facets.SortAndValidate(nq.Facets); err != nil {
if !m.opt.IgnoreErrors {
x.Check(err)
}
}

m.processNQuad(gql.NQuad{NQuad: nq})
atomic.AddInt64(&m.prog.rdfCount, 1)
}
}

atomic.AddInt64(&m.prog.rdfCount, 1)
for i := range m.shards {
sh := &m.shards[i]
if len(sh.entriesBuf) >= int(m.opt.MapBufSize) {
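
For readers new to the JSON path in run(): each chunk delivered on readerChunkCh holds one JSON map, whose raw bytes go through edgraph.NquadsFromJson and are then fed into the same processNQuad sink as parsed RDF. Below is a rough, self-contained sketch of that step, not part of this PR; the exampleJSONMapping name, the literal input, and the printed fields are assumptions based on the api.NQuad usage in the diff.

```go
// Sketch only, not part of this PR. Assumes the signature used in the diff:
// edgraph.NquadsFromJson([]byte) ([]*api.NQuad, error).
package bulk

import (
	"fmt"
	"log"

	"github.com/dgraph-io/dgraph/edgraph"
)

func exampleJSONMapping() {
	chunk := []byte(`{"name": "alice", "age": 25, "friend": {"name": "bob"}}`)
	nquads, err := edgraph.NquadsFromJson(chunk)
	if err != nil {
		// In the mapper this would bump prog.errCount and honor the IgnoreErrors option.
		log.Fatal(err)
	}
	for _, nq := range nquads {
		// Each nq is an *api.NQuad; the mapper validates its facets and then feeds
		// gql.NQuad{NQuad: nq} into processNQuad, the same sink the RDF path uses.
		fmt.Printf("%s %s %v %v\n", nq.Subject, nq.Predicate, nq.ObjectId, nq.ObjectValue)
	}
}
```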