Reduce memory consumption in bulk loader #3724

Merged · 13 commits · Jul 31, 2019
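In brief, judging from the diffs below: the map phase used to accumulate marshalled map entries in one large per-shard byte buffer. Each shard now buffers the parsed *pb.MapEntry values and tracks their total encoded size, and map output is streamed through gzip to *.map.gz files, which the reducer decompresses when iterating.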
dgraph/cmd/bulk/loader.go (2 changes: 1 addition & 1 deletion)
```diff
@@ -50,7 +50,7 @@ type options struct {
 	ReplaceOutDir bool
 	TmpDir        string
 	NumGoroutines int
-	MapBufSize    int64
+	MapBufSize    uint64
 	SkipMapPhase  bool
 	CleanupTmp    bool
 	NumReducers   int
```
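The switch from int64 to uint64 lets MapBufSize be compared directly against the shard's new encodedSize counter (also a uint64) in mapper.run, replacing the int conversion the old length check needed.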
dgraph/cmd/bulk/mapper.go (91 changes: 52 additions & 39 deletions)
```diff
@@ -17,7 +17,9 @@
 package bulk

 import (
+	"bufio"
 	"bytes"
+	"compress/gzip"
 	"encoding/binary"
 	"fmt"
 	"io"
@@ -41,7 +43,6 @@ import (
 	"github.com/dgraph-io/dgraph/types/facets"
 	"github.com/dgraph-io/dgraph/x"
 	farm "github.com/dgryski/go-farm"
-	"github.com/gogo/protobuf/proto"
 )

 type mapper struct {
@@ -52,8 +53,9 @@ type mapper struct {
 type shardState struct {
 	// Buffer up map entries until we have a sufficient amount, then sort and
 	// write them to file.
-	entriesBuf []byte
-	mu         sync.Mutex // Allow only 1 write per shard at a time.
+	entries     []*pb.MapEntry
+	encodedSize uint64
+	mu          sync.Mutex // Allow only 1 write per shard at a time.
 }

 func newMapper(st *state) *mapper {
```
```diff
@@ -78,44 +80,52 @@ func less(lhs, rhs *pb.MapEntry) bool {
 	return lhsUID < rhsUID
 }

-func (m *mapper) writeMapEntriesToFile(entriesBuf []byte, shardIdx int) {
-	defer m.shards[shardIdx].mu.Unlock() // Locked by caller.
+func (m *mapper) openOutputFile(shardIdx int) (*os.File, error) {
+	fileNum := atomic.AddUint32(&m.mapFileId, 1)
+	filename := filepath.Join(
+		m.opt.TmpDir,
+		"shards",
+		fmt.Sprintf("%03d", shardIdx),
+		fmt.Sprintf("%06d.map.gz", fileNum),
+	)
+	x.Check(os.MkdirAll(filepath.Dir(filename), 0755))
+	return os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
+}

-	buf := entriesBuf
-	var entries []*pb.MapEntry
-	for len(buf) > 0 {
-		sz, n := binary.Uvarint(buf)
-		x.AssertTrue(n > 0)
-		buf = buf[n:]
-		me := new(pb.MapEntry)
-		x.Check(proto.Unmarshal(buf[:sz], me))
-		buf = buf[sz:]
-		entries = append(entries, me)
-	}
+func (m *mapper) writeMapEntriesToFile(entries []*pb.MapEntry, encodedSize uint64, shardIdx int) {
+	defer m.shards[shardIdx].mu.Unlock() // Locked by caller.

 	sort.Slice(entries, func(i, j int) bool {
 		return less(entries[i], entries[j])
 	})

-	buf = entriesBuf
+	f, err := m.openOutputFile(shardIdx)
+	x.Check(err)
+
+	defer func() {
+		x.Check(f.Sync())
+		x.Check(f.Close())
+	}()
+
+	gzWriter := gzip.NewWriter(f)
+	w := bufio.NewWriter(gzWriter)
+	defer func() {
+		x.Check(w.Flush())
+		x.Check(gzWriter.Flush())
+		x.Check(gzWriter.Close())
+	}()
+
+	sizeBuf := make([]byte, binary.MaxVarintLen64)
 	for _, me := range entries {
-		n := binary.PutUvarint(buf, uint64(me.Size()))
-		buf = buf[n:]
-		n, err := me.MarshalTo(buf)
+		n := binary.PutUvarint(sizeBuf, uint64(me.Size()))
+		_, err := w.Write(sizeBuf[:n])
 		x.Check(err)
-		buf = buf[n:]
-	}
-	x.AssertTrue(len(buf) == 0)

-	fileNum := atomic.AddUint32(&m.mapFileId, 1)
-	filename := filepath.Join(
-		m.opt.TmpDir,
-		"shards",
-		fmt.Sprintf("%03d", shardIdx),
-		fmt.Sprintf("%06d.map", fileNum),
-	)
-	x.Check(os.MkdirAll(filepath.Dir(filename), 0755))
-	x.Check(x.WriteFileSync(filename, entriesBuf, 0644))
+		meBuf, err := me.Marshal()
+		x.Check(err)
+		_, err = w.Write(meBuf)
+		x.Check(err)
+	}
 }

 func (m *mapper) run(inputFormat chunker.InputFormat) {
```
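The rewritten write path streams each entry through a bufio.Writer layered on a gzip.Writer, prefixing every marshalled pb.MapEntry with its uvarint-encoded length, instead of marshalling the whole batch into one pre-sized buffer. Below is a minimal, self-contained sketch of that record format; writeRecords and the plain []byte payloads are illustrative stand-ins for dgraph's pb.MapEntry marshalling, not code from the PR.

```go
package main

import (
	"bufio"
	"compress/gzip"
	"encoding/binary"
	"log"
	"os"
)

// writeRecords streams length-prefixed records into a gzipped file.
// Each record is a uvarint byte count followed by the record bytes,
// mirroring the map-file format in the diff above.
func writeRecords(filename string, records [][]byte) error {
	f, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
	if err != nil {
		return err
	}
	defer f.Close()

	gzw := gzip.NewWriter(f)
	w := bufio.NewWriter(gzw)

	sizeBuf := make([]byte, binary.MaxVarintLen64)
	for _, rec := range records {
		n := binary.PutUvarint(sizeBuf, uint64(len(rec)))
		if _, err := w.Write(sizeBuf[:n]); err != nil {
			return err
		}
		if _, err := w.Write(rec); err != nil {
			return err
		}
	}

	// Flush order matters: drain bufio into gzip first, then close
	// the gzip writer so it emits the footer, then sync the file.
	if err := w.Flush(); err != nil {
		return err
	}
	if err := gzw.Close(); err != nil {
		return err
	}
	return f.Sync()
}

func main() {
	recs := [][]byte{[]byte("hello"), []byte("world")}
	if err := writeRecords("000001.map.gz", recs); err != nil {
		log.Fatal(err)
	}
}
```

Closing the gzip.Writer is not optional: it writes the gzip footer, without which a reader of the file hits an unexpected EOF.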
```diff
@@ -147,20 +157,23 @@ func (m *mapper) run(inputFormat chunker.InputFormat) {

 			for i := range m.shards {
 				sh := &m.shards[i]
-				if len(sh.entriesBuf) >= int(m.opt.MapBufSize) {
+				if sh.encodedSize >= m.opt.MapBufSize {
 					sh.mu.Lock() // One write at a time.
-					go m.writeMapEntriesToFile(sh.entriesBuf, i)
-					sh.entriesBuf = make([]byte, 0, m.opt.MapBufSize*11/10)
+					go m.writeMapEntriesToFile(sh.entries, sh.encodedSize, i)
+					// Clear the entries and encodedSize for the next batch.
+					// Proactively allocate 32 slots to bootstrap the entries slice.
+					sh.entries = make([]*pb.MapEntry, 0, 32)
+					sh.encodedSize = 0
 				}
 			}
 		}
 	}

 	for i := range m.shards {
 		sh := &m.shards[i]
-		if len(sh.entriesBuf) > 0 {
+		if len(sh.entries) > 0 {
 			sh.mu.Lock() // One write at a time.
-			m.writeMapEntriesToFile(sh.entriesBuf, i)
+			m.writeMapEntriesToFile(sh.entries, sh.encodedSize, i)
 		}
 		m.shards[i].mu.Lock() // Ensure that the last file write finishes.
 	}
```
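Once a shard's encodedSize crosses MapBufSize, the loop above locks the shard, hands the current entries slice to a background writeMapEntriesToFile, and immediately starts a fresh slice, so parsing continues while the old batch is sorted and written; the final loop re-takes each lock to wait for in-flight writes. A stripped-down sketch of that hand-off, with hypothetical entry, shard, and writeBatch standing in for the real types:

```go
package main

import (
	"fmt"
	"sync"
)

type entry struct{ data []byte }

type shard struct {
	entries     []*entry
	encodedSize uint64
	mu          sync.Mutex // allow only one in-flight write per shard
}

// maxBufSize is an illustrative flush threshold; dgraph uses opt.MapBufSize.
const maxBufSize = 1 << 10

// add buffers an entry and hands the batch to a background writer
// once the accumulated encoded size crosses the threshold.
func (sh *shard) add(e *entry) {
	sh.entries = append(sh.entries, e)
	sh.encodedSize += uint64(len(e.data))
	if sh.encodedSize >= maxBufSize {
		sh.mu.Lock() // released by writeBatch once the batch is flushed
		go writeBatch(sh, sh.entries)
		// Start a fresh slice so new appends never touch the batch
		// now owned by the writer goroutine.
		sh.entries = make([]*entry, 0, 32)
		sh.encodedSize = 0
	}
}

// writeBatch stands in for sorting and writing one map file.
func writeBatch(sh *shard, batch []*entry) {
	defer sh.mu.Unlock()
	fmt.Printf("flushing %d entries\n", len(batch))
}

func main() {
	var sh shard
	for i := 0; i < 100; i++ {
		sh.add(&entry{data: make([]byte, 64)})
	}
	sh.mu.Lock() // wait for the last in-flight write to finish
}
```

Because the arguments to the go statement are evaluated before the slice is replaced, the writer goroutine owns the old batch exclusively, and a second flush on the same shard blocks on the mutex until the first one finishes.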
```diff
@@ -180,8 +193,8 @@ func (m *mapper) addMapEntry(key []byte, p *pb.Posting, shard int) {
 	sh := &m.shards[shard]

 	var err error
-	sh.entriesBuf = x.AppendUvarint(sh.entriesBuf, uint64(me.Size()))
-	sh.entriesBuf, err = x.AppendProtoMsg(sh.entriesBuf, me)
+	sh.entries = append(sh.entries, me)
+	sh.encodedSize += uint64(me.Size())
 	x.Check(err)
 }
```
dgraph/cmd/bulk/merge_shards.go (2 changes: 1 addition & 1 deletion)
```diff
@@ -68,7 +68,7 @@ func filenamesInTree(dir string) []string {
 		if err != nil {
 			return err
 		}
-		if strings.HasSuffix(path, ".map") {
+		if strings.HasSuffix(path, ".gz") {
 			fnames = append(fnames, path)
 		}
 		return nil
```
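Map files are now written with the .map.gz extension, so the tree walk that collects shard files matches the .gz suffix instead of .map.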
dgraph/cmd/bulk/reduce.go (5 changes: 4 additions & 1 deletion)
```diff
@@ -19,6 +19,7 @@ package bulk
 import (
 	"bufio"
 	"bytes"
+	"compress/gzip"
 	"container/heap"
 	"encoding/binary"
 	"fmt"
@@ -131,8 +132,10 @@ func (mi *mapIterator) Next() *pb.MapEntry {
 func newMapIterator(filename string) *mapIterator {
 	fd, err := os.Open(filename)
 	x.Check(err)
+	gzReader, err := gzip.NewReader(fd)
+	x.Check(err)

-	return &mapIterator{fd: fd, reader: bufio.NewReaderSize(fd, 16<<10)}
+	return &mapIterator{fd: fd, reader: bufio.NewReaderSize(gzReader, 16<<10)}
 }

 func (r *reducer) encodeAndWrite(
```
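With a gzip.Reader inserted under the existing 16 KiB bufio.Reader, the reducer's iterator decodes the same length-prefixed stream the mapper wrote. Here is a decode-side sketch matching the writer sketch earlier; readRecords is illustrative, and dgraph's mapIterator unmarshals each record into a pb.MapEntry rather than keeping raw bytes.

```go
package main

import (
	"bufio"
	"compress/gzip"
	"encoding/binary"
	"fmt"
	"io"
	"log"
	"os"
)

// readRecords re-reads uvarint-length-prefixed records from a
// gzipped file, the inverse of writeRecords above.
func readRecords(filename string) ([][]byte, error) {
	fd, err := os.Open(filename)
	if err != nil {
		return nil, err
	}
	defer fd.Close()

	gzr, err := gzip.NewReader(fd)
	if err != nil {
		return nil, err
	}
	defer gzr.Close()
	r := bufio.NewReaderSize(gzr, 16<<10)

	var records [][]byte
	for {
		sz, err := binary.ReadUvarint(r) // *bufio.Reader is an io.ByteReader
		if err == io.EOF {
			return records, nil // clean end of stream
		}
		if err != nil {
			return nil, err
		}
		rec := make([]byte, sz)
		if _, err := io.ReadFull(r, rec); err != nil {
			return nil, err // truncated record
		}
		records = append(records, rec)
	}
}

func main() {
	recs, err := readRecords("000001.map.gz")
	if err != nil {
		log.Fatal(err)
	}
	for _, rec := range recs {
		fmt.Println(string(rec))
	}
}
```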
dgraph/cmd/bulk/run.go (2 changes: 1 addition & 1 deletion)
```diff
@@ -105,7 +105,7 @@ func run() {
 		ReplaceOutDir: Bulk.Conf.GetBool("replace_out"),
 		TmpDir:        Bulk.Conf.GetString("tmp"),
 		NumGoroutines: Bulk.Conf.GetInt("num_go_routines"),
-		MapBufSize:    int64(Bulk.Conf.GetInt("mapoutput_mb")),
+		MapBufSize:    uint64(Bulk.Conf.GetInt("mapoutput_mb")),
 		SkipMapPhase:  Bulk.Conf.GetBool("skip_map_phase"),
 		CleanupTmp:    Bulk.Conf.GetBool("cleanup_tmp"),
 		NumReducers:   Bulk.Conf.GetInt("reducers"),
```
x/proto.go (61 changes: 0 additions & 61 deletions)

This file was deleted.
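This deleted file presumably held the x.AppendUvarint and x.AppendProtoMsg helpers the old addMapEntry depended on; once entries are appended as pb.MapEntry values, nothing references them.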