Bulk Loader: Fix memory usage by JSON parser #3794
…roportional to worker threads in shout-case.
…hunker package as well.
✅ A review job has been created and sent to the PullRequest network.
return buf
}

func (buf *NQuadBuffer) Ch() <-chan []*api.NQuad {
receiver name buf should be consistent with previous receiver name nqs for NQuadBuffer (from golint)
return buf.nqCh
}

func (buf *NQuadBuffer) Push(nqs ...*api.NQuad) {
receiver name buf should be consistent with previous receiver name nqs for NQuadBuffer (from golint)
}
}

func (buf *NQuadBuffer) Flush() {
receiver name buf should be consistent with previous receiver name nqs for NQuadBuffer (from golint)
chunker/chunk.go (outdated)
@@ -117,28 +131,26 @@ func (c *rdfChunker) Chunk(r *bufio.Reader) (*bytes.Buffer, error) {
}

// Parse is not thread-safe. Only call it serially, because it reuses lexer object.
-func (c *rdfChunker) Parse(chunkBuf *bytes.Buffer) ([]*api.NQuad, error) {
+func (c *rdfChunker) Parse(chunkBuf *bytes.Buffer) error {
receiver name c should be consistent with previous receiver name rc for rdfChunker (from golint)
Reviewable status: 0 of 14 files reviewed, 4 unresolved discussions (waiting on @gitlw, @golangcibot, @mangalaman93, @manishrjain, and @martinmr)
chunker/json_parser.go, line 245 at r1 (raw file):
Previously, golangcibot (Bot from GolangCI) wrote…
receiver name buf should be consistent with previous receiver name nqs for NQuadBuffer (from golint)
Done.
chunker/json_parser.go, line 249 at r1 (raw file):
Previously, golangcibot (Bot from GolangCI) wrote…
receiver name buf should be consistent with previous receiver name nqs for NQuadBuffer (from golint)
Done.
chunker/json_parser.go, line 259 at r1 (raw file):
Previously, golangcibot (Bot from GolangCI) wrote…
receiver name buf should be consistent with previous receiver name nqs for NQuadBuffer (from golint)
Done.
Reviewed 10 of 14 files at r1, 4 of 4 files at r3.
Reviewable status: all files reviewed, 13 unresolved discussions (waiting on @gitlw, @golangcibot, @mangalaman93, and @manishrjain)
dgraph/cmd/live/run.go, line 218 at r3 (raw file):
chunkBuf, err := ck.Chunk(rd)
// process parses the rdf entries from the chunk, and group them into batches (each one
The comment is confusing. What is "process"? Should it just say "Parses the RDF entries ..."?
chunker/json_parser.go, line 180 at r3 (raw file):
}
func (nqs *NQuadBuffer) checkForDeletion(mr mapResponse, m map[string]interface{}, op int) {
why is mr no longer a pointer?
chunker/json_parser.go, line 233 at r3 (raw file):
"is set to -1"
What happens if it's set to another negative number? Maybe generalize the logic and the comment to cover all numbers < 0, if it's not doing that already.
chunker/json_parser.go, line 245 at r3 (raw file):
}
func (buf *NQuadBuffer) Ch() <-chan []*api.NQuad {
add docstring to exported method.
chunker/json_parser.go, line 249 at r3 (raw file):
}
func (buf *NQuadBuffer) Push(nqs ...*api.NQuad) {
add docstring to exported method.
chunker/json_parser.go, line 259 at r3 (raw file):
}
func (buf *NQuadBuffer) Flush() {
add docstring to exported method.
chunker/json_parser.go, line 441 at r3 (raw file):
)
// ParseJSON converts the given byte slice into a slice of NQuads.
Maybe update the docstring? I guess it's still correct but now there's an intermediate step (the nquad buffer).
chunker/json/parse.go, line 349 at r3 (raw file):
mr.nquads = append(mr.nquads, &nq)
// Add the nquads that we got for the connecting entity.
mr.nquads = append(mr.nquads, cr.nquads...)
If I am reading this correctly, the new logic is no longer appending cr.nquads to the output. Will that break anything?
chunker/json/parse.go, line 382 at r3 (raw file):
mr.nquads = append(mr.nquads, &nq)
// Add the nquads that we got for the connecting entity.
mr.nquads = append(mr.nquads, cr.nquads...)
Same point here regarding cr.nquads
Reviewable status: all files reviewed, 15 unresolved discussions (waiting on @golangcibot, @mangalaman93, and @manishrjain)
dgraph/cmd/bulk/run.go, line 218 at r3 (raw file):
fmt.Printf("GC: %d. InUse: %s. Idle: %s\n", ms.NumGC, humanize.Bytes(ms.HeapInuse), humanize.Bytes(ms.HeapIdle-ms.HeapReleased))
if ms.NumGC > lastNum {
Maybe add a comment to explain that in this case, GC has been run by the go runtime, so we don't need to force a GC in this iteration.
chunker/json_parser.go, line 233 at r3 (raw file):
// NewNQuadBuffer would return a new buffer. It would batch up batchSize NQuads per push to channel,
// accessible via Ch(). If batchSize is set to -1, it would only do one push to Ch() during Flush.
Maybe replace -1 with "0 or a negative value" to be more precise.
Added some minor comments, otherwise
Reviewable status: all files reviewed, 15 unresolved discussions (waiting on @golangcibot, @mangalaman93, and @manishrjain)
Reviewable status: all files reviewed, 15 unresolved discussions (waiting on @golangcibot, @mangalaman93, @manishrjain, and @martinmr)
chunker/json/parse.go, line 349 at r3 (raw file):
Previously, martinmr (Martin Martinez Rivera) wrote…
If I am reading this correctly, the new logic is no longer appending cr.nquads to the output. Will that break anything?
I think the child's nquads will be pushed to the buffer through the buf.mapToNquads(...) call above.
Compared with the current implementation, this PR would make the child's nquads show up before this connecting nq, but that doesn't seem to break anything.
Reviewable status: 7 of 19 files reviewed, 15 unresolved discussions (waiting on @gitlw, @golangcibot, @mangalaman93, and @martinmr)
dgraph/cmd/bulk/run.go, line 218 at r3 (raw file):
Previously, gitlw (Lucas Wang) wrote…
Maybe add a comment to explain that in this case, GC has been run by the go runtime, so we don't need to force a GC in this iteration.
Done.
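A minimal sketch of the GC check discussed in this thread, assuming the monitor keeps the NumGC value from its previous iteration in lastNum. The helper name maybeForceGC is hypothetical, and raw byte counts replace the humanize.Bytes formatting used in the PR:

```go
package main

import (
	"fmt"
	"runtime"
)

// maybeForceGC (hypothetical helper) reads the current memory stats and forces
// a collection only if the Go runtime has not completed a GC cycle since the
// previous check. lastNum is the NumGC value seen last time; the updated value
// is returned so the caller can pass it back in. forced reports whether
// runtime.GC() was called in this iteration.
func maybeForceGC(lastNum uint32) (newNum uint32, forced bool) {
	var ms runtime.MemStats
	runtime.ReadMemStats(&ms)
	fmt.Printf("GC: %d. InUse: %d bytes. Idle: %d bytes\n",
		ms.NumGC, ms.HeapInuse, ms.HeapIdle-ms.HeapReleased)
	if ms.NumGC > lastNum {
		// The runtime has already run GC since the last check,
		// so there is no need to force one in this iteration.
		return ms.NumGC, false
	}
	runtime.GC() // blocks until a full collection has completed
	runtime.ReadMemStats(&ms)
	return ms.NumGC, true
}

func main() {
	var lastNum uint32
	for i := 0; i < 3; i++ {
		var forced bool
		lastNum, forced = maybeForceGC(lastNum)
		fmt.Println("forced:", forced)
	}
}
```

In the loader this check would run periodically (e.g. on a ticker), so a forced GC only happens when the runtime has been idle on collection between two ticks.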
dgraph/cmd/live/run.go, line 218 at r3 (raw file):
Previously, martinmr (Martin Martinez Rivera) wrote…
comment is confusing. What is process? Should it just say "Parses the RDF entries ..."?
Done.
chunker/json_parser.go, line 180 at r3 (raw file):
Previously, martinmr (Martin Martinez Rivera) wrote…
why is mr no longer a pointer?
Nothing is being written to it.
chunker/json_parser.go, line 233 at r3 (raw file):
Previously, martinmr (Martin Martinez Rivera) wrote…
"is set to -1"
What happens if it's set to another negative number? Maybe generalize the logic and the comment to cover all numbers < 0, if it's not doing that already.
Already works like that. I changed the comment to just say "negative".
chunker/json_parser.go, line 233 at r3 (raw file):
Previously, gitlw (Lucas Wang) wrote…
Maybe replace -1 with "0 or a negative value" to be more precise.
Done.
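To make the resolved batchSize semantics concrete, here is the push condition as a tiny standalone function. The name shouldPushOnPush is hypothetical; it only assumes the rule stated in the docstring, namely that a batchSize of 0 or less never triggers a send during Push, leaving everything for the single push in Flush:

```go
package main

import "fmt"

// shouldPushOnPush (hypothetical) reports whether a buffer holding n pending
// NQuads should send a batch during Push. Any batchSize <= 0 disables
// batching, so pending NQuads wait for the single push done by Flush.
func shouldPushOnPush(n, batchSize int) bool {
	return batchSize > 0 && n >= batchSize
}

func main() {
	for _, bs := range []int{3, 0, -1} {
		fmt.Printf("batchSize=%d: push at n=3? %v\n", bs, shouldPushOnPush(3, bs))
	}
}
```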
chunker/json_parser.go, line 245 at r3 (raw file):
Previously, martinmr (Martin Martinez Rivera) wrote…
add docstring to exported method.
Done.
chunker/json_parser.go, line 249 at r3 (raw file):
Previously, martinmr (Martin Martinez Rivera) wrote…
add docstring to exported method.
Done.
chunker/json_parser.go, line 259 at r3 (raw file):
Previously, martinmr (Martin Martinez Rivera) wrote…
add docstring to exported method.
Done.
chunker/json_parser.go, line 441 at r3 (raw file):
Previously, martinmr (Martin Martinez Rivera) wrote…
Maybe update the docstring? I guess it's still correct but now there's an intermediate step (the nquad buffer).
Done.
chunker/json/parse.go, line 349 at r3 (raw file):
Previously, gitlw (Lucas Wang) wrote…
I think the child's nquads will be pushed to the buffer through the buf.mapToNquads(...) call above.
Compared with the current implementation, this PR would make the child's nquads show up before this connecting nq, but that doesn't seem to break anything.
Yup. The children would show up first, but NQuads work independently. Each NQuad is self-sufficient.
chunker/json/parse.go, line 382 at r3 (raw file):
Previously, martinmr (Martin Martinez Rivera) wrote…
Same point here regarding cr.nquads
Done. See above.
The approach here makes sense to me and looks sound. I have a few more minor comments along the way but otherwise this looks good from my side.
Reviewed with ❤️ by PullRequest
chunker/chunk.go (outdated)
default:
panic("unknown input format")
}
}

// RDF files don't require any special processing at the beginning of the file.
-func (c *rdfChunker) Begin(r *bufio.Reader) error {
+func (rdfChunker) Begin(r *bufio.Reader) error {
It is often considered best practice to keep receiver types consistent: if one method has a pointer receiver, make them all pointers. Since NQuads() and Parse take a pointer receiver, you may consider keeping the others (Begin, Chunk, End) as pointers as well to avoid any surprises. (https://golang.org/doc/faq#methods_on_values_or_pointers)
The FAQ seems to only talk about values vs. pointers. In this case, the rdfChunker object isn't even being used, so it is clearer not to declare the receiver variable at all. That clearly indicates to the reader that the object isn't being used.
Sorry, yeah, I meant the type changing from *rdfChunker to rdfChunker, not the removal of the receiver variable name. So, for example, just updating to:
func (*rdfChunker) Begin(r *bufio.Reader) error {
It's totally fine to ignore this if you prefer, though: since you always seem to use rdfChunker as a pointer, it will have both sets of methods. It's just that, currently, a different set of methods is available on an rdfChunker in value form than in pointer form.
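The reviewer's point about method sets can be sketched with a simplified, hypothetical chunker interface (not the real Chunker interface from the chunker package). A value-receiver method belongs to the method set of both rdfChunker and *rdfChunker, while a pointer-receiver method belongs only to *rdfChunker, so only the pointer satisfies an interface that needs both:

```go
package main

import "fmt"

// chunker is a hypothetical, cut-down interface used only for this sketch.
type chunker interface {
	Begin() error
	Parse() error
}

type rdfChunker struct{}

// Value receiver, left unnamed because the receiver is not used.
func (rdfChunker) Begin() error { return nil }

// Pointer receiver: only in the method set of *rdfChunker.
func (*rdfChunker) Parse() error { return nil }

func main() {
	// *rdfChunker has both Begin and Parse, so it implements chunker.
	var c chunker = &rdfChunker{}
	fmt.Println(c.Begin() == nil, c.Parse() == nil)

	// var v chunker = rdfChunker{} would not compile: Parse has a pointer receiver.
	_, ok := interface{}(rdfChunker{}).(chunker)
	fmt.Println("value implements chunker:", ok)
}
```

As the thread notes, this only matters if an rdfChunker value is ever used where the interface is expected; when it is always handled through a pointer, both forms behave the same.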
buf.nqCh <- buf.nquads
buf.nquads = nil
}
close(buf.nqCh)
Could be worth adding a comment that once Flush() is called, neither it nor Push() can be called again, since this channel is being closed.
Done.
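Why that comment matters: in Go, sending on a closed channel panics (and so does closing a channel twice, which is why Flush cannot be called twice either). A minimal sketch, using a plain channel as a hypothetical stand-in for buf.nqCh:

```go
package main

import "fmt"

// sendAfterClose mimics calling Push after Flush: the channel is closed
// first, then a send is attempted. The resulting runtime panic is recovered
// and its message returned.
func sendAfterClose() (msg string) {
	ch := make(chan []int, 1)
	close(ch) // what Flush does to buf.nqCh
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	ch <- []int{1} // what a later Push would do
	return "no panic"
}

func main() {
	fmt.Println(sendAfterClose())
}
```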
dgraph/cmd/live/run.go (outdated)
// containing opt.batchSize entries) and sends the batches to the loader.reqs channel (see
// above).
if oerr := ck.Parse(chunkBuf); oerr != nil && oerr != io.EOF {
return oerr
Is it worth wrapping this error to add more context?
Done.
if err != nil {
return mr, err
}

// Add the connecting edge between the entities.
nq.ObjectId = cr.uid
nq.Facets = cr.fcts
mr.nquads = append(mr.nquads, &nq)
// Add the nquads that we got for the connecting entity.
mr.nquads = append(mr.nquads, cr.nquads...)
Are these other cr.nquads from the connecting entity no longer needed here or below on line 382?
Instead of the parent collecting the NQuads from the child, the child invocation of this func now pushes directly to the NQuadBuffer object.
…Parse no longer returns it.
Ah.. I see what you mean. Done.
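The restructuring described in this thread can be sketched as follows. nquad, buffer, and this mapToNquads are simplified, hypothetical stand-ins (the real function handles many more value types, facets, and deletions), and the sketch assumes every map carries a string "uid". Because the recursive call pushes the child's NQuads itself, they reach the buffer before the parent's connecting edge, which is harmless since each NQuad is self-sufficient:

```go
package main

import "fmt"

// nquad is a hypothetical, simplified stand-in for api.NQuad.
type nquad struct {
	Subject, Predicate, ObjectID, ObjectValue string
}

// buffer is a hypothetical stand-in for NQuadBuffer that records every push.
type buffer struct{ out []nquad }

func (b *buffer) Push(nqs ...nquad) { b.out = append(b.out, nqs...) }

// mapToNquads flattens a nested map into NQuads and returns the map's uid.
// Nested maps are handled by a recursive call that pushes the child's NQuads
// straight into the shared buffer; the parent only emits the connecting edge
// and never collects or returns the child's NQuads.
func (b *buffer) mapToNquads(m map[string]interface{}) string {
	uid := m["uid"].(string) // assumption: every map has a string "uid"
	for pred, v := range m {
		switch val := v.(type) {
		case map[string]interface{}:
			childUID := b.mapToNquads(val) // child pushes its own NQuads first
			b.Push(nquad{Subject: uid, Predicate: pred, ObjectID: childUID})
		case string:
			if pred != "uid" {
				b.Push(nquad{Subject: uid, Predicate: pred, ObjectValue: val})
			}
		}
	}
	return uid
}

func main() {
	var b buffer
	b.mapToNquads(map[string]interface{}{
		"uid":    "_:alice",
		"friend": map[string]interface{}{"uid": "_:bob", "name": "Bob"},
	})
	for _, nq := range b.out {
		fmt.Printf("%+v\n", nq)
	}
}
```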
Reviewable status: 7 of 19 files reviewed, 15 unresolved discussions (waiting on @gitlw, @golangcibot, @mangalaman93, and @martinmr)
The current JSON chunker/parser was creating a slice of NQuads for each JSON map. On a dataset we were working with, this resulted in a slice of >5M NQuads, causing the bulk loader to go OOM. This PR fixes that by introducing an NQuadBuffer, which accumulates NQuads into slices of batchSize and sends each slice over a channel. The caller can then continuously consume from that channel. This avoids creating a slice of >5M entries and the very high memory usage that comes with it.
Also refactored the code to bring both the JSON and RDF parsers to the root level of the chunker package, allowing better code sharing with the new buffer.