Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved latency in live loader using conflict resolution at client level #4362

Merged
merged 31 commits into from
Dec 18, 2019
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
db3e10b
Handling @reverse separately for improving latencies in live loader
Dec 4, 2019
41feecc
Added logic for finding conflicts beforehand in live loader
Dec 5, 2019
b1adada
removed reversereqs
Dec 5, 2019
8654f01
fixed a bug
Dec 5, 2019
4927379
fixed a few comments
Dec 9, 2019
073976c
Changed for loop to do while loop
Dec 9, 2019
42357f4
Merge branch 'master' of github.com:dgraph-io/dgraph into harshil-goe…
Dec 9, 2019
3e57db3
Merge branch 'master' of github.com:dgraph-io/dgraph into harshil-goe…
Dec 9, 2019
6cc0cbd
Fixes few issues.
Dec 9, 2019
740f766
fixed a bug
Dec 9, 2019
05251bc
Removed extra files
Dec 9, 2019
42e8b12
added conflict keys
Dec 10, 2019
f1315ba
fixed a bug
Dec 10, 2019
14c2b5c
Added getSchema from alpha
Dec 10, 2019
d9cf23a
Added a few more rules
Dec 10, 2019
b741591
Fixed one more test
Dec 10, 2019
9309a7d
Buffer in makeRequests
Dec 11, 2019
ea477aa
Merge branch 'harshil-goel/remove-conflict-uids' of github.com:dgraph…
Dec 11, 2019
8f95f29
Some key
Dec 12, 2019
015b06d
Remove posting pool and minor performance improvement.
Dec 12, 2019
9b4af50
Removed a bug
Dec 12, 2019
7bb5b75
Fixed comments
Dec 12, 2019
8db0871
Added a test
Dec 16, 2019
e78ef0e
Little refactor
Dec 16, 2019
4bd370b
Refactored code
Dec 16, 2019
420ccaf
Minor refactor
Dec 17, 2019
6557693
Refactored the code
Dec 18, 2019
86a6533
Minor refactor
Dec 18, 2019
d835352
fixed typo
Dec 18, 2019
23c7db6
Merge branch 'master' into harshil-goel/remove-conflict-uids
Dec 18, 2019
9a1831c
Merge branch 'master' into harshil-goel/remove-conflict-uids
Dec 18, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 95 additions & 0 deletions dgraph/cmd/live/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"math/rand"
"strconv"
"strings"
"sync"
"sync/atomic"
Expand All @@ -35,6 +36,7 @@ import (
"github.com/dgraph-io/dgraph/dgraph/cmd/zero"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/dgraph/xidmap"
"github.com/dgryski/go-farm"
"github.com/dustin/go-humanize/english"
)

Expand All @@ -46,6 +48,7 @@ type batchMutationOptions struct {
Pending int
PrintCounters bool
MaxRetries uint32
bufferSize int
// User could pass a context so that we can stop retrying requests once context is done
Ctx context.Context
}
Expand Down Expand Up @@ -73,6 +76,9 @@ type loader struct {
// To get time elapsed
start time.Time

currentUIDS map[uint64]bool
uidsLock sync.RWMutex

reqNum uint64
reqs chan api.Mutation
zeroconn *grpc.ClientConn
Expand Down Expand Up @@ -131,6 +137,7 @@ func (l *loader) infinitelyRetry(req api.Mutation, reqNum uint64) {
}
atomic.AddUint64(&l.nquads, uint64(len(req.Set)))
atomic.AddUint64(&l.txns, 1)
l.removeMap(&req)
return
}
nretries++
Expand All @@ -151,6 +158,7 @@ func (l *loader) request(req api.Mutation, reqNum uint64) {
if err == nil {
atomic.AddUint64(&l.nquads, uint64(len(req.Set)))
atomic.AddUint64(&l.txns, 1)
l.removeMap(&req)
return
}
handleError(err, reqNum, false)
Expand All @@ -159,12 +167,99 @@ func (l *loader) request(req api.Mutation, reqNum uint64) {
go l.infinitelyRetry(req, reqNum)
}

func (l *loader) print(req api.Mutation) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

func (*loader).print is unused (from unused)

fmt.Println("======================")
for i := 0; i < 10; i++ {
fmt.Printf("%s %s %s\n", req.Set[i].ObjectId, req.Set[i].Predicate, req.Set[i].Subject)
fmt.Printf("%+v\n", req.Set[i])
}
fmt.Println("======================")
}

// getConflictKeys returns the fingerprints used to detect conflicts for a
// single N-Quad.
//
// The first key is always the fingerprint of the data key built from the
// predicate and the subject. When the predicate is registered in reversePreds,
// a second key built from the predicate and the object id is appended.
//
// Subject and ObjectId are parsed with base auto-detection (base 0). Parse
// errors are discarded, so whatever value strconv.ParseUint returns alongside
// the error is used as the uid.
func (l *loader) getConflictKeys(nq *api.NQuad) []uint64 {
	subjectUID, _ := strconv.ParseUint(nq.Subject, 0, 64)
	conflictKeys := append(
		make([]uint64, 0, 1),
		farm.Fingerprint64(x.DataKey(nq.Predicate, subjectUID)),
	)

	// Predicates without a reverse edge contribute only the subject key.
	if !reversePreds[nq.Predicate] {
		return conflictKeys
	}

	objectUID, _ := strconv.ParseUint(nq.ObjectId, 0, 64)
	return append(conflictKeys, farm.Fingerprint64(x.DataKey(nq.Predicate, objectUID)))
}

// getConflicts collects the conflict keys of every N-Quad in req.Set, in
// order, into a single slice. Duplicate keys are not removed.
func (l *loader) getConflicts(req *api.Mutation) []uint64 {
	conflictKeys := make([]uint64, 0, len(req.Set))
	for idx := range req.Set {
		perQuad := l.getConflictKeys(req.Set[idx])
		conflictKeys = append(conflictKeys, perQuad...)
	}
	return conflictKeys
}

// writeMap tries to claim every conflict key of req in l.currentUIDS.
//
// It returns false, leaving the map untouched, if any key is already marked
// true. Otherwise it marks all of req's keys true and returns true. The check
// and the marking both happen under l.uidsLock, so a claim is all-or-nothing.
func (l *loader) writeMap(req *api.Mutation) bool {
	// Compute the keys before taking the lock to keep the critical section short.
	conflictKeys := l.getConflicts(req)

	l.uidsLock.Lock()
	defer l.uidsLock.Unlock()

	// First pass: bail out on the first key that is already claimed.
	for _, key := range conflictKeys {
		if l.currentUIDS[key] {
			return false
		}
	}

	// Second pass: nothing is claimed, so mark every key.
	for _, key := range conflictKeys {
		l.currentUIDS[key] = true
	}
	return true
}

// removeMap releases the conflict keys of req by deleting each of them from
// l.currentUIDS while holding l.uidsLock.
func (l *loader) removeMap(req *api.Mutation) {
	// Compute the keys before taking the lock to keep the critical section short.
	conflictKeys := l.getConflicts(req)

	l.uidsLock.Lock()
	defer l.uidsLock.Unlock()

	for idx := range conflictKeys {
		delete(l.currentUIDS, conflictKeys[idx])
	}
}

// makeRequests can receive requests from batchNquads or directly from BatchSetWithMark.
// It doesn't need to batch the requests anymore. Batching is already done for it by the
// caller functions.
func (l *loader) makeRequests() {
defer l.requestsWg.Done()

buffer := make([]api.Mutation, 0, l.opts.bufferSize)
for req := range l.reqs {
if l.writeMap(&req) {
reqNum := atomic.AddUint64(&l.reqNum, 1)
l.request(req, reqNum)
} else {
buffer = append(buffer, req)
}

i := 0
for _, mu := range buffer {
if l.writeMap(&mu) {
reqNum := atomic.AddUint64(&l.reqNum, 1)
l.request(req, reqNum)
continue
}

buffer[i] = mu
i++
}
buffer = buffer[:i]
}

for _, req := range buffer {
for !l.writeMap(&req) {
time.Sleep(5)
}
reqNum := atomic.AddUint64(&l.reqNum, 1)
l.request(req, reqNum)
}
Expand Down
32 changes: 24 additions & 8 deletions dgraph/cmd/live/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,14 @@ type options struct {
newUids bool
verbose bool
httpAddr string
bufferSize int
}

var (
opt options
// Live is the sub-command invoked when running "dgraph live".
Live x.SubCommand
Live x.SubCommand
reversePreds map[string]bool
)

func init() {
Expand Down Expand Up @@ -105,6 +107,7 @@ func init() {
flag.Bool("verbose", false, "Run the live loader in verbose mode")
flag.StringP("user", "u", "", "Username if login is required.")
flag.StringP("password", "p", "", "Password of the user.")
flag.StringP("bufferSize", "m", "100", "Buffer for each thread")

// TLS configuration
x.RegisterClientTLSFlags(flag)
Expand Down Expand Up @@ -136,6 +139,16 @@ func processSchemaFile(ctx context.Context, file string, dgraphClient *dgo.Dgrap
x.Checkf(err, "Error while reading file")
}

reversePreds = make(map[string]bool)

for _, schemaLine := range strings.Split(string(b), "\n") {
if !strings.Contains(schemaLine, "@reverse") {
continue
}
predicate := strings.Fields(schemaLine)[0]
reversePreds[predicate] = true
}

op := &api.Operation{}
op.Schema = string(b)
return dgraphClient.Alter(ctx, op)
Expand Down Expand Up @@ -246,13 +259,14 @@ func setup(opts batchMutationOptions, dc *dgo.Dgraph) *loader {

alloc := xidmap.New(connzero, db)
l := &loader{
opts: opts,
dc: dc,
start: time.Now(),
reqs: make(chan api.Mutation, opts.Pending*2),
alloc: alloc,
db: db,
zeroconn: connzero,
opts: opts,
dc: dc,
start: time.Now(),
reqs: make(chan api.Mutation, opts.Pending*2),
currentUIDS: make(map[uint64]bool),
alloc: alloc,
db: db,
zeroconn: connzero,
}

l.requestsWg.Add(opts.Pending)
Expand All @@ -279,6 +293,7 @@ func run() error {
newUids: Live.Conf.GetBool("new_uids"),
verbose: Live.Conf.GetBool("verbose"),
httpAddr: Live.Conf.GetString("http"),
bufferSize: Live.Conf.GetInt("bufferSize"),
}
go func() {
if err := http.ListenAndServe(opt.httpAddr, nil); err != nil {
Expand All @@ -292,6 +307,7 @@ func run() error {
PrintCounters: true,
Ctx: ctx,
MaxRetries: math.MaxUint32,
bufferSize: opt.bufferSize,
}

dg, closeFunc := x.GetDgraphClient(Live.Conf, true)
Expand Down
20 changes: 20 additions & 0 deletions dgraph/live_loader_2.log
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
[Decoder]: Using assembly version of decoder
I1127 17:45:13.128864 23677 init.go:98]

Dgraph version : v1.1.0-169-gc3c1a485
Dgraph SHA-256 : 8e63f911c267d8765f1c562e6a1ba2284b2f7a2e66244c58e3f351df57c478e5
Commit SHA-1 : c3c1a485
Commit timestamp : 2019-11-25 18:24:30 -0800
Branch : master
Go version : go1.13.4

For Dgraph official documentation, visit https://docs.dgraph.io.
For discussions about Dgraph , visit https://discuss.dgraph.io.
To say hi to the community , visit https://dgraph.slack.com.

Licensed variously under the Apache Public License 2.0 and Dgraph Community License.
Copyright 2015-2018 Dgraph Labs, Inc.



Running transaction with dgraph endpoint: 0.0.0.0:9180
20 changes: 20 additions & 0 deletions dgraph/live_loader_3.log
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
[Decoder]: Using assembly version of decoder
I1129 14:23:19.862218 16253 init.go:98]

Dgraph version : v1.1.0-169-gc3c1a485
Dgraph SHA-256 : b5ae685f591b1ee1905447f847d25295a643514c72b865c9588e7d42bb58e4ad
Commit SHA-1 : c3c1a485
Commit timestamp : 2019-11-25 18:24:30 -0800
Branch : master
Go version : go1.13.4

For Dgraph official documentation, visit https://docs.dgraph.io.
For discussions about Dgraph , visit https://discuss.dgraph.io.
To say hi to the community , visit https://dgraph.slack.com.

Licensed variously under the Apache Public License 2.0 and Dgraph Community License.
Copyright 2015-2018 Dgraph Labs, Inc.



Running transaction with dgraph endpoint: 0.0.0.0:9180
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/blevesearch/segment v0.0.0-20160915185041-762005e7a34f // indirect
github.com/blevesearch/snowballstem v0.0.0-20180110192139-26b06a2c243d // indirect
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd
github.com/davecgh/go-spew v1.1.1
github.com/dgraph-io/badger/v2 v2.0.1-0.20191206180002-8b99eb433aa7
github.com/dgraph-io/dgo/v2 v2.1.1-0.20191127085444-c7a02678e8a6
github.com/dgraph-io/ristretto v0.0.0-20191114170855-99d1bbbf28e6 // indirect
Expand Down
1 change: 1 addition & 0 deletions posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,7 @@ func (l *List) addMutationInternal(ctx context.Context, txn *Txn, t *pb.Directed
default:
// Don't assign a conflictKey.
}

txn.addConflictKey(conflictKey)
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion posting/size_test.sh
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/bin/bash

# get the p directory
wget https://storage.googleapis.com/dgraph-datasets/21million/p/p.tar.gz
#wget https://storage.googleapis.com/dgraph-datasets/21million/p/p.tar.gz

#untar it
tar -xvf p.tar.gz
Expand Down