Skip to content

Commit

Permalink
live loading from stdin (#3266)
Browse files Browse the repository at this point in the history
  • Loading branch information
Lucas Wang authored Apr 9, 2019
1 parent 4240bdc commit 3aa2408
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 28 deletions.
13 changes: 12 additions & 1 deletion chunker/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ func (rdfChunker) Begin(r *bufio.Reader) error {
return nil
}

// Chunk reads the input line by line until one of the following 3 conditions happens
// 1) the EOF is reached
// 2) 1e5 lines have been read
// 3) some unexpected error happened
func (rdfChunker) Chunk(r *bufio.Reader) (*bytes.Buffer, error) {
batch := new(bytes.Buffer)
batch.Grow(1 << 20)
Expand Down Expand Up @@ -273,7 +277,14 @@ func slurpQuoted(r *bufio.Reader, out *bytes.Buffer) error {
// and decompressed automatically even without the gz extension. The caller is responsible for
// calling the returned cleanup function when done with the reader.
func FileReader(file string) (rd *bufio.Reader, cleanup func()) {
f, err := os.Open(file)
var f *os.File
var err error
if file == "-" {
f = os.Stdin
} else {
f, err = os.Open(file)
}

x.Check(err)

cleanup = func() { f.Close() }
Expand Down
64 changes: 38 additions & 26 deletions dgraph/cmd/live/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,41 +199,16 @@ func (l *loader) processFile(ctx context.Context, filename string) error {
func (l *loader) processLoadFile(ctx context.Context, rd *bufio.Reader, ck chunker.Chunker) error {
x.CheckfNoTrace(ck.Begin(rd))

batch := make([]*api.NQuad, 0, 2*opt.batchSize)
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}

var nqs []*api.NQuad
chunkBuf, err := ck.Chunk(rd)
if chunkBuf != nil && chunkBuf.Len() > 0 {
nqs, err = ck.Parse(chunkBuf)
x.CheckfNoTrace(err)

for _, nq := range nqs {
nq.Subject = l.uid(nq.Subject)
if len(nq.ObjectId) > 0 {
nq.ObjectId = l.uid(nq.ObjectId)
}
}

batch = append(batch, nqs...)
for len(batch) >= opt.batchSize {
mu := api.Mutation{Set: batch[:opt.batchSize]}
l.reqs <- mu
// The following would create a new batch slice. We should not use batch =
// batch[opt.batchSize:], because it would end up modifying the batch array passed
// to l.reqs above.
batch = append([]*api.NQuad{}, batch[opt.batchSize:]...)
}
}
l.processChunk(chunkBuf, ck)
if err == io.EOF {
if len(batch) > 0 {
l.reqs <- api.Mutation{Set: batch}
}
break
} else {
x.Check(err)
Expand All @@ -244,6 +219,43 @@ func (l *loader) processLoadFile(ctx context.Context, rd *bufio.Reader, ck chunk
return nil
}

// processChunk parses the rdf entries from the chunk, and group them into
// batches (each one containing opt.batchSize entries) and sends the batches
// to the loader.reqs channel
func (l *loader) processChunk(chunkBuf *bytes.Buffer, ck chunker.Chunker) {
if chunkBuf == nil && chunkBuf.Len() == 0 {
return
}

nqs, err := ck.Parse(chunkBuf)
x.CheckfNoTrace(err)

batch := make([]*api.NQuad, 0, opt.batchSize)
for _, nq := range nqs {
nq.Subject = l.uid(nq.Subject)
if len(nq.ObjectId) > 0 {
nq.ObjectId = l.uid(nq.ObjectId)
}

batch = append(batch, nq)

if len(batch) >= opt.batchSize {
mu := api.Mutation{Set: batch}
l.reqs <- mu

// The following would create a new batch slice. We should not use batch =
// batch[:0], because it would end up modifying the batch array passed
// to l.reqs above.
batch = make([]*api.NQuad, 0, opt.batchSize)
}
}

// sends the left over nqs
if len(batch) > 0 {
l.reqs <- api.Mutation{Set: batch}
}
}

func setup(opts batchMutationOptions, dc *dgo.Dgraph) *loader {
var db *badger.DB
if len(opt.clientDir) > 0 {
Expand Down
5 changes: 4 additions & 1 deletion x/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,10 @@ func FindDataFiles(str string, ext []string) []string {
}

list := strings.Split(str, ",")
if len(list) == 1 {
if len(list) == 1 && list[0] != "-" {
// make sure the file or directory exists,
// and recursively search for files if it's a directory

fi, err := os.Stat(str)
if os.IsNotExist(err) {
glog.Errorf("File or directory does not exist: %s", str)
Expand Down

0 comments on commit 3aa2408

Please sign in to comment.