*: allow fully concurrent large read #9384

Closed
2 changes: 0 additions & 2 deletions etcdserver/v3_server.go
@@ -86,8 +86,6 @@ type Authenticator interface {
}

func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
defer warnOfExpensiveReadOnlyRangeRequest(s.getLogger(), time.Now(), r)

if !r.Serializable {
err := s.linearizableReadNotify(ctx)
if err != nil {
34 changes: 34 additions & 0 deletions mvcc/backend/backend.go
@@ -49,6 +49,11 @@ var (
)

type Backend interface {
// CommittedReadTx returns a non-blocking read tx that is suitable for large reads.
// CommittedReadTx call itself will not return until the current BatchTx gets committed to
// ensure consistency.
CommittedReadTx() ReadTx

ReadTx() ReadTx
BatchTx() BatchTx

@@ -97,6 +102,8 @@ type backend struct {

readTx *readTx

concurrentReadTxCh chan chan ReadTx
Review comment (Contributor):
May want to avoid O(n) broadcast here; can do O(1) by returning <-chan struct{} that closes on the next commit, wait on channel close, fetch the latest committed tx, then acquire some semaphore to limit concurrency. Could return a closed channel if the backend is already fully committed, so no need to wait on commit timer.
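
A minimal sketch of the suggestion above, not code from this PR. The names `commitNotifier`, `MarkDirty`, `WaitCommit`, `Commit`, `Acquire`, and `Release` are hypothetical. It assumes the backend can tell whether uncommitted writes are buffered; waiters share one channel that is closed on commit, and a buffered channel serves as the semaphore that bounds open read transactions.

```go
package main

import (
	"fmt"
	"sync"
)

// commitNotifier is a hypothetical helper, not an etcd type. All waiters share
// one channel that is closed on commit, so waking n waiters costs a single
// close() instead of n channel sends.
type commitNotifier struct {
	mu      sync.Mutex
	pending bool          // true while uncommitted writes are buffered
	nextc   chan struct{} // closed when the current batch commits
	sem     chan struct{} // bounds the number of open read txs
}

// closedc is handed out when nothing is pending, so readers never wait on the
// commit timer in that case.
var closedc = func() chan struct{} {
	c := make(chan struct{})
	close(c)
	return c
}()

func newCommitNotifier(maxReaders int) *commitNotifier {
	return &commitNotifier{
		nextc: make(chan struct{}),
		sem:   make(chan struct{}, maxReaders),
	}
}

// MarkDirty records that a write is buffered but not yet committed.
func (n *commitNotifier) MarkDirty() {
	n.mu.Lock()
	n.pending = true
	n.mu.Unlock()
}

// WaitCommit returns a channel that is closed once every write buffered at
// call time has been committed.
func (n *commitNotifier) WaitCommit() <-chan struct{} {
	n.mu.Lock()
	defer n.mu.Unlock()
	if !n.pending {
		return closedc
	}
	return n.nextc
}

// Commit wakes all current waiters and starts a fresh generation.
func (n *commitNotifier) Commit() {
	n.mu.Lock()
	defer n.mu.Unlock()
	if !n.pending {
		return // nobody can be waiting on nextc
	}
	close(n.nextc)
	n.nextc = make(chan struct{})
	n.pending = false
}

// Acquire blocks while maxReaders read txs are already open.
func (n *commitNotifier) Acquire() { n.sem <- struct{}{} }

// Release frees a slot taken by Acquire.
func (n *commitNotifier) Release() { <-n.sem }

func main() {
	n := newCommitNotifier(2)
	n.MarkDirty()
	c := n.WaitCommit()
	n.Commit() // in the backend this would happen in the run() loop
	<-c
	n.Acquire()
	fmt.Println("read tx may begin")
	n.Release()
}
```

In this sketch a reader would wait on `WaitCommit()`, call `Acquire()`, begin its own read-only transaction, and call `Release()` after rolling it back.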


stopc chan struct{}
donec chan struct{}

@@ -165,6 +172,8 @@ func newBackend(bcfg BackendConfig) *backend {
buckets: make(map[string]*bolt.Bucket),
},

concurrentReadTxCh: make(chan chan ReadTx),

stopc: make(chan struct{}),
donec: make(chan struct{}),

@@ -184,6 +193,12 @@ func (b *backend) BatchTx() BatchTx {

func (b *backend) ReadTx() ReadTx { return b.readTx }

func (b *backend) CommittedReadTx() ReadTx {
rch := make(chan ReadTx)
b.concurrentReadTxCh <- rch
return <-rch
}

// ForceCommit forces the current batching tx to commit.
func (b *backend) ForceCommit() {
b.batchTx.Commit()
@@ -301,6 +316,25 @@ func (b *backend) run() {
b.batchTx.Commit()
}
t.Reset(b.batchInterval)
b.createConcurrentReadTxs()
}
}

func (b *backend) createConcurrentReadTxs() {
// do not allow too many concurrent read txs.
// TODO: improve this by having a global pending counter?
for i := 0; i < 100; i++ {
select {
case rch := <-b.concurrentReadTxCh:
rtx, err := b.db.Begin(false)
Review comment (Contributor):
lazily create tx following each commit?

if err != nil {
plog.Fatalf("cannot begin read tx (%s)", err)
}
rch <- &concurrentReadTx{tx: rtx}
default:
// no more to create.
return
}
}
}

75 changes: 75 additions & 0 deletions mvcc/backend/backend_test.go
@@ -300,6 +300,81 @@ func TestBackendWritebackForEach(t *testing.T) {
}
}

// TestBackendConcurrentReadTx checks if the concurrent tx is created correctly.
func TestBackendConcurrentReadTx(t *testing.T) {
b, tmpPath := NewTmpBackend(2*time.Second, 10000)
defer cleanup(b, tmpPath)

var rtx0 ReadTx
done := make(chan struct{})
go func() {
rtx0 = b.CommittedReadTx()
close(done)
}()

tx := b.BatchTx()
tx.Lock()
tx.UnsafeCreateBucket([]byte("key"))
for i := 0; i < 5; i++ {
k := []byte(fmt.Sprintf("%04d", i))
tx.UnsafePut([]byte("key"), k, []byte("bar"))
}
tx.Unlock()

select {
case <-done:
t.Fatal("concurrent read tx should block on the last batch tx!")
case <-time.After(time.Second):
}

select {
case <-done:
case <-time.After(4 * time.Second):
t.Fatal("commit the last batched tx should unblock concurrent tx!")
}

rtx0.Lock()
defer rtx0.Unlock()
ks, _ := rtx0.UnsafeRange([]byte("key"), []byte(fmt.Sprintf("%04d", 0)), []byte(fmt.Sprintf("%04d", 5)), 0)
if len(ks) != 5 {
t.Errorf("got %d keys, expect %d", len(ks), 5)
}

// test if we can create concurrent read while the previous read tx is still open
var rtx1 ReadTx
done = make(chan struct{})
go func() {
rtx1 = b.CommittedReadTx()
rtx1.Lock()
rtx1.UnsafeForEach([]byte(""), nil)
rtx1.Unlock()
close(done)
}()
select {
case <-done:
case <-time.After(4 * time.Second):
t.Fatal("cannot create concurrent read")
}

done = make(chan struct{})
// test if we can create concurrent write while the previous read tx is still open
go func() {
tx := b.BatchTx()
tx.Lock()
for i := 0; i < 5; i++ {
k := []byte(fmt.Sprintf("%04d", i))
tx.UnsafePut([]byte("key"), k, []byte("bar"))
}
tx.Unlock()
close(done)
}()
select {
case <-done:
case <-time.After(4 * time.Second):
t.Fatal("cannot create concurrent write")
}
}

func cleanup(b Backend, path string) {
b.Close()
os.Remove(path)
38 changes: 38 additions & 0 deletions mvcc/backend/concurrent_read_tx.go
@@ -0,0 +1,38 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package backend

import (
bolt "github.com/coreos/bbolt"
)

type concurrentReadTx struct {
tx *bolt.Tx
}

func (rt *concurrentReadTx) Lock() {}
func (rt *concurrentReadTx) Unlock() { rt.tx.Rollback() }

func (rt *concurrentReadTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
bucket := rt.tx.Bucket(bucketName)
if bucket == nil {
plog.Fatalf("bucket %s does not exist", bucketName)
}
return unsafeRange(bucket.Cursor(), key, endKey, limit)
}

func (rt *concurrentReadTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
return unsafeForEach(rt.tx, bucketName, visitor)
}
7 changes: 4 additions & 3 deletions mvcc/index.go
@@ -91,10 +91,11 @@ func (ti *treeIndex) keyIndex(keyi *keyIndex) *keyIndex {
func (ti *treeIndex) visit(key, end []byte, f func(ki *keyIndex)) {
keyi, endi := &keyIndex{key: key}, &keyIndex{key: end}

ti.RLock()
defer ti.RUnlock()
ti.Lock()
Review comment (Contributor Author):
@jingyih I think this improvement is still needed. Can you make a separate PR for this change?

Reply (Contributor, @jingyih, Mar 6, 2019):
Sounds good.

clone := ti.tree.Clone()
ti.Unlock()

ti.tree.AscendGreaterOrEqual(keyi, func(item btree.Item) bool {
clone.AscendGreaterOrEqual(keyi, func(item btree.Item) bool {
if len(endi.key) > 0 && !item.Less(endi) {
return false
}
3 changes: 2 additions & 1 deletion mvcc/kvstore_test.go
@@ -640,7 +640,7 @@ func TestTxnBlockBackendForceCommit(t *testing.T) {
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
defer os.Remove(tmpPath)

txn := s.Read()
txn := s.Write()

done := make(chan struct{})
go func() {
@@ -742,6 +742,7 @@ type fakeBackend struct {

func (b *fakeBackend) BatchTx() backend.BatchTx { return b.tx }
func (b *fakeBackend) ReadTx() backend.ReadTx { return b.tx }
func (b *fakeBackend) CommittedReadTx() backend.ReadTx { return b.tx }
func (b *fakeBackend) Hash(ignores map[backend.IgnoreKey]struct{}) (uint32, error) { return 0, nil }
func (b *fakeBackend) Size() int64 { return 0 }
func (b *fakeBackend) SizeInUse() int64 { return 0 }
51 changes: 42 additions & 9 deletions mvcc/kvstore_txn.go
@@ -21,9 +21,21 @@ import (
"go.uber.org/zap"
)

const (
expensiveReadLimit = 1000
readonly = true
readwrite = false
)

type storeTxnRead struct {
s *store
tx backend.ReadTx
s *store
tx backend.ReadTx
txlocked bool

// for creating concurrent read tx when the read is expensive.
b backend.Backend
Review comment (Contributor):
It looks like there should be something like storeTxnReadLimited that wraps storeTxnRead.

// is the transaction readonly?
ro bool

firstRev int64
rev int64
@@ -33,10 +45,15 @@ func (s *store) Read() TxnRead {
s.mu.RLock()
tx := s.b.ReadTx()
s.revMu.RLock()
tx.Lock()
firstRev, rev := s.compactMainRev, s.currentRev
s.revMu.RUnlock()
return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev})
return newMetricsTxnRead(&storeTxnRead{
s: s,
tx: tx,
b: s.b,
ro: readonly,
firstRev: firstRev,
rev: rev})
}

func (tr *storeTxnRead) FirstRev() int64 { return tr.firstRev }
@@ -47,7 +64,9 @@ func (tr *storeTxnRead) Range(key, end []byte, ro RangeOptions) (r *RangeResult,
}

func (tr *storeTxnRead) End() {
tr.tx.Unlock()
if tr.txlocked {
tr.tx.Unlock()
Review comment (Contributor):
tr.txlocked = false?

Review comment (Contributor):
Can this be better encapsulated? Could tr.tx.Unlock() and tr.txlocked = false be coordinated by a function that tr provides, so we don't have to remember to do both correctly everywhere? Same for the tr.tx.Lock() logic below.
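
One way the encapsulation asked for above might look, as a sketch only. The type `lockedTx` and its methods `lock` and `unlock` are hypothetical names, and `countingTx` is a stand-in for the Lock/Unlock part of `backend.ReadTx` so the example runs on its own.

```go
package main

import "fmt"

// locker is the subset of backend.ReadTx that this sketch needs.
type locker interface {
	Lock()
	Unlock()
}

// countingTx is a stand-in tx that only counts calls.
type countingTx struct{ locks, unlocks int }

func (c *countingTx) Lock()   { c.locks++ }
func (c *countingTx) Unlock() { c.unlocks++ }

// lockedTx (hypothetical) keeps the tx and its locked flag together, so the
// flag can never drift from the real lock state.
type lockedTx struct {
	tx     locker
	locked bool
}

// lock takes the tx lock once; repeated calls are no-ops.
func (l *lockedTx) lock() {
	if !l.locked {
		l.tx.Lock()
		l.locked = true
	}
}

// unlock releases the tx lock if held and always resets the flag.
func (l *lockedTx) unlock() {
	if l.locked {
		l.tx.Unlock()
		l.locked = false
	}
}

func main() {
	c := &countingTx{}
	l := &lockedTx{tx: c}
	l.lock()
	l.lock() // second call does nothing
	l.unlock()
	l.unlock() // second call does nothing
	fmt.Println(c.locks, c.unlocks) // prints "1 1"
}
```

With a helper like this, `End()` and `rangeKeys()` would call `unlock()` and `lock()` and never touch the flag directly.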

}
tr.s.mu.RUnlock()
}

@@ -64,10 +83,15 @@ func (s *store) Write() TxnWrite {
tx := s.b.BatchTx()
tx.Lock()
tw := &storeTxnWrite{
storeTxnRead: storeTxnRead{s, tx, 0, 0},
tx: tx,
beginRev: s.currentRev,
changes: make([]mvccpb.KeyValue, 0, 4),
storeTxnRead: storeTxnRead{
s: s,
txlocked: true,
tx: tx,
ro: readwrite,
},
tx: tx,
beginRev: s.currentRev,
changes: make([]mvccpb.KeyValue, 0, 4),
}
return newMetricsTxnWrite(tw)
}
@@ -134,6 +158,15 @@ func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions
limit = len(revpairs)
}

if limit > expensiveReadLimit && !tr.txlocked && tr.ro { // first expensive read in a read-only transaction
Review comment (Contributor):
e.g., storeTxnReadLimited could impose a limit+1 on kvindex.Revisions(). If the result is >limit, load by chunk from the committed tx-- it would split up the rlock hold times and spend less memory on revpairs
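
The limit+1 probe and chunked loading described above can be sketched as follows. This is an illustration under assumptions, not the etcd index API: `index`, `from`, `isExpensive`, and `readChunked` are hypothetical, and a sorted string slice stands in for the key index. In a real store each `from` call would take and release the index read lock, which is what splits up the hold times.

```go
package main

import (
	"fmt"
	"sort"
)

// index is a stand-in for the key index: a sorted list of unique keys.
type index []string

// from returns at most max keys that are >= start.
func (ix index) from(start string, max int) []string {
	i := sort.SearchStrings(ix, start)
	j := i + max
	if j > len(ix) {
		j = len(ix)
	}
	return ix[i:j]
}

// isExpensive asks for limit+1 keys. Getting more than limit back proves the
// range is large without materializing all of it.
func isExpensive(ix index, start string, limit int) bool {
	return len(ix.from(start, limit+1)) > limit
}

// readChunked visits all keys >= start in chunks of at most chunk keys.
// chunk must be positive. The extra key fetched each round is not visited;
// it only marks where the next chunk begins.
func readChunked(ix index, start string, chunk int, visit func(keys []string)) {
	for {
		keys := ix.from(start, chunk+1)
		if len(keys) <= chunk {
			if len(keys) > 0 {
				visit(keys) // final, possibly short, chunk
			}
			return
		}
		visit(keys[:chunk])
		start = keys[chunk]
	}
}

func main() {
	ix := index{"a", "b", "c", "d", "e"}
	fmt.Println(isExpensive(ix, "a", 3)) // prints "true"
	readChunked(ix, "a", 2, func(keys []string) {
		fmt.Println(keys) // prints [a b], then [c d], then [e]
	})
}
```

Only one chunk of keys is held in memory at a time, which is the memory saving on revpairs that the comment mentions.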

// too many keys to range. upgrade the read transaction to concurrent read tx.
Review comment (Contributor):
As an alternative to upgrading the tr.tx, would it make sense to have both a tr.tx and a tr.committedTx? This would limit what is put in the committed read transaction to only expensive reads.

Review comment (Contributor):
nvm, once we perform a CommittedReadTx there is no point having two transactions.

tr.tx = tr.b.CommittedReadTx()
Review comment (Contributor):
When tr.tx is reassigned, does this leak a bolt.Tx? If it is rolled back, I was unable to figure out where.

Review comment (Contributor):
Traced through the code. The original tx is managed by the backend and is rolled back as usual when it finishes the current batch iteration. This looks like it works correctly.

}
if !tr.txlocked {
tr.tx.Lock()
tr.txlocked = true
}

kvs := make([]mvccpb.KeyValue, limit)
revBytes := newRevBytes()
for i, revpair := range revpairs[:len(kvs)] {