From cdf23e325b9205dfa76fa347250f7711f5c3d1d9 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sat, 3 Mar 2018 17:30:09 -0800 Subject: [PATCH] *: allow fully concurrent large read --- etcdserver/v3_server.go | 2 - mvcc/backend/backend.go | 34 ++++++++++++++ mvcc/backend/backend_test.go | 75 ++++++++++++++++++++++++++++++ mvcc/backend/concurrent_read_tx.go | 38 +++++++++++++++ mvcc/index.go | 7 +-- mvcc/kvstore_test.go | 3 +- mvcc/kvstore_txn.go | 51 ++++++++++++++++---- 7 files changed, 195 insertions(+), 15 deletions(-) create mode 100644 mvcc/backend/concurrent_read_tx.go diff --git a/etcdserver/v3_server.go b/etcdserver/v3_server.go index 13b9bff0d74..1c3fa764e30 100644 --- a/etcdserver/v3_server.go +++ b/etcdserver/v3_server.go @@ -86,8 +86,6 @@ type Authenticator interface { } func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) { - defer warnOfExpensiveReadOnlyRangeRequest(s.getLogger(), time.Now(), r) - if !r.Serializable { err := s.linearizableReadNotify(ctx) if err != nil { diff --git a/mvcc/backend/backend.go b/mvcc/backend/backend.go index 4ba8ec5689b..d08adcc05b8 100644 --- a/mvcc/backend/backend.go +++ b/mvcc/backend/backend.go @@ -49,6 +49,11 @@ var ( ) type Backend interface { + // CommittedReadTx returns a non-blocking read tx that is suitable for large reads. + // CommittedReadTx call itself will not return until the current BatchTx gets committed to + // ensure consistency. + CommittedReadTx() ReadTx + ReadTx() ReadTx BatchTx() BatchTx @@ -97,6 +102,8 @@ type backend struct { readTx *readTx + concurrentReadTxCh chan chan ReadTx + stopc chan struct{} donec chan struct{} @@ -165,6 +172,8 @@ func newBackend(bcfg BackendConfig) *backend { buckets: make(map[string]*bolt.Bucket), }, + concurrentReadTxCh: make(chan chan ReadTx), + stopc: make(chan struct{}), donec: make(chan struct{}), @@ -184,6 +193,12 @@ func (b *backend) BatchTx() BatchTx { func (b *backend) ReadTx() ReadTx { return b.readTx } +func (b *backend) CommittedReadTx() ReadTx { + rch := make(chan ReadTx) + b.concurrentReadTxCh <- rch + return <-rch +} + // ForceCommit forces the current batching tx to commit. func (b *backend) ForceCommit() { b.batchTx.Commit() @@ -301,6 +316,25 @@ func (b *backend) run() { b.batchTx.Commit() } t.Reset(b.batchInterval) + b.createConcurrentReadTxs() + } +} + +func (b *backend) createConcurrentReadTxs() { + // do not allow too many concurrent read txs. + // TODO: improve this by having a global pending counter? + for i := 0; i < 100; i++ { + select { + case rch := <-b.concurrentReadTxCh: + rtx, err := b.db.Begin(false) + if err != nil { + plog.Fatalf("cannot begin read tx (%s)", err) + } + rch <- &concurrentReadTx{tx: rtx} + default: + // no more to create. + return + } } } diff --git a/mvcc/backend/backend_test.go b/mvcc/backend/backend_test.go index 9bdec5c4872..a38252f2087 100644 --- a/mvcc/backend/backend_test.go +++ b/mvcc/backend/backend_test.go @@ -300,6 +300,81 @@ func TestBackendWritebackForEach(t *testing.T) { } } +// TestBackendConcurrentReadTx checks if the concurrent tx is created correctly. +func TestBackendConcurrentReadTx(t *testing.T) { + b, tmpPath := NewTmpBackend(2*time.Second, 10000) + defer cleanup(b, tmpPath) + + var rtx0 ReadTx + done := make(chan struct{}) + go func() { + rtx0 = b.ConcurrentReadTx() + close(done) + }() + + tx := b.BatchTx() + tx.Lock() + tx.UnsafeCreateBucket([]byte("key")) + for i := 0; i < 5; i++ { + k := []byte(fmt.Sprintf("%04d", i)) + tx.UnsafePut([]byte("key"), k, []byte("bar")) + } + tx.Unlock() + + select { + case <-done: + t.Fatal("concurrent read tx should block on the last batch tx!") + case <-time.After(time.Second): + } + + select { + case <-done: + case <-time.After(4 * time.Second): + t.Fatal("commit the last batched tx should unblock concurrent tx!") + } + + rtx0.Lock() + defer rtx0.Unlock() + ks, _ := rtx0.UnsafeRange([]byte("key"), []byte(fmt.Sprintf("%04d", 0)), []byte(fmt.Sprintf("%04d", 5)), 0) + if len(ks) != 5 { + t.Errorf("got %d keys, expect %d", len(ks), 5) + } + + // test if we can create concurrent read while the previous read tx is still open + var rtx1 ReadTx + done = make(chan struct{}) + go func() { + rtx1 = b.ConcurrentReadTx() + rtx1.Lock() + rtx1.UnsafeForEach([]byte(""), nil) + rtx1.Unlock() + close(done) + }() + select { + case <-done: + case <-time.After(4 * time.Second): + t.Fatal("cannot create concurrent read") + } + + done = make(chan struct{}) + // test if we can create concurrent write while the previous read tx is still open + go func() { + tx := b.BatchTx() + tx.Lock() + for i := 0; i < 5; i++ { + k := []byte(fmt.Sprintf("%04d", i)) + tx.UnsafePut([]byte("key"), k, []byte("bar")) + } + tx.Unlock() + close(done) + }() + select { + case <-done: + case <-time.After(4 * time.Second): + t.Fatal("cannot create concurrent write") + } +} + func cleanup(b Backend, path string) { b.Close() os.Remove(path) diff --git a/mvcc/backend/concurrent_read_tx.go b/mvcc/backend/concurrent_read_tx.go new file mode 100644 index 00000000000..390a562dfef --- /dev/null +++ b/mvcc/backend/concurrent_read_tx.go @@ -0,0 +1,38 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package backend + +import ( + bolt "github.com/coreos/bbolt" +) + +type concurrentReadTx struct { + tx *bolt.Tx +} + +func (rt *concurrentReadTx) Lock() {} +func (rt *concurrentReadTx) Unlock() { rt.tx.Rollback() } + +func (rt *concurrentReadTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) { + bucket := rt.tx.Bucket(bucketName) + if bucket == nil { + plog.Fatalf("bucket %s does not exist", bucketName) + } + return unsafeRange(bucket.Cursor(), key, endKey, limit) +} + +func (rt *concurrentReadTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error { + return unsafeForEach(rt.tx, bucketName, visitor) +} diff --git a/mvcc/index.go b/mvcc/index.go index f8cc6df88cf..e2964cf5bbf 100644 --- a/mvcc/index.go +++ b/mvcc/index.go @@ -91,10 +91,11 @@ func (ti *treeIndex) keyIndex(keyi *keyIndex) *keyIndex { func (ti *treeIndex) visit(key, end []byte, f func(ki *keyIndex)) { keyi, endi := &keyIndex{key: key}, &keyIndex{key: end} - ti.RLock() - defer ti.RUnlock() + ti.Lock() + clone := ti.tree.Clone() + ti.Unlock() - ti.tree.AscendGreaterOrEqual(keyi, func(item btree.Item) bool { + clone.AscendGreaterOrEqual(keyi, func(item btree.Item) bool { if len(endi.key) > 0 && !item.Less(endi) { return false } diff --git a/mvcc/kvstore_test.go b/mvcc/kvstore_test.go index f4d0fdfe3c3..81702bf5473 100644 --- a/mvcc/kvstore_test.go +++ b/mvcc/kvstore_test.go @@ -640,7 +640,7 @@ func TestTxnBlockBackendForceCommit(t *testing.T) { s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil) defer os.Remove(tmpPath) - txn := s.Read() + txn := s.Write() done := make(chan struct{}) go func() { @@ -742,6 +742,7 @@ type fakeBackend struct { func (b *fakeBackend) BatchTx() backend.BatchTx { return b.tx } func (b *fakeBackend) ReadTx() backend.ReadTx { return b.tx } +func (b *fakeBackend) ConcurrentReadTx() backend.ReadTx { return b.tx } func (b *fakeBackend) Hash(ignores map[backend.IgnoreKey]struct{}) (uint32, error) { return 0, nil } func (b *fakeBackend) Size() int64 { return 0 } func (b *fakeBackend) SizeInUse() int64 { return 0 } diff --git a/mvcc/kvstore_txn.go b/mvcc/kvstore_txn.go index 0e9b86135a8..73502f88e32 100644 --- a/mvcc/kvstore_txn.go +++ b/mvcc/kvstore_txn.go @@ -21,9 +21,21 @@ import ( "go.uber.org/zap" ) +const ( + expensiveReadLimit = 1000 + readonly = true + readwrite = false +) + type storeTxnRead struct { - s *store - tx backend.ReadTx + s *store + tx backend.ReadTx + txlocked bool + + // for creating concurrent read tx when the read is expensive. + b backend.Backend + // is the transcation readonly? + ro bool firstRev int64 rev int64 @@ -33,10 +45,15 @@ func (s *store) Read() TxnRead { s.mu.RLock() tx := s.b.ReadTx() s.revMu.RLock() - tx.Lock() firstRev, rev := s.compactMainRev, s.currentRev s.revMu.RUnlock() - return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev}) + return newMetricsTxnRead(&storeTxnRead{ + s: s, + tx: tx, + b: s.b, + ro: readonly, + firstRev: firstRev, + rev: rev}) } func (tr *storeTxnRead) FirstRev() int64 { return tr.firstRev } @@ -47,7 +64,9 @@ func (tr *storeTxnRead) Range(key, end []byte, ro RangeOptions) (r *RangeResult, } func (tr *storeTxnRead) End() { - tr.tx.Unlock() + if tr.txlocked { + tr.tx.Unlock() + } tr.s.mu.RUnlock() } @@ -64,10 +83,15 @@ func (s *store) Write() TxnWrite { tx := s.b.BatchTx() tx.Lock() tw := &storeTxnWrite{ - storeTxnRead: storeTxnRead{s, tx, 0, 0}, - tx: tx, - beginRev: s.currentRev, - changes: make([]mvccpb.KeyValue, 0, 4), + storeTxnRead: storeTxnRead{ + s: s, + txlocked: true, + tx: tx, + ro: readwrite, + }, + tx: tx, + beginRev: s.currentRev, + changes: make([]mvccpb.KeyValue, 0, 4), } return newMetricsTxnWrite(tw) } @@ -134,6 +158,15 @@ func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions limit = len(revpairs) } + if limit > expensiveReadLimit && !tr.txlocked && tr.ro { // first expensive read in a read only transcation + // too many keys to range. upgrade the read transcation to concurrent read tx. + tr.tx = tr.b.CommittedReadTx() + } + if !tr.txlocked { + tr.tx.Lock() + tr.txlocked = true + } + kvs := make([]mvccpb.KeyValue, limit) revBytes := newRevBytes() for i, revpair := range revpairs[:len(kvs)] {