Skip to content

Commit

Permalink
mvcc/backend: rename ReadTx Lock() to RLock()
Browse files Browse the repository at this point in the history
For better code readability, renaming Lock() to RLock() in ReadTx
interface.
  • Loading branch information
jingyih committed Mar 5, 2019
1 parent 918f041 commit 1c19f12
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 21 deletions.
4 changes: 2 additions & 2 deletions mvcc/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,8 +336,8 @@ func (b *backend) defrag() error {
defer b.mu.Unlock()

// block concurrent read requests while resetting tx
b.readTx.mu.Lock()
defer b.readTx.mu.Unlock()
b.readTx.Lock()
defer b.readTx.Unlock()

b.batchTx.unsafeCommit(true)

Expand Down
38 changes: 27 additions & 11 deletions mvcc/backend/batch_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,29 @@ type batchTx struct {
pending int
}

func (t *batchTx) Lock() {
t.Mutex.Lock()
}

func (t *batchTx) Unlock() {
if t.pending >= t.backend.batchLimit {
t.commit(false)
}
t.Mutex.Unlock()
}

// BatchTx interface embeds ReadTx interface. But RLock() and RUnlock() do not
// have appropriate semantics in BatchTx interface. Therefore should not be called.
// TODO: might want to decouple ReadTx and BatchTx

func (t *batchTx) RLock() {
panic("unexpected RLock")
}

func (t *batchTx) RUnlock() {
panic("unexpected RUnlock")
}

func (t *batchTx) UnsafeCreateBucket(name []byte) {
_, err := t.tx.CreateBucket(name)
if err != nil && err != bolt.ErrBucketExists {
Expand Down Expand Up @@ -194,13 +217,6 @@ func (t *batchTx) CommitAndStop() {
t.Unlock()
}

func (t *batchTx) Unlock() {
if t.pending >= t.backend.batchLimit {
t.commit(false)
}
t.Mutex.Unlock()
}

func (t *batchTx) safePending() int {
t.Mutex.Lock()
defer t.Mutex.Unlock()
Expand Down Expand Up @@ -259,9 +275,9 @@ func newBatchTxBuffered(backend *backend) *batchTxBuffered {

func (t *batchTxBuffered) Unlock() {
if t.pending != 0 {
t.backend.readTx.mu.Lock()
t.backend.readTx.Lock() // blocks txReadBuffer for writing.
t.buf.writeback(&t.backend.readTx.buf)
t.backend.readTx.mu.Unlock()
t.backend.readTx.Unlock()
if t.pending >= t.backend.batchLimit {
t.commit(false)
}
Expand All @@ -283,9 +299,9 @@ func (t *batchTxBuffered) CommitAndStop() {

func (t *batchTxBuffered) commit(stop bool) {
// all read txs must be closed to acquire boltdb commit rwlock
t.backend.readTx.mu.Lock()
t.backend.readTx.Lock()
t.unsafeCommit(stop)
t.backend.readTx.mu.Unlock()
t.backend.readTx.Unlock()
}

func (t *batchTxBuffered) unsafeCommit(stop bool) {
Expand Down
8 changes: 6 additions & 2 deletions mvcc/backend/read_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ var safeRangeBucket = []byte("key")
type ReadTx interface {
Lock()
Unlock()
RLock()
RUnlock()

UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)
UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error
Expand All @@ -46,8 +48,10 @@ type readTx struct {
buckets map[string]*bolt.Bucket
}

func (rt *readTx) Lock() { rt.mu.RLock() }
func (rt *readTx) Unlock() { rt.mu.RUnlock() }
func (rt *readTx) Lock() { rt.mu.Lock() }
func (rt *readTx) Unlock() { rt.mu.Unlock() }
func (rt *readTx) RLock() { rt.mu.RLock() }
func (rt *readTx) RUnlock() { rt.mu.RUnlock() }

func (rt *readTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
if endKey == nil {
Expand Down
4 changes: 2 additions & 2 deletions mvcc/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,8 @@ func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev
keep := s.kvindex.Keep(rev)

tx := s.b.ReadTx()
tx.Lock()
defer tx.Unlock()
tx.RLock()
defer tx.RUnlock()
s.mu.RUnlock()

upper := revision{main: rev + 1}
Expand Down
2 changes: 2 additions & 0 deletions mvcc/kvstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -725,6 +725,8 @@ type fakeBatchTx struct {

func (b *fakeBatchTx) Lock() {}
func (b *fakeBatchTx) Unlock() {}
func (b *fakeBatchTx) RLock() {}
func (b *fakeBatchTx) RUnlock() {}
func (b *fakeBatchTx) UnsafeCreateBucket(name []byte) {}
func (b *fakeBatchTx) UnsafePut(bucketName []byte, key []byte, value []byte) {
b.Recorder.Record(testutil.Action{Name: "put", Params: []interface{}{bucketName, key, value}})
Expand Down
8 changes: 6 additions & 2 deletions mvcc/kvstore_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@ func (s *store) Read() TxnRead {
s.mu.RLock()
tx := s.b.ReadTx()
s.revMu.RLock()
tx.Lock()
// tx.RLock() blocks txReadBuffer for reading, which could potentially block the following two operations:
// A) writeback from txWriteBuffer to txReadBuffer at the end of a write transaction (TxnWrite).
// B) starting of a new backend batch transaction, where the pending changes need to be committed to boltdb
// and txReadBuffer needs to be reset.
tx.RLock()
firstRev, rev := s.compactMainRev, s.currentRev
s.revMu.RUnlock()
return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev})
Expand All @@ -47,7 +51,7 @@ func (tr *storeTxnRead) Range(key, end []byte, ro RangeOptions) (r *RangeResult,
}

func (tr *storeTxnRead) End() {
tr.tx.Unlock()
tr.tx.RUnlock()
tr.s.mu.RUnlock()
}

Expand Down
4 changes: 2 additions & 2 deletions mvcc/watchable_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ func (s *watchableStore) syncWatchers() int {
// UnsafeRange returns keys and values. And in boltdb, keys are revisions.
// values are actual key-value pairs in backend.
tx := s.store.b.ReadTx()
tx.Lock()
tx.RLock()
revs, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0)
var evs []mvccpb.Event
if s.store != nil && s.store.lg != nil {
Expand All @@ -355,7 +355,7 @@ func (s *watchableStore) syncWatchers() int {
// TODO: remove this in v3.5
evs = kvsToEvents(nil, wg, revs, vs)
}
tx.Unlock()
tx.RUnlock()

var victims watcherBatch
wb := newWatcherBatch(wg, evs)
Expand Down

0 comments on commit 1c19f12

Please sign in to comment.