Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support bulk load of JSON data. #2799

Merged
merged 37 commits into from
Dec 17, 2018
Merged
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
463fa65
Based on bulk schema issue discussion.
manishrjain Nov 15, 2018
0533fea
Merge remote-tracking branch 'origin/master' into codexnull/bulk-schema
Nov 16, 2018
d0f2217
Don't skip predicates with value type of default when loading the sch…
Nov 16, 2018
4b7094d
Allow running test.sh from another directory.
Nov 16, 2018
568face
Remove debugging comments.
Nov 16, 2018
f364e15
Remove debug comment.
Nov 16, 2018
b4bd7c4
Keep all predicates from bulk import schema, not just the ones used.
Nov 18, 2018
1b36ce5
Merge branch 'master' into codexnull/bulk-schema
Nov 19, 2018
358921d
Merge branch 'master' into codexnull/bulk-schema
Nov 19, 2018
424b71b
Merge in changes from master.
Nov 19, 2018
043460c
Make set of predicates the union of predicates in the schema and rdf.
Nov 20, 2018
7173a1b
Merge branch 'master' into codexnull/bulk-schema
Nov 26, 2018
995ca9a
Allow or disallow schema and rdf files depending on whether map phase…
Nov 27, 2018
d4225bc
Add test for schema after export/bulk load.
Nov 27, 2018
0382370
Add more schema test cases.
Nov 27, 2018
2915889
Revert disallowing schema and rdf files with --skip_map_phase; loader…
Nov 27, 2018
002a94b
In-progress work for bulk loading JSON data.
Nov 29, 2018
d7334cb
Merge branch 'master' into codexnull/bulk-json
Nov 29, 2018
41a259e
Work in progress...
Nov 29, 2018
6251c80
Only output open file limit info if there is a problem.
Nov 29, 2018
9314707
in progress...
Nov 30, 2018
e8f3f3a
First complete(?) feature implementation.
Nov 30, 2018
0957b92
Merge branch 'master' into codexnull/bulk-json
Nov 30, 2018
e2dd97d
Remove debug options from BUILD_FLAGS.
Nov 30, 2018
17236f1
Merge branch 'master' into codexnull/bulk-json
codexnull Nov 30, 2018
2d715bc
Fix some warnings about igonring error return value.
Nov 30, 2018
22f1219
Add, improve comments.
Nov 30, 2018
56728ce
Refactor RDF-to-NQuad and JSON-to-NQuad code to allow testing each in…
Dec 6, 2018
8213ed1
Add some bulk loading tests... will add more.
Dec 8, 2018
1fc85b9
Add more test cases.
Dec 10, 2018
32f646e
Refactor bulk loader chunking to use interfaces. Add more tests.
Dec 11, 2018
bbdd686
Refactor bulk loader mapping to use interfaces.
Dec 11, 2018
b975cf8
Run goimports on bulk/{mapper,run}.go.
Dec 11, 2018
5dbc835
Simplify chunker code.
manishrjain Dec 14, 2018
09984ad
Consolidate all chunker code into one interface, one file and corresp…
manishrjain Dec 14, 2018
a96df13
Manish final review
manishrjain Dec 14, 2018
7560ab5
Merge branch 'master' into codexnull/bulk-json
Dec 17, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
262 changes: 262 additions & 0 deletions dgraph/cmd/bulk/chunk.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,262 @@
/*
* Copyright 2018 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package bulk

import (
"bufio"
"bytes"
"fmt"
"io"
"strings"
"unicode"

"github.com/dgraph-io/dgraph/edgraph"
"github.com/dgraph-io/dgraph/gql"
"github.com/dgraph-io/dgraph/rdf"
"github.com/dgraph-io/dgraph/x"
"github.com/pkg/errors"
)

type chunker interface {
begin(r *bufio.Reader) error
chunk(r *bufio.Reader) (*bytes.Buffer, error)
end(r *bufio.Reader) error
parse(chunkBuf *bytes.Buffer) ([]gql.NQuad, error)
}

type rdfChunker struct{}
type jsonChunker struct{}

const (
rdfInput int = iota
jsonInput
)

func newChunker(inputFormat int) chunker {
switch inputFormat {
case rdfInput:
return &rdfChunker{}
case jsonInput:
return &jsonChunker{}
default:
panic("unknown loader type")
}
}

func (_ rdfChunker) begin(r *bufio.Reader) error {
return nil
}

func (_ rdfChunker) chunk(r *bufio.Reader) (*bytes.Buffer, error) {
batch := new(bytes.Buffer)
batch.Grow(1 << 20)
for lineCount := 0; lineCount < 1e5; lineCount++ {
slc, err := r.ReadSlice('\n')
if err == io.EOF {
batch.Write(slc)
return batch, err
}
if err == bufio.ErrBufferFull {
// This should only happen infrequently.
batch.Write(slc)
var str string
str, err = r.ReadString('\n')
if err == io.EOF {
batch.WriteString(str)
return batch, err
}
if err != nil {
return nil, err
}
batch.WriteString(str)
continue
}
if err != nil {
return nil, err
}
batch.Write(slc)
}
return batch, nil
}

func (_ rdfChunker) end(r *bufio.Reader) error {
return nil
}

func (_ rdfChunker) parse(chunkBuf *bytes.Buffer) ([]gql.NQuad, error) {
str, readErr := chunkBuf.ReadString('\n')
if readErr != nil && readErr != io.EOF {
x.Check(readErr)
}

nq, parseErr := rdf.Parse(strings.TrimSpace(str))
if parseErr == rdf.ErrEmpty {
return nil, readErr
} else if parseErr != nil {
return nil, errors.Wrapf(parseErr, "while parsing line %q", str)
}

return []gql.NQuad{gql.NQuad{NQuad: &nq}}, readErr
}

func slurpSpace(r *bufio.Reader) error {
for {
ch, _, err := r.ReadRune()
if err != nil {
return err
}
if !unicode.IsSpace(ch) {
r.UnreadRune()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error return value of r.UnreadRune is not checked (from errcheck)

break
}
}
return nil
}

func slurpQuoted(r *bufio.Reader, out *bytes.Buffer) error {
for {
ch, _, err := r.ReadRune()
if err != nil {
return err
}
x.Check2(out.WriteRune(ch))

if ch == '\\' {
// Pick one more rune.
if esc, _, err := r.ReadRune(); err != nil {
return err
} else {
x.Check2(out.WriteRune(esc))
continue
}
}
if ch == '"' {
return nil
}
}
}

func (_ jsonChunker) begin(r *bufio.Reader) error {
// The JSON file to load must be an array of maps (that is, '[ { ... }, { ... }, ... ]').
// This function must be called before calling readJSONChunk for the first time to advance
// the Reader past the array start token ('[') so that calls to readJSONChunk can read
// one array element at a time instead of having to read the entire array into memory.
if err := slurpSpace(r); err != nil {
return err
}

ch, _, err := r.ReadRune()
if err != nil {
return err
}
if ch != '[' {
return fmt.Errorf("json file must contain array. Found: %v", ch)
}
return nil
}

func (_ jsonChunker) chunk(r *bufio.Reader) (*bytes.Buffer, error) {
out := new(bytes.Buffer)
out.Grow(1 << 20)

// For RDF, the loader just reads the input and the mapper parses it into nquads,
// so do the same for JSON. But since JSON is not line-oriented like RDF, it's a little
// more complicated to ensure a complete JSON structure is read.

if err := slurpSpace(r); err != nil {
return out, err
}
ch, _, err := r.ReadRune()
if err != nil {
return out, err
}
if ch != '{' {
return nil, fmt.Errorf("expected json map start. Found: %v", ch)
}
x.Check2(out.WriteRune(ch))

// Just find the matching closing brace. Let the JSON-to-nquad parser in the mapper worry
// about whether everything in between is valid JSON or not.

depth := 1 // We already consumed one `{`, so our depth starts at one.
for depth > 0 {
ch, _, err = r.ReadRune()
if err != nil {
return nil, errors.New("malformed json")
}
x.Check2(out.WriteRune(ch))

switch ch {
case '{':
depth++
case '}':
depth--
case '"':
if err := slurpQuoted(r, out); err != nil {
return nil, err
}
default:
// We just write the rune to out, and let the Go JSON parser do its job.
}
}

// The map should be followed by either the ',' between array elements, or the ']'
// at the end of the array.
if err := slurpSpace(r); err != nil {
return nil, err
}
ch, _, err = r.ReadRune()
if err != nil {
return nil, err
}
switch ch {
case ']':
return out, io.EOF
case ',':
// pass
default:
// Let next call to this function report the error.
x.Check(r.UnreadRune())
}
return out, nil
}

func (_ jsonChunker) end(r *bufio.Reader) error {
if slurpSpace(r) == io.EOF {
return nil
} else {
return errors.New("not all of json file consumed")
}
}

func (_ jsonChunker) parse(chunkBuf *bytes.Buffer) ([]gql.NQuad, error) {
if chunkBuf.Len() == 0 {
return nil, io.EOF
}

nqs, err := edgraph.NquadsFromJson(chunkBuf.Bytes())
if err != nil && err != io.EOF {
x.Check(err)
}
chunkBuf.Reset()

gqlNq := make([]gql.NQuad, len(nqs))
for i, nq := range nqs {
gqlNq[i] = gql.NQuad{NQuad: nq}
}
return gqlNq, err
}
Loading