Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize XidtoUID map used by live and bulk loader #2998

Merged
merged 11 commits into from
Feb 11, 2019
8 changes: 3 additions & 5 deletions dgraph/cmd/bulk/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func readSchema(filename string) []*pb.SchemaUpdate {
func (ld *loader) mapStage() {
ld.prog.setPhase(mapPhase)

// TODO: Consider if we need to always store the XIDs in Badger. Things slow down if we do.
xidDir := filepath.Join(ld.opt.TmpDir, "xids")
x.Check(os.Mkdir(xidDir, 0755))
opt := badger.DefaultOptions
Expand All @@ -157,10 +158,7 @@ func (ld *loader) mapStage() {
var err error
ld.xidDB, err = badger.Open(opt)
x.Check(err)
ld.xids = xidmap.New(ld.xidDB, ld.zero, xidmap.Options{
NumShards: 1 << 10,
LRUSize: 1 << 19,
})
ld.xids = xidmap.New(ld.zero, ld.xidDB)

var dir, ext string
var loaderType int
Expand Down Expand Up @@ -226,7 +224,7 @@ func (ld *loader) mapStage() {
for i := range ld.mappers {
ld.mappers[i] = nil
}
ld.xids.EvictAll()
x.Check(ld.xids.Flush())
x.Check(ld.xidDB.Close())
ld.xids = nil
runtime.GC()
Expand Down
4 changes: 2 additions & 2 deletions dgraph/cmd/bulk/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,8 @@ func (m *mapper) processNQuad(nq gql.NQuad) {
}

func (m *mapper) lookupUid(xid string) uint64 {
uid, isNew := m.xids.AssignUid(xid)
if !isNew || !m.opt.StoreXids {
uid := m.xids.AssignUid(xid)
if !m.opt.StoreXids {
return uid
}
if strings.HasPrefix(xid, "_:") {
Expand Down
11 changes: 7 additions & 4 deletions dgraph/cmd/live/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type loader struct {
dc *dgo.Dgraph
alloc *xidmap.XidMap
ticker *time.Ticker
kv *badger.DB
db *badger.DB
requestsWg sync.WaitGroup
// If we retry a request, we add one to retryRequestsWg.
retryRequestsWg sync.WaitGroup
Expand Down Expand Up @@ -188,15 +188,18 @@ func (l *loader) makeRequests() {
}

func (l *loader) printCounters() {
l.ticker = time.NewTicker(2 * time.Second)
period := 5 * time.Second
l.ticker = time.NewTicker(period)
start := time.Now()

var last Counter
for range l.ticker.C {
counter := l.Counter()
rate := float64(counter.Nquads) / counter.Elapsed.Seconds()
rate := float64(counter.Nquads-last.Nquads) / period.Seconds()
elapsed := time.Since(start).Round(time.Second)
fmt.Printf("[%6s] Txns: %d N-Quads: %d N-Quads/sec: %5.0f Aborts: %d\n",
fmt.Printf("[%6s] Txns: %d N-Quads: %d N-Quads/s [last 5s]: %5.0f Aborts: %d\n",
elapsed, counter.TxnsDone, counter.Nquads, rate, counter.Aborts)
last = counter
}
}

Expand Down
54 changes: 22 additions & 32 deletions dgraph/cmd/live/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,12 @@ func (l *loader) uid(val string) string {
// to be an existing node in the graph. There is limited protection against
// a user selecting an unassigned UID in this way - it may be assigned
// later to another node. It is up to the user to avoid this.
if strings.HasPrefix(val, "0x") {
if _, err := strconv.ParseUint(val[2:], 16, 64); err == nil {
return val
}
if uid, err := strconv.ParseUint(val, 0, 64); err == nil {
l.alloc.BumpTo(uid)
return fmt.Sprintf("%#x", uid)
}

uid, _ := l.alloc.AssignUid(val)
uid := l.alloc.AssignUid(val)
return fmt.Sprintf("%#x", uint64(uid))
}

Expand Down Expand Up @@ -241,36 +240,32 @@ func (l *loader) processLoadFile(ctx context.Context, rd *bufio.Reader, ck chunk
}

func setup(opts batchMutationOptions, dc *dgo.Dgraph) *loader {
x.Check(os.MkdirAll(opt.clientDir, 0700))
o := badger.DefaultOptions
o.SyncWrites = true // So that checkpoints are persisted immediately.
o.TableLoadingMode = bopt.MemoryMap
o.Dir = opt.clientDir
o.ValueDir = opt.clientDir
var db *badger.DB
if len(opt.clientDir) > 0 {
x.Check(os.MkdirAll(opt.clientDir, 0700))
o := badger.DefaultOptions
o.Dir = opt.clientDir
o.ValueDir = opt.clientDir
o.TableLoadingMode = bopt.MemoryMap
o.SyncWrites = false

kv, err := badger.Open(o)
x.Checkf(err, "Error while creating badger KV posting store")
var err error
db, err = badger.Open(o)
x.Checkf(err, "Error while creating badger KV posting store")
}

// compression with zero server actually makes things worse
connzero, err := x.SetupConnection(opt.zero, &tlsConf, false)
x.Checkf(err, "Unable to connect to zero, Is it running at %s?", opt.zero)

alloc := xidmap.New(
kv,
connzero,
xidmap.Options{
NumShards: 100,
LRUSize: 1e5,
},
)

alloc := xidmap.New(connzero, db)
l := &loader{
opts: opts,
dc: dc,
start: time.Now(),
reqs: make(chan api.Mutation, opts.Pending*2),
alloc: alloc,
kv: kv,
db: db,
zeroconn: connzero,
}

Expand Down Expand Up @@ -322,17 +317,8 @@ func run() error {
}
dgraphClient := dgo.NewDgraphClient(clients...)

if len(opt.clientDir) == 0 {
var err error
opt.clientDir, err = ioutil.TempDir("", "x")
x.Checkf(err, "Error while trying to create temporary client directory.")
fmt.Printf("Creating temp client directory at %s\n", opt.clientDir)
defer os.RemoveAll(opt.clientDir)
}
l := setup(bmOpts, dgraphClient)
defer l.zeroconn.Close()
defer l.kv.Close()
defer l.alloc.EvictAll()

if len(opt.schemaFile) > 0 {
if err := processSchemaFile(ctx, opt.schemaFile, dgraphClient); err != nil {
Expand Down Expand Up @@ -397,5 +383,9 @@ func run() error {
fmt.Printf("Time spent : %v\n", c.Elapsed)
fmt.Printf("N-Quads processed per second : %d\n", rate)

if l.db != nil {
l.alloc.Flush()
l.db.Close()
}
return nil
}
2 changes: 1 addition & 1 deletion worker/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func ValidateAndConvert(edge *pb.DirectedEdge, su *pb.SchemaUpdate) error {
return x.Errorf("Input for predicate %s of type uid is scalar", edge.Attr)

case schemaType.IsScalar() && !storageType.IsScalar():
return x.Errorf("Input for predicate %s of type scalar is uid", edge.Attr)
return x.Errorf("Input for predicate %s of type scalar is uid. Edge: %v", edge.Attr, edge)

// The suggested storage type matches the schema, OK!
case storageType == schemaType && schemaType != types.DefaultID:
Expand Down
2 changes: 1 addition & 1 deletion x/x.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ func SetupConnection(host string, tlsConf *TLSHelperConfig, useGz bool) (*grpc.C
grpc.WithBlock(),
grpc.WithTimeout(10*time.Second))

if tlsConf.CertRequired {
if tlsConf != nil && tlsConf.CertRequired {
tlsConf.ConfigType = TLSClientConfig
tlsCfg, _, err := GenerateTLSConfig(*tlsConf)
if err != nil {
Expand Down
Loading