Reduce memory consumption in bulk loader (#3724)
Lucas Wang authored Jul 31, 2019
1 parent 857e30f commit 31b96c2
Showing 6 changed files with 59 additions and 104 deletions.
2 changes: 1 addition & 1 deletion dgraph/cmd/bulk/loader.go
@@ -50,7 +50,7 @@ type options struct {
     ReplaceOutDir bool
     TmpDir string
     NumGoroutines int
-    MapBufSize int64
+    MapBufSize uint64
     SkipMapPhase bool
     CleanupTmp bool
     NumReducers int
91 changes: 52 additions & 39 deletions dgraph/cmd/bulk/mapper.go
@@ -17,7 +17,9 @@
 package bulk

 import (
+    "bufio"
     "bytes"
+    "compress/gzip"
     "encoding/binary"
     "fmt"
     "io"
@@ -41,7 +43,6 @@ import (
     "github.com/dgraph-io/dgraph/types/facets"
     "github.com/dgraph-io/dgraph/x"
     farm "github.com/dgryski/go-farm"
-    "github.com/gogo/protobuf/proto"
 )

 type mapper struct {
@@ -52,8 +53,9 @@ type mapper struct {
 type shardState struct {
     // Buffer up map entries until we have a sufficient amount, then sort and
     // write them to file.
-    entriesBuf []byte
-    mu sync.Mutex // Allow only 1 write per shard at a time.
+    entries []*pb.MapEntry
+    encodedSize uint64
+    mu sync.Mutex // Allow only 1 write per shard at a time.
 }

 func newMapper(st *state) *mapper {
@@ -78,44 +80,52 @@ func less(lhs, rhs *pb.MapEntry) bool {
     return lhsUID < rhsUID
 }

-func (m *mapper) writeMapEntriesToFile(entriesBuf []byte, shardIdx int) {
-    defer m.shards[shardIdx].mu.Unlock() // Locked by caller.
+func (m *mapper) openOutputFile(shardIdx int) (*os.File, error) {
+    fileNum := atomic.AddUint32(&m.mapFileId, 1)
+    filename := filepath.Join(
+        m.opt.TmpDir,
+        "shards",
+        fmt.Sprintf("%03d", shardIdx),
+        fmt.Sprintf("%06d.map.gz", fileNum),
+    )
+    x.Check(os.MkdirAll(filepath.Dir(filename), 0755))
+    return os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
+}

-    buf := entriesBuf
-    var entries []*pb.MapEntry
-    for len(buf) > 0 {
-        sz, n := binary.Uvarint(buf)
-        x.AssertTrue(n > 0)
-        buf = buf[n:]
-        me := new(pb.MapEntry)
-        x.Check(proto.Unmarshal(buf[:sz], me))
-        buf = buf[sz:]
-        entries = append(entries, me)
-    }
+func (m *mapper) writeMapEntriesToFile(entries []*pb.MapEntry, encodedSize uint64, shardIdx int) {
+    defer m.shards[shardIdx].mu.Unlock() // Locked by caller.

     sort.Slice(entries, func(i, j int) bool {
         return less(entries[i], entries[j])
     })

-    buf = entriesBuf
+    f, err := m.openOutputFile(shardIdx)
+    x.Check(err)
+
+    defer func() {
+        x.Check(f.Sync())
+        x.Check(f.Close())
+    }()
+
+    gzWriter := gzip.NewWriter(f)
+    w := bufio.NewWriter(gzWriter)
+    defer func() {
+        x.Check(w.Flush())
+        x.Check(gzWriter.Flush())
+        x.Check(gzWriter.Close())
+    }()
+
+    sizeBuf := make([]byte, binary.MaxVarintLen64)
     for _, me := range entries {
-        n := binary.PutUvarint(buf, uint64(me.Size()))
-        buf = buf[n:]
-        n, err := me.MarshalTo(buf)
+        n := binary.PutUvarint(sizeBuf, uint64(me.Size()))
+        _, err := w.Write(sizeBuf[:n])
         x.Check(err)
-        buf = buf[n:]
-    }
-    x.AssertTrue(len(buf) == 0)

-    fileNum := atomic.AddUint32(&m.mapFileId, 1)
-    filename := filepath.Join(
-        m.opt.TmpDir,
-        "shards",
-        fmt.Sprintf("%03d", shardIdx),
-        fmt.Sprintf("%06d.map", fileNum),
-    )
-    x.Check(os.MkdirAll(filepath.Dir(filename), 0755))
-    x.Check(x.WriteFileSync(filename, entriesBuf, 0644))
+        meBuf, err := me.Marshal()
+        x.Check(err)
+        _, err = w.Write(meBuf)
+        x.Check(err)
+    }
 }

 func (m *mapper) run(inputFormat chunker.InputFormat) {
@@ -147,20 +157,23 @@ func (m *mapper) run(inputFormat chunker.InputFormat) {

             for i := range m.shards {
                 sh := &m.shards[i]
-                if len(sh.entriesBuf) >= int(m.opt.MapBufSize) {
+                if sh.encodedSize >= m.opt.MapBufSize {
                     sh.mu.Lock() // One write at a time.
-                    go m.writeMapEntriesToFile(sh.entriesBuf, i)
-                    sh.entriesBuf = make([]byte, 0, m.opt.MapBufSize*11/10)
+                    go m.writeMapEntriesToFile(sh.entries, sh.encodedSize, i)
+                    // Clear the entries and encodedSize for the next batch.
+                    // Proactively allocate 32 slots to bootstrap the entries slice.
+                    sh.entries = make([]*pb.MapEntry, 0, 32)
+                    sh.encodedSize = 0
                 }
             }
         }
     }

     for i := range m.shards {
         sh := &m.shards[i]
-        if len(sh.entriesBuf) > 0 {
+        if len(sh.entries) > 0 {
             sh.mu.Lock() // One write at a time.
-            m.writeMapEntriesToFile(sh.entriesBuf, i)
+            m.writeMapEntriesToFile(sh.entries, sh.encodedSize, i)
         }
         m.shards[i].mu.Lock() // Ensure that the last file write finishes.
     }
@@ -180,8 +193,8 @@ func (m *mapper) addMapEntry(key []byte, p *pb.Posting, shard int) {
     sh := &m.shards[shard]

     var err error
-    sh.entriesBuf = x.AppendUvarint(sh.entriesBuf, uint64(me.Size()))
-    sh.entriesBuf, err = x.AppendProtoMsg(sh.entriesBuf, me)
+    sh.entries = append(sh.entries, me)
+    sh.encodedSize += uint64(me.Size())
     x.Check(err)
 }

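The mapper change above stops proto-encoding entries into a per-shard byte buffer as they arrive; each shard now keeps a slice of *pb.MapEntry plus a running encodedSize, and serialization happens once, at write time, as a stream of uvarint-length-prefixed records pushed through a bufio.Writer and a gzip.Writer. Compared with the old path, which held a serialized entriesBuf and then decoded all of it back into entries before sorting, only one in-memory representation of the batch exists at a time. Below is a minimal, self-contained sketch of that write path under illustrative assumptions: writeRecords, the byte-slice payloads, and the file name are placeholders rather than dgraph APIs, and errors are returned instead of checked with x.Check.

package main

import (
    "bufio"
    "compress/gzip"
    "encoding/binary"
    "log"
    "os"
)

// writeRecords writes each payload as a uvarint length prefix followed by the
// payload bytes, buffered and gzip-compressed on the way to the file. The
// payloads stand in for marshalled pb.MapEntry messages.
func writeRecords(filename string, payloads [][]byte) error {
    f, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
    if err != nil {
        return err
    }
    gzw := gzip.NewWriter(f)
    w := bufio.NewWriter(gzw)

    sizeBuf := make([]byte, binary.MaxVarintLen64)
    for _, p := range payloads {
        n := binary.PutUvarint(sizeBuf, uint64(len(p)))
        if _, err := w.Write(sizeBuf[:n]); err != nil {
            return err
        }
        if _, err := w.Write(p); err != nil {
            return err
        }
    }

    // Order matters on the way out: flush the bufio.Writer into the gzip
    // stream, close the gzip stream so its footer is written, then sync and
    // close the file.
    if err := w.Flush(); err != nil {
        return err
    }
    if err := gzw.Close(); err != nil {
        return err
    }
    if err := f.Sync(); err != nil {
        return err
    }
    return f.Close()
}

func main() {
    payloads := [][]byte{[]byte("entry-1"), []byte("entry-2")}
    if err := writeRecords("000001.map.gz", payloads); err != nil {
        log.Fatal(err)
    }
}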
2 changes: 1 addition & 1 deletion dgraph/cmd/bulk/merge_shards.go
@@ -68,7 +68,7 @@ func filenamesInTree(dir string) []string {
         if err != nil {
             return err
         }
-        if strings.HasSuffix(path, ".map") {
+        if strings.HasSuffix(path, ".gz") {
             fnames = append(fnames, path)
         }
         return nil
5 changes: 4 additions & 1 deletion dgraph/cmd/bulk/reduce.go
@@ -19,6 +19,7 @@ package bulk
 import (
     "bufio"
     "bytes"
+    "compress/gzip"
     "container/heap"
     "encoding/binary"
     "fmt"
@@ -131,8 +132,10 @@ func (mi *mapIterator) Next() *pb.MapEntry {
 func newMapIterator(filename string) *mapIterator {
     fd, err := os.Open(filename)
     x.Check(err)
+    gzReader, err := gzip.NewReader(fd)
+    x.Check(err)

-    return &mapIterator{fd: fd, reader: bufio.NewReaderSize(fd, 16<<10)}
+    return &mapIterator{fd: fd, reader: bufio.NewReaderSize(gzReader, 16<<10)}
 }

 func (r *reducer) encodeAndWrite(
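On the read side, newMapIterator now layers gzip.NewReader between the file and the bufio.Reader. The iterator's Next is untouched by this commit, so the per-entry format (a uvarint length followed by the marshalled entry) stays the same; only the byte stream underneath is now gzip-compressed. A matching sketch of that read path, under the same illustrative assumptions as the writer sketch above (readRecords and the file name are placeholders, not dgraph code):

package main

import (
    "bufio"
    "compress/gzip"
    "encoding/binary"
    "fmt"
    "io"
    "log"
    "os"
)

// readRecords reads back uvarint-length-prefixed records from a gzip-compressed
// file, mirroring the writer sketch above.
func readRecords(filename string) ([][]byte, error) {
    f, err := os.Open(filename)
    if err != nil {
        return nil, err
    }
    defer f.Close()

    gzr, err := gzip.NewReader(f)
    if err != nil {
        return nil, err
    }
    defer gzr.Close()

    // bufio.Reader provides the io.ByteReader needed by binary.ReadUvarint.
    r := bufio.NewReaderSize(gzr, 16<<10)

    var records [][]byte
    for {
        sz, err := binary.ReadUvarint(r)
        if err == io.EOF {
            return records, nil // clean end of stream
        }
        if err != nil {
            return nil, err
        }
        p := make([]byte, sz)
        if _, err := io.ReadFull(r, p); err != nil {
            return nil, err
        }
        records = append(records, p)
    }
}

func main() {
    records, err := readRecords("000001.map.gz")
    if err != nil {
        log.Fatal(err)
    }
    for _, p := range records {
        fmt.Println(string(p))
    }
}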
2 changes: 1 addition & 1 deletion dgraph/cmd/bulk/run.go
@@ -105,7 +105,7 @@ func run() {
         ReplaceOutDir: Bulk.Conf.GetBool("replace_out"),
         TmpDir: Bulk.Conf.GetString("tmp"),
         NumGoroutines: Bulk.Conf.GetInt("num_go_routines"),
-        MapBufSize: int64(Bulk.Conf.GetInt("mapoutput_mb")),
+        MapBufSize: uint64(Bulk.Conf.GetInt("mapoutput_mb")),
         SkipMapPhase: Bulk.Conf.GetBool("skip_map_phase"),
         CleanupTmp: Bulk.Conf.GetBool("cleanup_tmp"),
         NumReducers: Bulk.Conf.GetInt("reducers"),
61 changes: 0 additions & 61 deletions x/proto.go

This file was deleted.
