Reduce memory consumption in bulk loader #3724
Conversation
Reviewed 3 of 4 files at r1, 1 of 1 files at r2.
Reviewable status: all files reviewed, 4 unresolved discussions (waiting on @gitlw)
dgraph/cmd/bulk/mapper.go, line 55 at r2 (raw file):
// write them to file.
entries     []*pb.MapEntry
encodedSize uint64
No need to track this.
dgraph/cmd/bulk/mapper.go, line 88 at r2 (raw file):
})
buf := make([]byte, encodedSize)
Use bytes.Buffer with gzip writer. And gzip the entries before writing to files. That way we can save space on disk.
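A minimal sketch of that suggestion, assuming the entries have already been marshaled into a single payload (the writeCompressed helper and its signature are illustrative, not the PR's actual code):

```go
package bulk

import (
	"bytes"
	"compress/gzip"
	"os"
)

// writeCompressed gzips the payload in memory via a bytes.Buffer and then
// writes the compressed bytes to the map file in one shot.
func writeCompressed(filename string, payload []byte) error {
	var buf bytes.Buffer
	gw := gzip.NewWriter(&buf)
	if _, err := gw.Write(payload); err != nil {
		return err
	}
	// Close flushes the remaining compressed data and writes the gzip footer.
	if err := gw.Close(); err != nil {
		return err
	}
	return os.WriteFile(filename, buf.Bytes(), 0644)
}
```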
dgraph/cmd/bulk/mapper.go, line 108 at r2 (raw file):
)
x.Check(os.MkdirAll(filepath.Dir(filename), 0755))
x.Check(x.WriteFileSync(filename, buf, 0644))
You could push the map entries into a sync.Pool.
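For reference, the sync.Pool pattern being suggested looks roughly like this (a standalone sketch; the entry struct stands in for pb.MapEntry, which would instead be reset via its generated Reset method):

```go
package main

import "sync"

// entry stands in for pb.MapEntry in this sketch.
type entry struct {
	Key   []byte
	Value []byte
}

var entryPool = sync.Pool{
	New: func() interface{} { return new(entry) },
}

func main() {
	// Reuse a previously released entry if one is available, otherwise allocate.
	e := entryPool.Get().(*entry)
	e.Key, e.Value = []byte("k"), []byte("v")

	// ... encode/write the entry to the map file ...

	// Reset and return the entry so a later batch can reuse the allocation.
	*e = entry{}
	entryPool.Put(e)
}
```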
Reviewable status: all files reviewed, 7 unresolved discussions (waiting on @gitlw)
dgraph/cmd/bulk/mapper.go, line 97 at r2 (raw file):
offset += n } // enlarge buf to include all the data
How does this enlarge buf? offset is <= len(buf).
Also, uppercase and period in the comment.
dgraph/cmd/bulk/mapper.go, line 143 at r2 (raw file):
sh.mu.Lock() // One write at a time.
go m.writeMapEntriesToFile(sh.entries, sh.encodedSize, i)
// clear the entries and encodedSize for the next batch
minor: uppercase and period
dgraph/cmd/bulk/mapper.go, line 144 at r2 (raw file):
go m.writeMapEntriesToFile(sh.entries, sh.encodedSize, i)
// clear the entries and encodedSize for the next batch
sh.entries = make([]*pb.MapEntry, 0, 10)
maybe add a comment on what the 10 is doing here.
Properly flushing the intermediate files
Reviewable status: 3 of 6 files reviewed, 6 unresolved discussions (waiting on @golangcibot, @manishrjain, and @martinmr)
dgraph/cmd/bulk/mapper.go, line 89 at r1 (raw file):
Previously, golangcibot (Bot from GolangCI) wrote…
S1019: should use make([]byte, encodedSize) instead (from gosimple)
Done.
dgraph/cmd/bulk/mapper.go, line 55 at r2 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
No need to track this.
After taking another look at the code, I found that there is an option in the bulk loader:
--mapoutput_mb int    The estimated size of each map file output. Increasing this increases memory usage. (default 64)
To decide when to output to a file, it seems we still need to keep track of the estimated size of the entries.
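Roughly, the running estimate drives the flush decision like this (an illustrative fragment, not the PR's exact code; the m.opt.MapBufSize name for the --mapoutput_mb-derived threshold is an assumption):

```go
// Illustrative fragment: accumulate entries per shard and flush once the
// estimated encoded size crosses the threshold derived from --mapoutput_mb.
sh.entries = append(sh.entries, me)
sh.encodedSize += uint64(me.Size())

if sh.encodedSize >= m.opt.MapBufSize { // MapBufSize name assumed for illustration
	sh.mu.Lock() // One write at a time.
	go m.writeMapEntriesToFile(sh.entries, i)
	sh.entries = make([]*pb.MapEntry, 0, 32) // small initial capacity for the next batch
	sh.encodedSize = 0
}
```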
dgraph/cmd/bulk/mapper.go, line 88 at r2 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
Use bytes.Buffer with gzip writer. And gzip the entries before writing to files. That way we can save space on disk.
Done.
dgraph/cmd/bulk/mapper.go, line 97 at r2 (raw file):
Previously, martinmr (Martin Martinez Rivera) wrote…
how does this enlarge buf? offset is <= len(buf)
Also, uppercase and period in the comment.
This part of the code has been rewritten, so please take another look at the new code with gzip.
dgraph/cmd/bulk/mapper.go, line 108 at r2 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
You could push the map entries into a sync.Pool.
I tested the sync.Pool approach; it did not reduce peak memory consumption, but instead increased it slightly, from ~11GB to ~12GB. Maybe that's because entries are requested faster than they are released, and sync.Pool can't help in such cases.
Thus I didn't include the sync.Pool change.
dgraph/cmd/bulk/mapper.go, line 144 at r2 (raw file):
Previously, martinmr (Martin Martinez Rivera) wrote…
maybe add a comment on what the 10 is doing here.
Done.
By compressing the intermediate mapper output files with gzip, we are able to significantly reduce disk space consumption. When the bulk loader is run on the 21million data set, disk usage drops from ~3GB to ~500MB.
Got a few comments. Address before merging.
Reviewed 3 of 3 files at r3.
Reviewable status: all files reviewed, 8 unresolved discussions (waiting on @gitlw, @golangcibot, @manishrjain, and @martinmr)
dgraph/cmd/bulk/mapper.go, line 144 at r2 (raw file):
Previously, gitlw (Lucas Wang) wrote…
Done.
16 or 32 or 64.
dgraph/cmd/bulk/mapper.go, line 95 at r3 (raw file):
}
func (m *mapper) writeMapEntriesToFile(entries []*pb.MapEntry, encodedSize uint64, shardIdx int) {
No need for encodedSize.
dgraph/cmd/bulk/mapper.go, line 116 at r3 (raw file):
}()
w := bufio.NewWriter(gzWriter)
Combine these two defers and assignments.
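One way to do that (a sketch only, assuming the writer stack is file → gzip.Writer → bufio.Writer; not copied from the PR):

```go
// Sketch: set up the writer stack together and tear it down in a single defer,
// flushing the bufio layer before closing the gzip layer and the file.
fd, err := os.Create(filename)
x.Check(err)
gzWriter := gzip.NewWriter(fd)
w := bufio.NewWriter(gzWriter)
defer func() {
	x.Check(w.Flush())
	x.Check(gzWriter.Close()) // Close also flushes the gzip stream
	x.Check(fd.Close())
}()
```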
dgraph/cmd/bulk/mapper.go, line 200 at r3 (raw file):
var err error
sh.entries = append(sh.entries, me)
sh.encodedSize += binary.MaxVarintLen64 + uint64(me.Size())
Remove the MaxVarintLen64. See if this would give you the same number of files as before.
Reviewable status: 5 of 6 files reviewed, 8 unresolved discussions (waiting on @golangcibot, @manishrjain, and @martinmr)
dgraph/cmd/bulk/mapper.go, line 95 at r3 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
No need for encodedSize.
Done.
dgraph/cmd/bulk/mapper.go, line 116 at r3 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
Combine these two defers and assignments.
Done.
dgraph/cmd/bulk/mapper.go, line 200 at r3 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
Remove the MaxVarintLen64. See if this would give you the same number of files as before.
After removing the MaxVarintLen64, I'm getting the same number of files as before.
Sometimes the bulk loader runs into OOM and gets killed by the OS on my local machine, which has 32GB of RAM.
I ran pprof profiling, and it turns out most of the allocations happened inside the x.ReserveCap function, which allocates at least twice an existing slice's current capacity.
While trying to avoid calling x.ReserveCap, I also found that the map entries (pb.MapEntry) go through the following steps.
This PR avoids the double marshaling by initially storing map entries in each shard, and only marshaling them once, after they are sorted, inside writeMapEntriesToFile (see the sketch below).
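The single-marshal path looks roughly like the sketch below (illustrative only; the comparison function and the uvarint length framing are assumptions, and w is the buffered gzip writer from the earlier sketch):

```go
// Illustration: sort the shard's entries, then marshal each one exactly once
// while streaming it to the compressed map file with a length prefix.
sort.Slice(entries, func(i, j int) bool {
	return less(entries[i], entries[j]) // key comparison; details omitted
})

var szBuf [binary.MaxVarintLen64]byte
for _, me := range entries {
	n := binary.PutUvarint(szBuf[:], uint64(me.Size()))
	_, err := w.Write(szBuf[:n])
	x.Check(err)

	data, err := me.Marshal() // the only Marshal call per entry
	x.Check(err)
	_, err = w.Write(data)
	x.Check(err)
}
x.Check(w.Flush())
```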
Evaluation:
The bulk load test using the 21 million data set still passes when run via:
cd systest/21million
./test-21million.sh
After this change, the peak memory usage indicated by the go_memstats_heap_inuse_bytes metric is reduced to 11.82GB, vs 15.73GB before this change.
PS: the stats on the left side are from running the bulk loader after this change, and the stats on the right side are from running it before this change.