diff --git a/dgraph/cmd/bulk/loader.go b/dgraph/cmd/bulk/loader.go
index f7c2465c5da..4e21a644d41 100644
--- a/dgraph/cmd/bulk/loader.go
+++ b/dgraph/cmd/bulk/loader.go
@@ -50,7 +50,7 @@ type options struct {
 	ReplaceOutDir bool
 	TmpDir        string
 	NumGoroutines int
-	MapBufSize    int64
+	MapBufSize    uint64
 	SkipMapPhase  bool
 	CleanupTmp    bool
 	NumReducers   int
diff --git a/dgraph/cmd/bulk/mapper.go b/dgraph/cmd/bulk/mapper.go
index 04df8b76067..6a5c0a74c48 100644
--- a/dgraph/cmd/bulk/mapper.go
+++ b/dgraph/cmd/bulk/mapper.go
@@ -17,7 +17,9 @@
 package bulk
 
 import (
+	"bufio"
 	"bytes"
+	"compress/gzip"
 	"encoding/binary"
 	"fmt"
 	"io"
@@ -41,7 +43,6 @@ import (
 	"github.com/dgraph-io/dgraph/types/facets"
 	"github.com/dgraph-io/dgraph/x"
 	farm "github.com/dgryski/go-farm"
-	"github.com/gogo/protobuf/proto"
 )
 
 type mapper struct {
@@ -52,8 +53,9 @@ type mapper struct {
 type shardState struct {
 	// Buffer up map entries until we have a sufficient amount, then sort and
 	// write them to file.
-	entriesBuf []byte
-	mu         sync.Mutex // Allow only 1 write per shard at a time.
+	entries     []*pb.MapEntry
+	encodedSize uint64
+	mu          sync.Mutex // Allow only 1 write per shard at a time.
 }
 
 func newMapper(st *state) *mapper {
@@ -78,44 +80,52 @@ func less(lhs, rhs *pb.MapEntry) bool {
 	return lhsUID < rhsUID
 }
 
-func (m *mapper) writeMapEntriesToFile(entriesBuf []byte, shardIdx int) {
-	defer m.shards[shardIdx].mu.Unlock() // Locked by caller.
+func (m *mapper) openOutputFile(shardIdx int) (*os.File, error) {
+	fileNum := atomic.AddUint32(&m.mapFileId, 1)
+	filename := filepath.Join(
+		m.opt.TmpDir,
+		"shards",
+		fmt.Sprintf("%03d", shardIdx),
+		fmt.Sprintf("%06d.map.gz", fileNum),
+	)
+	x.Check(os.MkdirAll(filepath.Dir(filename), 0755))
+	return os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
+}
 
-	buf := entriesBuf
-	var entries []*pb.MapEntry
-	for len(buf) > 0 {
-		sz, n := binary.Uvarint(buf)
-		x.AssertTrue(n > 0)
-		buf = buf[n:]
-		me := new(pb.MapEntry)
-		x.Check(proto.Unmarshal(buf[:sz], me))
-		buf = buf[sz:]
-		entries = append(entries, me)
-	}
+func (m *mapper) writeMapEntriesToFile(entries []*pb.MapEntry, encodedSize uint64, shardIdx int) {
+	defer m.shards[shardIdx].mu.Unlock() // Locked by caller.
 
 	sort.Slice(entries, func(i, j int) bool {
 		return less(entries[i], entries[j])
 	})
 
-	buf = entriesBuf
+	f, err := m.openOutputFile(shardIdx)
+	x.Check(err)
+
+	defer func() {
+		x.Check(f.Sync())
+		x.Check(f.Close())
+	}()
+
+	gzWriter := gzip.NewWriter(f)
+	w := bufio.NewWriter(gzWriter)
+	defer func() {
+		x.Check(w.Flush())
+		x.Check(gzWriter.Flush())
+		x.Check(gzWriter.Close())
+	}()
+
+	sizeBuf := make([]byte, binary.MaxVarintLen64)
 	for _, me := range entries {
-		n := binary.PutUvarint(buf, uint64(me.Size()))
-		buf = buf[n:]
-		n, err := me.MarshalTo(buf)
+		n := binary.PutUvarint(sizeBuf, uint64(me.Size()))
+		_, err := w.Write(sizeBuf[:n])
 		x.Check(err)
-		buf = buf[n:]
-	}
-	x.AssertTrue(len(buf) == 0)
 
-	fileNum := atomic.AddUint32(&m.mapFileId, 1)
-	filename := filepath.Join(
-		m.opt.TmpDir,
-		"shards",
-		fmt.Sprintf("%03d", shardIdx),
-		fmt.Sprintf("%06d.map", fileNum),
-	)
-	x.Check(os.MkdirAll(filepath.Dir(filename), 0755))
-	x.Check(x.WriteFileSync(filename, entriesBuf, 0644))
+		meBuf, err := me.Marshal()
+		x.Check(err)
+		_, err = w.Write(meBuf)
+		x.Check(err)
+	}
 }
 
 func (m *mapper) run(inputFormat chunker.InputFormat) {
@@ -147,10 +157,13 @@ func (m *mapper) run(inputFormat chunker.InputFormat) {
 
 		for i := range m.shards {
 			sh := &m.shards[i]
-			if len(sh.entriesBuf) >= int(m.opt.MapBufSize) {
+			if sh.encodedSize >= m.opt.MapBufSize {
 				sh.mu.Lock() // One write at a time.
-				go m.writeMapEntriesToFile(sh.entriesBuf, i)
-				sh.entriesBuf = make([]byte, 0, m.opt.MapBufSize*11/10)
+				go m.writeMapEntriesToFile(sh.entries, sh.encodedSize, i)
+				// Clear the entries and encodedSize for the next batch.
+				// Proactively allocate 32 slots to bootstrap the entries slice.
+				sh.entries = make([]*pb.MapEntry, 0, 32)
+				sh.encodedSize = 0
 			}
 		}
 	}
@@ -158,9 +171,9 @@ func (m *mapper) run(inputFormat chunker.InputFormat) {
 
 	for i := range m.shards {
 		sh := &m.shards[i]
-		if len(sh.entriesBuf) > 0 {
+		if len(sh.entries) > 0 {
 			sh.mu.Lock() // One write at a time.
-			m.writeMapEntriesToFile(sh.entriesBuf, i)
+			m.writeMapEntriesToFile(sh.entries, sh.encodedSize, i)
 		}
 		m.shards[i].mu.Lock() // Ensure that the last file write finishes.
 	}
@@ -180,7 +193,5 @@ func (m *mapper) addMapEntry(key []byte, p *pb.Posting, shard int) {
 	sh := &m.shards[shard]
 
-	var err error
-	sh.entriesBuf = x.AppendUvarint(sh.entriesBuf, uint64(me.Size()))
-	sh.entriesBuf, err = x.AppendProtoMsg(sh.entriesBuf, me)
-	x.Check(err)
+	sh.entries = append(sh.entries, me)
+	sh.encodedSize += uint64(me.Size())
 }
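The heart of the mapper change is the new on-disk framing: instead of marshaling into one pre-sized byte slice, each sorted entry is written as a uvarint length prefix followed by its marshaled bytes, through a bufio.Writer stacked on a gzip.Writer. A minimal self-contained sketch of that framing, with a hypothetical `message` interface standing in for the gogo-generated *pb.MapEntry (Size and Marshal are the only two methods the mapper relies on):

    package main

    import (
        "bufio"
        "compress/gzip"
        "encoding/binary"
        "io"
    )

    // message mirrors the two gogo/protobuf methods used by the mapper.
    type message interface {
        Size() int
        Marshal() ([]byte, error)
    }

    // writeFrames emits each message as <uvarint length><payload> through a
    // bufio writer stacked on a gzip writer. The flushes run innermost-out,
    // matching the defer order in writeMapEntriesToFile, so no buffered
    // bytes are lost.
    func writeFrames(w io.Writer, msgs []message) error {
        gzw := gzip.NewWriter(w)
        bw := bufio.NewWriter(gzw)

        sizeBuf := make([]byte, binary.MaxVarintLen64)
        for _, m := range msgs {
            n := binary.PutUvarint(sizeBuf, uint64(m.Size()))
            if _, err := bw.Write(sizeBuf[:n]); err != nil {
                return err
            }
            payload, err := m.Marshal()
            if err != nil {
                return err
            }
            if _, err := bw.Write(payload); err != nil {
                return err
            }
        }
        if err := bw.Flush(); err != nil { // bufio -> gzip
            return err
        }
        return gzw.Close() // flushes and terminates the gzip stream
    }

Per-entry Marshal allocates one scratch slice per entry, but in exchange the writer no longer needs to know the exact output size up front, which is what forced the old AppendProtoMsg/ReserveCap bookkeeping.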
diff --git a/dgraph/cmd/bulk/merge_shards.go b/dgraph/cmd/bulk/merge_shards.go
index bc47f4d46f5..854127ef5fe 100644
--- a/dgraph/cmd/bulk/merge_shards.go
+++ b/dgraph/cmd/bulk/merge_shards.go
@@ -68,7 +68,7 @@ func filenamesInTree(dir string) []string {
 		if err != nil {
 			return err
 		}
-		if strings.HasSuffix(path, ".map") {
+		if strings.HasSuffix(path, ".gz") {
 			fnames = append(fnames, path)
 		}
 		return nil
diff --git a/dgraph/cmd/bulk/reduce.go b/dgraph/cmd/bulk/reduce.go
index 283bd1ebce7..19441347424 100644
--- a/dgraph/cmd/bulk/reduce.go
+++ b/dgraph/cmd/bulk/reduce.go
@@ -19,6 +19,7 @@
 package bulk
 
 import (
 	"bufio"
 	"bytes"
+	"compress/gzip"
 	"container/heap"
 	"encoding/binary"
 	"fmt"
@@ -131,8 +132,10 @@ func (mi *mapIterator) Next() *pb.MapEntry {
 func newMapIterator(filename string) *mapIterator {
 	fd, err := os.Open(filename)
 	x.Check(err)
+	gzReader, err := gzip.NewReader(fd)
+	x.Check(err)
 
-	return &mapIterator{fd: fd, reader: bufio.NewReaderSize(fd, 16<<10)}
+	return &mapIterator{fd: fd, reader: bufio.NewReaderSize(gzReader, 16<<10)}
 }
 
 func (r *reducer) encodeAndWrite(
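With the map output now named "%06d.map.gz", filenamesInTree matches on the ".gz" suffix, so the reducers pick up exactly the files the mapper wrote. On the read side, newMapIterator interposes gzip.NewReader between the file and the 16 KB bufio.Reader, and mapIterator.Next then consumes one frame at a time. A companion sketch of that decode loop, under the same framing assumptions and package as the write-side sketch (the readFrames helper and its decode callback are illustrative, not Dgraph API):

    // readFrames undoes writeFrames: a gzip reader wrapped in a bufio
    // reader, then one uvarint length and exactly that many payload bytes
    // per frame, until a clean EOF on a frame boundary.
    func readFrames(r io.Reader, decode func(payload []byte) error) error {
        gzr, err := gzip.NewReader(r)
        if err != nil {
            return err
        }
        br := bufio.NewReaderSize(gzr, 16<<10) // *bufio.Reader is an io.ByteReader
        for {
            sz, err := binary.ReadUvarint(br)
            if err == io.EOF {
                return gzr.Close()
            }
            if err != nil {
                return err
            }
            payload := make([]byte, sz)
            if _, err := io.ReadFull(br, payload); err != nil {
                return err
            }
            if err := decode(payload); err != nil {
                return err
            }
        }
    }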
diff --git a/dgraph/cmd/bulk/run.go b/dgraph/cmd/bulk/run.go
index 148754a98d4..cc7d8e2c059 100644
--- a/dgraph/cmd/bulk/run.go
+++ b/dgraph/cmd/bulk/run.go
@@ -105,7 +105,7 @@ func run() {
 		ReplaceOutDir: Bulk.Conf.GetBool("replace_out"),
 		TmpDir:        Bulk.Conf.GetString("tmp"),
 		NumGoroutines: Bulk.Conf.GetInt("num_go_routines"),
-		MapBufSize:    int64(Bulk.Conf.GetInt("mapoutput_mb")),
+		MapBufSize:    uint64(Bulk.Conf.GetInt("mapoutput_mb")),
 		SkipMapPhase:  Bulk.Conf.GetBool("skip_map_phase"),
 		CleanupTmp:    Bulk.Conf.GetBool("cleanup_tmp"),
 		NumReducers:   Bulk.Conf.GetInt("reducers"),
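One unit subtlety: mapoutput_mb is a megabyte count, while shardState.encodedSize counts bytes, so the flag value has to be scaled before the mapper's `sh.encodedSize >= m.opt.MapBufSize` check means what it says; that conversion presumably happens elsewhere in run(), outside this hunk. Making MapBufSize uint64 also keeps the comparison free of the old int()/int64 casts. A one-line sketch of the scaling, with a hypothetical helper name:

    // mbToBytes scales a megabyte flag value to the byte count that
    // shardState.encodedSize is compared against. Hypothetical helper,
    // not part of this diff.
    func mbToBytes(mb uint64) uint64 {
        return mb << 20 // 1 MB = 2^20 bytes
    }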
diff --git a/x/proto.go b/x/proto.go
deleted file mode 100644
index 56a6685a439..00000000000
--- a/x/proto.go
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Copyright 2017-2018 Dgraph Labs, Inc. and Contributors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package x
-
-import (
-	"encoding/binary"
-)
-
-// ProtoMessage is an interface to interact with protobuf messages.
-type ProtoMessage interface {
-	Size() int
-	MarshalTo([]byte) (int, error)
-}
-
-// AppendProtoMsg appends the given protobuf message to the given byte slice.
-func AppendProtoMsg(p []byte, msg ProtoMessage) ([]byte, error) {
-	sz := msg.Size()
-	p = ReserveCap(p, len(p)+sz)
-	buf := p[len(p) : len(p)+sz]
-	n, err := msg.MarshalTo(buf)
-	AssertTrue(sz == n)
-	return p[:len(p)+sz], err
-}
-
-// AppendUvarint appends the given uint64 to the given byte slice.
-func AppendUvarint(p []byte, x uint64) []byte {
-	p = ReserveCap(p, len(p)+binary.MaxVarintLen64)
-	buf := p[len(p) : len(p)+binary.MaxVarintLen64]
-	n := binary.PutUvarint(buf, x)
-	return p[:len(p)+n]
-}
-
-// ReserveCap returns a new byte slice containing the contents of the given one with
-// a capacity of atLeast. The original byte slice is returned if it meets the capacity
-// requirements.
-func ReserveCap(p []byte, atLeast int) []byte {
-	if cap(p) >= atLeast {
-		return p
-	}
-	newCap := cap(p) * 2
-	if newCap < atLeast {
-		newCap = atLeast
-	}
-	newP := make([]byte, len(p), newCap)
-	copy(newP, p)
-	return newP
-}
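With every entry marshaled individually into a buffered writer, AppendProtoMsg, AppendUvarint, and ReserveCap have no remaining callers, so x/proto.go can be deleted wholesale. As a sanity check that the new framing round-trips, the two sketches above can be exercised end to end with a stub message (illustrative only; the sketch additionally needs bytes, fmt, and log in its imports):

    // rawMsg gives the message interface a trivial implementation: the
    // payload is the byte slice itself.
    type rawMsg []byte

    func (m rawMsg) Size() int                { return len(m) }
    func (m rawMsg) Marshal() ([]byte, error) { return m, nil }

    func main() {
        var buf bytes.Buffer
        msgs := []message{rawMsg("alpha"), rawMsg("beta")}
        if err := writeFrames(&buf, msgs); err != nil {
            log.Fatal(err)
        }
        err := readFrames(&buf, func(payload []byte) error {
            fmt.Printf("%s\n", payload) // prints "alpha", then "beta"
            return nil
        })
        if err != nil {
            log.Fatal(err)
        }
    }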