Bulk Loader: Fix memory usage by JSON parser #3794

Merged 21 commits on Aug 13, 2019.
Changes shown from 15 commits.

Commits (21)
ded891b
Create a new NQuads struct and move Parse under it.
manishrjain Aug 10, 2019
9ac7d50
Move all nquads to the new NQuads struct.
manishrjain Aug 10, 2019
a6047b1
Make parse tests work
manishrjain Aug 10, 2019
6832162
Code runs and all
manishrjain Aug 10, 2019
f4cf4be
Run GC every 5s
manishrjain Aug 10, 2019
39a00b3
GC every 10s
manishrjain Aug 10, 2019
b81b207
Set worker goroutines aka mappers to 1/4th of the number of cores.
manishrjain Aug 10, 2019
fa831be
Only run GC if it hadn't been run. Also mention the RAM usage being p…
manishrjain Aug 10, 2019
70eb295
Files moved to one directory to allow easier sharing of the code.
manishrjain Aug 11, 2019
b7a299b
Add batchSize option to Chunker and NQuadBuffer to simplify loader an…
manishrjain Aug 11, 2019
17b557b
Bring the edgraph/server.go code back. Move multiple RDF parsing to c…
manishrjain Aug 12, 2019
3f89c68
Hook up NQuadBuffer to RDFChunker.NQuads
manishrjain Aug 12, 2019
5e12649
Address golint issues
manishrjain Aug 12, 2019
948bfde
Fix the live loader test failure.
manishrjain Aug 12, 2019
3e25781
Don't think we need a for loop around chunker.Parse
manishrjain Aug 12, 2019
541c9be
Move FacetDelimiter to x package to avoid a cyclic import loop.
manishrjain Aug 12, 2019
297651f
Fix gql package test
manishrjain Aug 12, 2019
8f7f8cf
Fix a test failure by handling io.EOF
manishrjain Aug 12, 2019
1d1f529
Don't return io.EOF unnecessarily
manishrjain Aug 12, 2019
de862d1
Address comments by PR folks. Also, no need to handle io.EOF because …
manishrjain Aug 12, 2019
5c420fb
Address PR folks comments
manishrjain Aug 13, 2019
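
Taken together, the commits outline the fix: parsed N-Quads now accumulate in a batched NQuadBuffer instead of one large slice, the number of mapper goroutines is capped at a quarter of the available cores, and a GC cycle is forced periodically if the runtime has not already collected. The snippet below is a hypothetical sketch of that periodic-GC pattern, not the PR's actual code; the function name and the 10-second interval are assumptions taken from the commit messages.

package main

import (
	"runtime"
	"time"
)

// forceGCEvery is a hypothetical sketch of the "GC every 10s, but only if it
// hadn't been run" idea from the commit messages (not code from this PR).
// On every tick it checks whether the runtime has completed a collection
// since the previous tick and, if not, forces one so memory is reclaimed promptly.
func forceGCEvery(interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	var ms runtime.MemStats
	var lastNumGC uint32
	for range ticker.C {
		runtime.ReadMemStats(&ms)
		if ms.NumGC == lastNumGC {
			runtime.GC() // nothing collected in this window; force a cycle
		}
		lastNumGC = ms.NumGC
	}
}

func main() {
	go forceGCEvery(10 * time.Second)
	// ... start the loader's mappers here, capped at roughly runtime.NumCPU()/4 ...
	select {}
}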
File renamed without changes.
61 changes: 36 additions & 25 deletions chunker/chunk.go
@@ -28,10 +28,7 @@ import (
"strings"
"unicode"

"github.com/dgraph-io/dgo/protos/api"
"github.com/dgraph-io/dgo/x"
"github.com/dgraph-io/dgraph/chunker/json"
"github.com/dgraph-io/dgraph/chunker/rdf"
"github.com/dgraph-io/dgraph/lex"

"github.com/pkg/errors"
@@ -42,14 +39,26 @@ type Chunker interface {
Begin(r *bufio.Reader) error
Chunk(r *bufio.Reader) (*bytes.Buffer, error)
End(r *bufio.Reader) error
Parse(chunkBuf *bytes.Buffer) ([]*api.NQuad, error)
Parse(chunkBuf *bytes.Buffer) error
NQuads() *NQuadBuffer
}

type rdfChunker struct {
lexer *lex.Lexer
nqs *NQuadBuffer
}

type jsonChunker struct{}
func (rc *rdfChunker) NQuads() *NQuadBuffer {
return rc.nqs
}

type jsonChunker struct {
nqs *NQuadBuffer
}

func (jc *jsonChunker) NQuads() *NQuadBuffer {
return jc.nqs
}

// InputFormat represents the multiple formats supported by Chunker.
type InputFormat byte
@@ -64,27 +73,32 @@ const (
)

// NewChunker returns a new chunker for the specified format.
func NewChunker(inputFormat InputFormat) Chunker {
func NewChunker(inputFormat InputFormat, batchSize int) Chunker {
switch inputFormat {
case RdfFormat:
return &rdfChunker{lexer: &lex.Lexer{}}
return &rdfChunker{
nqs: NewNQuadBuffer(batchSize),
lexer: &lex.Lexer{},
}
case JsonFormat:
return &jsonChunker{}
return &jsonChunker{
nqs: NewNQuadBuffer(batchSize),
}
default:
panic("unknown input format")
}
}
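
For context, a hypothetical caller of the reworked interface (not code from this PR): Parse no longer returns a []*api.NQuad per chunk, so parsed data accumulates in the chunker's NQuadBuffer, which is created with the batchSize passed to NewChunker. How the buffer hands finished batches to the mappers is outside this hunk; the sketch assumes only the identifiers shown above.

package chunker

import (
	"bufio"
	"io"
)

// loadAll is an illustrative sketch only: it drives a Chunker over a reader
// and returns the chunker's NQuadBuffer. Error handling is simplified.
func loadAll(r *bufio.Reader, format InputFormat, batchSize int) (*NQuadBuffer, error) {
	c := NewChunker(format, batchSize)
	if err := c.Begin(r); err != nil {
		return nil, err
	}
	for {
		chunkBuf, err := c.Chunk(r)
		if chunkBuf != nil && chunkBuf.Len() > 0 {
			// Parse now pushes N-Quads into c.NQuads() instead of returning them.
			if perr := c.Parse(chunkBuf); perr != nil && perr != io.EOF {
				return nil, perr
			}
		}
		if err == io.EOF {
			break
		} else if err != nil {
			return nil, err
		}
	}
	if err := c.End(r); err != nil {
		return nil, err
	}
	return c.NQuads(), nil
}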

// RDF files don't require any special processing at the beginning of the file.
func (c *rdfChunker) Begin(r *bufio.Reader) error {
func (rdfChunker) Begin(r *bufio.Reader) error {

Review comment:
It is often considered best practice to keep receiver types consistent: if one method uses a pointer receiver, make them all pointers. Since NQuads() and Parse take a pointer receiver, you may want to keep the others (Begin, Chunk, End) as pointer receivers as well to avoid any surprises. (https://golang.org/doc/faq#methods_on_values_or_pointers)

Author (contributor) reply:
The FAQ only seems to talk about values vs. pointers. In this case, the rdfChunker receiver isn't even used, so it is clearer not to declare the receiver variable at all; that clearly signals to the reader that the object isn't used.

Reviewer reply:
Sorry, yes: I meant changing the receiver type from *rdfChunker to rdfChunker, not removing the variable name. So, for example, just updating to:

func (*rdfChunker) Begin(r *bufio.Reader) error {

It's totally fine to ignore this here if you prefer, since it seems you always use rdfChunker as a pointer, so it will have both sets of methods. It's just that there is currently a different set of methods available on an rdfChunker in value form vs. pointer form.

return nil
}
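
To illustrate the method-set point from the thread above (a standalone example, unrelated to the PR's code): a method with a pointer receiver belongs only to the pointer type's method set, so a plain value of the type does not satisfy an interface that requires that method.

package main

import "fmt"

type counter struct{ n int }

// Value receiver: part of the method set of both counter and *counter.
func (c counter) Get() int { return c.n }

// Pointer receiver: part of the method set of *counter only.
func (c *counter) Inc() { c.n++ }

type incrementer interface{ Inc() }

func main() {
	c := counter{}
	c.Inc() // works: c is addressable, so the compiler takes &c automatically

	var i incrementer = &c // ok: *counter has Inc in its method set
	i.Inc()

	// var j incrementer = c // compile error: counter (value) has no Inc method
	fmt.Println(c.Get()) // prints 2
}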

// Chunk reads the input line by line until one of the following 3 conditions happens
// 1) the EOF is reached
// 2) 1e5 lines have been read
// 3) some unexpected error happened
func (c *rdfChunker) Chunk(r *bufio.Reader) (*bytes.Buffer, error) {
func (rdfChunker) Chunk(r *bufio.Reader) (*bytes.Buffer, error) {
batch := new(bytes.Buffer)
batch.Grow(1 << 20)
for lineCount := 0; lineCount < 1e5; lineCount++ {
@@ -117,32 +131,30 @@ func (c *rdfChunker) Chunk(r *bufio.Reader) (*bytes.Buffer, error) {
}

// Parse is not thread-safe. Only call it serially, because it reuses lexer object.
func (c *rdfChunker) Parse(chunkBuf *bytes.Buffer) ([]*api.NQuad, error) {
func (rc *rdfChunker) Parse(chunkBuf *bytes.Buffer) error {
if chunkBuf.Len() == 0 {
return nil, io.EOF
return io.EOF
}

nqs := make([]*api.NQuad, 0)
for chunkBuf.Len() > 0 {
str, err := chunkBuf.ReadString('\n')
if err != nil && err != io.EOF {
x.Check(err)
}

nq, err := rdf.Parse(str, c.lexer)
if err == rdf.ErrEmpty {
nq, err := ParseRDF(str, rc.lexer)
if err == ErrEmpty {
continue // blank line or comment
} else if err != nil {
return nil, errors.Wrapf(err, "while parsing line %q", str)
return errors.Wrapf(err, "while parsing line %q", str)
}
nqs = append(nqs, &nq)
rc.nqs.Push(&nq)
}

return nqs, nil
return nil
}

// RDF files don't require any special processing at the end of the file.
func (c *rdfChunker) End(r *bufio.Reader) error {
func (rdfChunker) End(r *bufio.Reader) error {
return nil
}

@@ -231,18 +243,17 @@ func (jsonChunker) Chunk(r *bufio.Reader) (*bytes.Buffer, error) {
return out, nil
}

func (jsonChunker) Parse(chunkBuf *bytes.Buffer) ([]*api.NQuad, error) {
func (jc *jsonChunker) Parse(chunkBuf *bytes.Buffer) error {
if chunkBuf.Len() == 0 {
return nil, io.EOF
return io.EOF
}

nqs, err := json.Parse(chunkBuf.Bytes(), json.SetNquads)
err := jc.nqs.ParseJSON(chunkBuf.Bytes(), SetNquads)
if err != nil && err != io.EOF {
x.Check(err)
}
chunkBuf.Reset()

return nqs, err
return err
}

func (jsonChunker) End(r *bufio.Reader) error {
8 changes: 4 additions & 4 deletions chunker/chunk_test.go
@@ -46,7 +46,7 @@ func TestJSONLoadStart(t *testing.T) {
}

for _, test := range tests {
chunker := NewChunker(JsonFormat)
chunker := NewChunker(JsonFormat, 1000)
require.Error(t, chunker.Begin(bufioReader(test.json)), test.desc)
}
}
@@ -64,7 +64,7 @@ func TestJSONLoadReadNext(t *testing.T) {
{"[{}", "malformed array"},
}
for _, test := range tests {
chunker := NewChunker(JsonFormat)
chunker := NewChunker(JsonFormat, 1000)
reader := bufioReader(test.json)
require.NoError(t, chunker.Begin(reader), test.desc)

@@ -113,7 +113,7 @@ func TestJSONLoadSuccessFirst(t *testing.T) {
},
}
for _, test := range tests {
chunker := NewChunker(JsonFormat)
chunker := NewChunker(JsonFormat, 1000)
reader := bufioReader(test.json)
require.NoError(t, chunker.Begin(reader), test.desc)

@@ -176,7 +176,7 @@ func TestJSONLoadSuccessAll(t *testing.T) {
}`,
}

chunker := NewChunker(JsonFormat)
chunker := NewChunker(JsonFormat, 1000)
reader := bufioReader(testDoc)

var json *bytes.Buffer