Introduce StreamDone in bulk loader #4297

Merged (7 commits) on Nov 28, 2019


@ashish-goswami ashish-goswami commented Nov 20, 2019

Depends on #4200

During the reduce phase, we insert predicates into Badger instances in sorted order. Each predicate has one underlying stream in the stream writer, and each stream has one underlying table builder. Each table builder holds a buffer of at least 1MB. Previously we had no way to close a stream, so its table builder stayed in memory until flush was called on the stream writer. With stream done support in Badger v2.0, we can close a stream once we are done writing a predicate.
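
To illustrate why closing streams early matters, here is a toy model of the behaviour described above. It is only a sketch: `toyWriter`, `write`, `done` and `peakOpenBytes` are hypothetical names, not Badger's stream writer API, and the model only counts reserved bytes instead of allocating them.

```go
package main

import "fmt"

// bufferSize mirrors the "at least 1MB" per-table-builder buffer from the description.
const bufferSize = 1 << 20

// toyWriter is a hypothetical stand-in for a stream writer. It records how
// many bytes are reserved per open stream; it is NOT Badger's StreamWriter.
type toyWriter struct {
	open      map[uint32]int // stream ID -> reserved bytes
	reserved  int            // bytes currently reserved across open streams
	peakBytes int            // highest value of reserved seen so far
}

func newToyWriter() *toyWriter {
	return &toyWriter{open: make(map[uint32]int)}
}

// write reserves a buffer the first time a stream is seen.
func (w *toyWriter) write(streamID uint32) {
	if _, ok := w.open[streamID]; !ok {
		w.open[streamID] = bufferSize
		w.reserved += bufferSize
	}
	if w.reserved > w.peakBytes {
		w.peakBytes = w.reserved
	}
}

// done releases the buffer of a finished stream.
func (w *toyWriter) done(streamID uint32) {
	w.reserved -= w.open[streamID]
	delete(w.open, streamID)
}

// peakOpenBytes writes n streams one after another and reports the peak
// number of reserved bytes, with or without done markers.
func peakOpenBytes(n int, markDone bool) int {
	w := newToyWriter()
	for id := uint32(1); id <= uint32(n); id++ {
		w.write(id)
		if markDone {
			w.done(id)
		}
	}
	return w.peakBytes
}

func main() {
	fmt.Println("peak MiB without done markers:", peakOpenBytes(1000, false)/bufferSize)
	fmt.Println("peak MiB with done markers:   ", peakOpenBytes(1000, true)/bufferSize)
}
```

In this model the peak grows linearly with the number of streams when nothing is closed, and stays at a single buffer when each stream is closed as soon as it is finished.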

This change can help us reduce memory usage when we have a large number of predicates.
To benchmark it, I generated an RDF file with the program below:

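package main

import (
	"bufio"
	"fmt"
	"os"
)

var (
	pc = 10000 // number of predicates
	rc = 10000 // number of records per predicate
)

func main() {
	filename := fmt.Sprintf("file-%d-%d", pc, rc)
	f, err := os.Create(filename)
	if err != nil {
		fmt.Println("unable to create file:", err)
		return
	}
	defer f.Close()

	// Buffer the output: the loops below emit pc*rc lines.
	w := bufio.NewWriter(f)
	for i := 0; i < pc; i++ {
		for j := 0; j < rc; j++ {
			// Predicate index i comes from the outer loop, so pc is the
			// predicate count and rc the records per predicate.
			fmt.Fprintf(w, "_:id%d <pred%d> \"val\" .\n", j, i)
		}
	}
	if err := w.Flush(); err != nil {
		fmt.Println("unable to write file:", err)
		return
	}
	if err := f.Sync(); err != nil {
		fmt.Println("unable to sync file:", err)
	}
}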

The program above creates an RDF file with 10000 predicates, each with 10000 records (100M records in total). I ran the bulk loader on it with an empty schema, using the command below.

dgraph_master bulk -f file-10000-10000.rdf.gz -s empty.schema

System configuration: 16 cores, 64GB RAM

Here are my observations:

Current master:
Map phase peak memory usage (RSS): ~6GB
Reduce phase peak memory usage (RSS): ~20GB
Time to complete: 4min 31sec

This PR:
Map phase peak memory usage (RSS): ~6GB
Reduce phase peak memory usage (RSS): ~7.2GB
Time to complete: 4min 16sec
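
For reference, the reported figures can be put side by side with a bit of arithmetic. The sketch below uses only the numbers quoted above; `percentDrop` is a helper introduced here for illustration, and the last line is a rough lower bound that assumes exactly 1MB per table builder and 1024MB per GB.

```go
package main

import (
	"fmt"
	"time"
)

// percentDrop returns how much smaller after is than before, in percent.
func percentDrop(before, after float64) float64 {
	return (before - after) / before * 100
}

func main() {
	// Approximate reduce-phase peak RSS in GB, as reported above.
	const masterReduceGB, prReduceGB = 20.0, 7.2
	masterTime := 4*time.Minute + 31*time.Second
	prTime := 4*time.Minute + 16*time.Second

	fmt.Printf("reduce-phase memory saved: %.1f GB (%.0f%%)\n",
		masterReduceGB-prReduceGB, percentDrop(masterReduceGB, prReduceGB))
	fmt.Printf("time saved: %v (%.1f%%)\n",
		masterTime-prTime, percentDrop(masterTime.Seconds(), prTime.Seconds()))

	// 10000 predicates, each with a table builder buffer of at least 1MB.
	const predicates, builderMB = 10000, 1
	fmt.Printf("buffers held if every stream stays open: at least ~%.1f GB\n",
		float64(predicates*builderMB)/1024)
}
```

The reduce-phase saving of roughly 12.8GB is in the same range as the roughly 10GB of table builder buffers that 10000 open streams would hold, which is consistent with the explanation in the description.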


@animesh2049 animesh2049 left a comment

Reviewable status: 0 of 53 files reviewed, 2 unresolved discussions (waiting on @animesh2049, @ashish-goswami, @mangalaman93, @manishrjain, and @martinmr)

a discussion (no related file):
:lgtm:



dgraph/cmd/bulk/reduce.go, line 168 at r1 (raw file):

	}

	addDone := func(l *bpb.KVList, doneSteams []uint32) *bpb.KVList {

Any specific reason for having a return value? This function can modify l in place and return nothing.

@martinmr martinmr left a comment

:lgtm: but I'll let somebody else give final approval.

Reviewed 53 of 53 files at r1.
Reviewable status: all files reviewed, 2 unresolved discussions (waiting on @animesh2049, @ashish-goswami, @mangalaman93, and @manishrjain)


dgraph/cmd/bulk/reduce.go, line 168 at r1 (raw file):

Previously, animesh2049 (Animesh Chandra Pathak) wrote…

Any specific reason for having a return value? This function can modify l in place and return nothing.

Agree. Also, note that if you make the change, it's usually a convention to have l be the last argument (since it's kind of acting as an output).

@ashish-goswami ashish-goswami left a comment

Reviewable status: 52 of 53 files reviewed, 2 unresolved discussions (waiting on @animesh2049, @ashish-goswami, @mangalaman93, @manishrjain, and @martinmr)


dgraph/cmd/bulk/reduce.go, line 168 at r1 (raw file):

Previously, martinmr (Martin Martinez Rivera) wrote…

Agree. Also, note that if you make the change, it's usually a convention to have l be the last argument (since it's kind of acting as an output).

Done.
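
The two shapes discussed in this thread can be sketched as follows. This is an illustration only: `KV` and `KVList` are simplified local stand-ins for the `bpb` types, `addDoneReturning` is a name made up here for the r1 form, and the loop body (appending one done marker per stream) is an assumption, since the thread only shows the signature.

```go
package main

import "fmt"

// KV and KVList are simplified, hypothetical stand-ins for the bpb types.
type KV struct {
	StreamID   uint32
	StreamDone bool
}

type KVList struct {
	Kv []*KV
}

// addDoneReturning mirrors the r1 shape: it mutates l and also returns it,
// so the return value carries no extra information.
func addDoneReturning(l *KVList, doneStreams []uint32) *KVList {
	for _, id := range doneStreams {
		l.Kv = append(l.Kv, &KV{StreamID: id, StreamDone: true})
	}
	return l
}

// addDone is the shape suggested in review: no return value, and the list
// being filled in comes last because it acts as the output.
func addDone(doneStreams []uint32, l *KVList) {
	for _, id := range doneStreams {
		l.Kv = append(l.Kv, &KV{StreamID: id, StreamDone: true})
	}
}

func main() {
	l := &KVList{}
	addDone([]uint32{1, 2}, l)
	fmt.Println(len(l.Kv), l.Kv[0].StreamDone)
}
```

Because `l` is a pointer, the caller sees the appended entries either way; dropping the return value just removes the suggestion that a new list is produced.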

@mangalaman93 mangalaman93 left a comment

Reviewable status: 0 of 1 files reviewed, 3 unresolved discussions (waiting on @animesh2049, @ashish-goswami, @manishrjain, and @martinmr)


dgraph/cmd/bulk/reduce.go, line 168 at r2 (raw file):

	// Once we have processed all records from single stream, we can mark that stream as done.
	// This will close underlying table builder in Badger for stream. Since we preallocate 1 MB
	// of memory for each table builder, this can result in memory save in case we have large

saving*

@mangalaman93 mangalaman93 left a comment

Reviewed 1 of 9 files at r2.
Reviewable status: all files reviewed, 3 unresolved discussions (waiting on @animesh2049, @ashish-goswami, and @manishrjain)

@ashish-goswami ashish-goswami left a comment

Reviewable status: 0 of 1 files reviewed, 3 unresolved discussions (waiting on @animesh2049, @ashish-goswami, @mangalaman93, and @manishrjain)


dgraph/cmd/bulk/reduce.go, line 168 at r2 (raw file):

Previously, mangalaman93 (Aman Mangal) wrote…

saving*

fixed

@manishrjain manishrjain left a comment

:lgtm:

Reviewed 1 of 1 files at r3.
Reviewable status: all files reviewed, 5 unresolved discussions (waiting on @animesh2049, @ashish-goswami, and @mangalaman93)


dgraph/cmd/bulk/reduce.go, line 173 at r3 (raw file):

	// write call. This can also be optimised if required.
	addDone := func(doneSteams []uint32, l *bpb.KVList) {
		for _, SID := range doneSteams {

streamId


dgraph/cmd/bulk/reduce.go, line 191 at r3 (raw file):

			for _, kv := range list.Kv {
				setStreamId(kv)
				updateDoneStreams(kv.StreamId)

inline the func logic.

@ashish-goswami ashish-goswami left a comment

Reviewable status: 0 of 1 files reviewed, 5 unresolved discussions (waiting on @animesh2049, @ashish-goswami, @mangalaman93, and @manishrjain)


dgraph/cmd/bulk/reduce.go, line 173 at r3 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

streamId

Done.


dgraph/cmd/bulk/reduce.go, line 191 at r3 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

inline the func logic.

Done.
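
As a rough illustration of what inlined done-stream tracking could look like, the sketch below detects finished streams while scanning entries in write order. The actual logic of `updateDoneStreams` is not shown in this thread, so this is an assumption: `finishedStreams` is a hypothetical helper, and it relies on all entries of one stream being contiguous, as they would be for sorted input.

```go
package main

import "fmt"

// finishedStreams scans stream IDs in write order and returns every ID whose
// run has ended, i.e. the ID seen just before a change. The last stream is
// not reported, because nothing after it signals that it is complete.
func finishedStreams(ids []uint32) []uint32 {
	var done []uint32
	var prev uint32
	for i, id := range ids {
		// A change of stream ID means the previous stream has no more entries.
		if i > 0 && id != prev {
			done = append(done, prev)
		}
		prev = id
	}
	return done
}

func main() {
	fmt.Println(finishedStreams([]uint32{1, 1, 2, 2, 2, 3}))
}
```

In this sketch the final stream stays open until the writer is flushed, which matches the pre-existing behaviour described in the PR for streams that are never marked done.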

@ashish-goswami ashish-goswami merged commit ae679e6 into master Nov 28, 2019
@ashish-goswami ashish-goswami deleted the ashish/bulkloader branch November 28, 2019 11:38