Refactor and Optimize data streaming #2442

Merged
merged 10 commits on Jun 14, 2018
8 changes: 8 additions & 0 deletions conn/node.go
@@ -10,6 +10,7 @@ package conn
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"math/rand"
"sync"
@@ -174,6 +175,13 @@ func (n *Node) Send(m raftpb.Message) {
}
}

func (n *Node) Snapshot() (raftpb.Snapshot, error) {
if n == nil || n.Store == nil {
return raftpb.Snapshot{}, errors.New("uninitialized node or raft store")
}
return n.Store.Snapshot()
}

func (n *Node) SaveToStorage(h raftpb.HardState, es []raftpb.Entry, s raftpb.Snapshot) {
x.Check(n.Store.Save(h, es, s))
}
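
For context, a minimal sketch (not part of this diff) of how a caller might use the new nil-safe Snapshot accessor; the import paths are assumed from the repository layout of the time, and the guard above turns a call on an uninitialized node into an error rather than a panic:

package conndemo

import (
	"fmt"

	"github.com/dgraph-io/dgraph/conn"
)

// snapshotIndex returns the raft log index captured by the node's current
// snapshot. Safe to call even when n or n.Store is nil, thanks to the nil
// guard inside Snapshot().
func snapshotIndex(n *conn.Node) (uint64, error) {
	snap, err := n.Snapshot()
	if err != nil {
		return 0, fmt.Errorf("snapshot: %v", err)
	}
	return snap.Metadata.Index, nil
}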
3 changes: 3 additions & 0 deletions posting/mvcc.go
@@ -384,6 +384,9 @@ func unmarshalOrCopy(plist *intern.PostingList, item *badger.Item) error {

// ReadPostingList constructs the posting list from disk using the passed iterator.
// Use a forward iterator with AllVersions enabled in the iterator options.
//
// key is now owned by the posting list, so ensure it isn't reused elsewhere.
func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) {
l := new(List)
l.key = key
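
A caller-side sketch of that ownership contract, mirroring the key copy the old toKV helper in worker/predicate.go performed (removed later in this diff): hand ReadPostingList a private copy of the key, since badger recycles the key buffer as the iterator advances. Import paths are assumed from the repository layout of the time.

package postingdemo

import (
	"github.com/dgraph-io/badger"
	"github.com/dgraph-io/dgraph/posting"
)

// postingListAt builds the posting list at the iterator's current position.
func postingListAt(it *badger.Iterator) (*posting.List, error) {
	item := it.Item()
	// ReadPostingList takes ownership of the key and advances the iterator,
	// which invalidates item.Key()'s backing buffer, so pass a private copy.
	key := make([]byte, len(item.Key()))
	copy(key, item.Key())
	return posting.ReadPostingList(key, it)
}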
17 changes: 15 additions & 2 deletions vendor/github.com/dgraph-io/badger/CHANGELOG.md

17 changes: 16 additions & 1 deletion vendor/github.com/dgraph-io/badger/README.md

4 changes: 3 additions & 1 deletion vendor/github.com/dgraph-io/badger/iterator.go

2 changes: 1 addition & 1 deletion vendor/github.com/dgraph-io/badger/transaction.go

3 changes: 2 additions & 1 deletion vendor/github.com/dgraph-io/badger/value.go

38 changes: 13 additions & 25 deletions vendor/vendor.json
@@ -163,52 +163,40 @@
"revisionTime": "2016-09-07T16:21:46Z"
},
{
"checksumSHA1": "cy6MsL7H3EaR4rkvZq5xUwoZ/g4=",
"checksumSHA1": "Z9ACXnmy9E2oP8htXk2/XUVAjag=",
"path": "github.com/dgraph-io/badger",
"revision": "690400e629977977ecddfa8ab3a7f9285e1041a8",
"revisionTime": "2018-06-04T20:44:01Z",
"version": "v1.5.1",
"versionExact": "v1.5.1"
"revision": "3340933a01945fb517775a0e0f079a701d04f094",
"revisionTime": "2018-06-13T23:21:36Z"
},
{
"checksumSHA1": "oOuT7ebEiZ1ViHLKdFxKFOvobAQ=",
"path": "github.com/dgraph-io/badger/options",
"revision": "690400e629977977ecddfa8ab3a7f9285e1041a8",
"revisionTime": "2018-06-04T20:44:01Z",
"version": "v1.5.1",
"versionExact": "v1.5.1"
"revision": "3340933a01945fb517775a0e0f079a701d04f094",
"revisionTime": "2018-06-13T23:21:36Z"
},
{
"checksumSHA1": "gGTDnTVVw5kcT2P5NXZV1YSckOU=",
"path": "github.com/dgraph-io/badger/protos",
"revision": "690400e629977977ecddfa8ab3a7f9285e1041a8",
"revisionTime": "2018-06-04T20:44:01Z",
"version": "v1.5.1",
"versionExact": "v1.5.1"
"revision": "3340933a01945fb517775a0e0f079a701d04f094",
"revisionTime": "2018-06-13T23:21:36Z"
},
{
"checksumSHA1": "00T6XbLV4d95J7hm6kTXDReaQHM=",
"path": "github.com/dgraph-io/badger/skl",
"revision": "690400e629977977ecddfa8ab3a7f9285e1041a8",
"revisionTime": "2018-06-04T20:44:01Z",
"version": "v1.5.1",
"versionExact": "v1.5.1"
"revision": "3340933a01945fb517775a0e0f079a701d04f094",
"revisionTime": "2018-06-13T23:21:36Z"
},
{
"checksumSHA1": "I33KkP2lnYqJDasvvsAlebzkeko=",
"path": "github.com/dgraph-io/badger/table",
"revision": "690400e629977977ecddfa8ab3a7f9285e1041a8",
"revisionTime": "2018-06-04T20:44:01Z",
"version": "v1.5.1",
"versionExact": "v1.5.1"
"revision": "3340933a01945fb517775a0e0f079a701d04f094",
"revisionTime": "2018-06-13T23:21:36Z"
},
{
"checksumSHA1": "v2pJQ5NbS034cLP+GM1WLlGnByY=",
"path": "github.com/dgraph-io/badger/y",
"revision": "690400e629977977ecddfa8ab3a7f9285e1041a8",
"revisionTime": "2018-06-04T20:44:01Z",
"version": "v1.5.1",
"versionExact": "v1.5.1"
"revision": "3340933a01945fb517775a0e0f079a701d04f094",
"revisionTime": "2018-06-13T23:21:36Z"
},
{
"checksumSHA1": "a29TtOU87eZA0S6wL+rAkpqUEzc=",
167 changes: 59 additions & 108 deletions worker/predicate.go
Expand Up @@ -34,14 +34,15 @@ const (
// writeBatch performs a batch write of key value pairs to BadgerDB.
func writeBatch(ctx context.Context, pstore *badger.ManagedDB, kvChan chan *intern.KV, che chan error) {
var bytesWritten uint64
t := time.NewTicker(5 * time.Second)
t := time.NewTicker(time.Second)
defer t.Stop()
go func() {
now := time.Now()
for range t.C {
dur := time.Since(now)
x.Printf("Getting SNAPSHOT: Time elapsed: %v, bytes written: %s, bytes/sec %d\n",
x.FixedDuration(dur), humanize.Bytes(bytesWritten), bytesWritten/uint64(dur.Seconds()))
speed := bytesWritten / uint64(dur.Seconds())
x.Printf("Getting SNAPSHOT: Time elapsed: %v, bytes written: %s, %s/s\n",
x.FixedDuration(dur), humanize.Bytes(bytesWritten), humanize.Bytes(speed))
}
}()

@@ -176,40 +177,6 @@ func (n *node) populateShard(ps *badger.ManagedDB, pl *conn.Pool) (int, error) {
return count, nil
}

func toKV(it *badger.Iterator, pk *x.ParsedKey) (*intern.KV, error) {
item := it.Item()
var kv *intern.KV

key := make([]byte, len(item.Key()))
// Key would be modified by ReadPostingList as it advances the iterator and changes the item.
copy(key, item.Key())

if pk.IsSchema() {
val, err := item.ValueCopy(nil)
if err != nil {
return nil, err
}
kv = &intern.KV{
Key: key,
Val: val,
UserMeta: []byte{item.UserMeta()},
Version: item.Version(),
}
it.Next()
return kv, nil
}

l, err := posting.ReadPostingList(key, it)
if err != nil {
return nil, err
}
kv, err = l.MarshalToKv()
if err != nil {
return nil, err
}
return kv, nil
}

func (w *grpcWorker) PredicateAndSchemaData(m *intern.SnapshotMeta, stream intern.Worker_PredicateAndSchemaDataServer) error {
clientTs := m.ClientTs

@@ -226,7 +193,31 @@ func (w *grpcWorker) PredicateAndSchemaData(m *intern.SnapshotMeta, stream intern.Worker_PredicateAndSchemaDataServer) error {
// Any commit which happens in the future will have commitTs greater than
// this.
// TODO: Ensure all deltas have made to disk and read in memory before checking disk.
min_ts := posting.Txns().MinTs()
// BUG: A node that doesn't see any transactions but holds real data fails to
// send it over, because min_ts is derived from transactions alone.
min_ts := posting.Txns().MinTs() // Why are we not using the last snapshot ts?
x.Printf("Got min_ts: %d\n", min_ts)
// snap, err := groups().Node.Snapshot()
// if err != nil {
// return err
// }
// index := snap.Metadata.Index

// TODO: Why are we using MinTs() here when we should be using the snapshot
// index? This is wrong.

// TODO: We are not using the snapshot time, because we don't store the
// transaction timestamp in the snapshot. If we did, we'd just use that
// instead of this. This causes issues if the server had received a snapshot
// to begin with, but had no active transactions: min_ts is then always zero,
// so nothing is read or sent in the stream.

// UPDATE: This doesn't look too bad. We keep track of the transaction
// timestamps on the side and use those to decide what to stream here; the
// snapshot index isn't really used for anything. Still, this whole
// transaction tracking business is complex and must be simplified to its
// essence.

// Send ts as first KV.
if err := stream.Send(&intern.KVS{
@@ -238,82 +229,42 @@ func (w *grpcWorker) PredicateAndSchemaData(m *intern.SnapshotMeta, stream intern.Worker_PredicateAndSchemaDataServer) error {
return err
}

txn := pstore.NewTransactionAt(min_ts, false)
defer txn.Discard()
iterOpts := badger.DefaultIteratorOptions
iterOpts.AllVersions = true
iterOpts.PrefetchValues = false
it := txn.NewIterator(iterOpts)
defer it.Close()

var count int
var batchSize int
var prevKey []byte
kvs := &intern.KVS{}
// Do NOT it.Next() by default. Be careful when you "continue" in loop!
var bytesSent uint64
t := time.NewTicker(5 * time.Second)
defer t.Stop()
go func() {
now := time.Now()
for range t.C {
dur := time.Since(now)
x.Printf("Sending SNAPSHOT: Time elapsed: %v, bytes sent: %s, bytes/sec %d\n",
x.FixedDuration(dur), humanize.Bytes(bytesSent), bytesSent/uint64(dur.Seconds()))
}
}()
for it.Rewind(); it.Valid(); {
iterItem := it.Item()
k := iterItem.Key()
if bytes.Equal(k, prevKey) {
it.Next()
continue
}

if cap(prevKey) < len(k) {
prevKey = make([]byte, len(k))
} else {
prevKey = prevKey[:len(k)]
}
copy(prevKey, k)

pk := x.Parse(prevKey)
// Schema keys always have version 1. So we send it irrespective of the timestamp.
if iterItem.Version() <= clientTs && !pk.IsSchema() {
it.Next()
continue
sl := streamLists{stream: stream}
sl.chooseKey = func(key []byte, version uint64) bool {
pk := x.Parse(key)
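// Schema keys always have version 1, so send them irrespective of clientTs.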
return version > clientTs || pk.IsSchema()
}
sl.itemToKv = func(key []byte, itr *badger.Iterator) (*intern.KV, error) {
item := itr.Item()
pk := x.Parse(key)
if pk.IsSchema() {
val, err := item.ValueCopy(nil)
if err != nil {
return nil, err
}
kv := &intern.KV{
Key: key,
Val: val,
UserMeta: []byte{item.UserMeta()},
Version: item.Version(),
}
return kv, nil
}

// This key is not present in follower.
kv, err := toKV(it, pk)
l, err := posting.ReadPostingList(key, itr)
if err != nil {
return err
}
kvs.Kv = append(kvs.Kv, kv)
batchSize += kv.Size()
bytesSent += uint64(kv.Size())
count++
if count%100000 == 0 {
x.Printf("Sent %d keys\n", count)
}
if batchSize < MB { // 1MB
continue
}
if err := stream.Send(kvs); err != nil {
return err
}
batchSize = 0
kvs = &intern.KVS{}
} // end of iterator
if batchSize > 0 {
if err := stream.Send(kvs); err != nil {
return err
return nil, err
}
return l.MarshalToKv()
}

txn := pstore.NewTransactionAt(min_ts, false)
defer txn.Discard()
if err := sl.orchestrate(stream.Context(), "Sending SNAPSHOT", txn); err != nil {
return err
}
x.Printf("Sent %d keys. Done.\n", count)

if tr, ok := trace.FromContext(stream.Context()); ok {
tr.LazyPrintf("Sent %d keys. Done.\n", count)
tr.LazyPrintf("Sent keys. Done.\n")
}
return nil
}
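
The streamLists helper this function now delegates to is introduced elsewhere in this PR and is not shown in this diff. Inferred from the call sites above and from the inline loop the PR removes, its shape is roughly the following sketch; the kvStream interface name and the orchestrate body are assumptions, and the real implementation may differ (for example, by parallelizing the work).

package streamdemo

import (
	"bytes"
	"context"

	"github.com/dgraph-io/badger"
	"github.com/dgraph-io/dgraph/protos/intern"
)

// kvStream is the slice of the gRPC stream that orchestrate needs; the name
// is an assumption, the PR may use the concrete stream type directly.
type kvStream interface {
	Send(*intern.KVS) error
}

// streamLists, as inferred from the call sites above.
type streamLists struct {
	stream    kvStream
	chooseKey func(key []byte, version uint64) bool
	itemToKv  func(key []byte, itr *badger.Iterator) (*intern.KV, error)
}

// orchestrate visits each distinct key once, filters with chooseKey, converts
// with itemToKv, and ships ~1MB batches on the stream. This body mirrors the
// inline loop removed above.
func (sl *streamLists) orchestrate(ctx context.Context, logPrefix string, txn *badger.Txn) error {
	iterOpts := badger.DefaultIteratorOptions
	iterOpts.AllVersions = true
	iterOpts.PrefetchValues = false
	it := txn.NewIterator(iterOpts)
	defer it.Close()

	kvs := &intern.KVS{}
	var batchSize int
	var prevKey []byte
	for it.Rewind(); it.Valid(); {
		item := it.Item()
		if bytes.Equal(item.Key(), prevKey) {
			it.Next() // older version of a key we already handled
			continue
		}
		prevKey = append(prevKey[:0], item.Key()...)
		if !sl.chooseKey(prevKey, item.Version()) {
			it.Next()
			continue
		}
		// itemToKv may keep the key (see ReadPostingList), so pass a copy.
		key := make([]byte, len(prevKey))
		copy(key, prevKey)
		kv, err := sl.itemToKv(key, it) // may advance the iterator itself
		if err != nil {
			return err
		}
		kvs.Kv = append(kvs.Kv, kv)
		batchSize += kv.Size()
		if batchSize < 1<<20 { // batch up to ~1MB before sending
			continue
		}
		if err := sl.stream.Send(kvs); err != nil {
			return err
		}
		kvs, batchSize = &intern.KVS{}, 0
	}
	if len(kvs.Kv) > 0 {
		return sl.stream.Send(kvs)
	}
	return nil
}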