Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage/engine: fix pebbleBatch.iter reuse #41896

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions pkg/storage/engine/mvcc_logical_ops_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@ package engine
import (
"context"
"math"
"reflect"
"strings"
"testing"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/kr/pretty"
)

func TestMVCCOpLogWriter(t *testing.T) {
Expand Down Expand Up @@ -164,8 +165,8 @@ func TestMVCCOpLogWriter(t *testing.T) {
TxnID: txn2.ID,
}),
}
if ops := ol.LogicalOps(); !reflect.DeepEqual(exp, ops) {
t.Errorf("expected logical ops %+v, found %+v", exp, ops)
if diff := pretty.Diff(exp, ol.LogicalOps()); diff != nil {
t.Errorf("unexpected logical op differences:\n%s", strings.Join(diff, "\n"))
}
})
}
Expand Down
29 changes: 18 additions & 11 deletions pkg/storage/engine/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,9 +528,10 @@ func (p *Pebble) GetSSTables() (sstables SSTableInfos) {
}

type pebbleReadOnly struct {
parent *Pebble
iter pebbleIterator
closed bool
parent *Pebble
prefixIter pebbleIterator
normalIter pebbleIterator
closed bool
}

var _ ReadWriter = &pebbleReadOnly{}
Expand All @@ -540,7 +541,8 @@ func (p *pebbleReadOnly) Close() {
panic("closing an already-closed pebbleReadOnly")
}
p.closed = true
p.iter.destroy()
p.prefixIter.destroy()
p.normalIter.destroy()
}

func (p *pebbleReadOnly) Closed() bool {
Expand Down Expand Up @@ -580,18 +582,23 @@ func (p *pebbleReadOnly) NewIterator(opts IterOptions) Iterator {
return newPebbleIterator(p.parent.db, opts)
}

if p.iter.inuse {
iter := &p.normalIter
if opts.Prefix {
iter = &p.prefixIter
}
if iter.inuse {
panic("iterator already in use")
}
p.iter.inuse = true
p.iter.reusable = true

if p.iter.iter != nil {
p.iter.setOptions(opts)
if iter.iter != nil {
iter.setOptions(opts)
} else {
p.iter.init(p.parent.db, opts)
iter.init(p.parent.db, opts)
iter.reusable = true
}
return &p.iter

iter.inuse = true
return iter
}

// Writer methods are not implemented for pebbleReadOnly. Ideally, the code
Expand Down
52 changes: 33 additions & 19 deletions pkg/storage/engine/pebble_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ type pebbleBatch struct {
db *pebble.DB
batch *pebble.Batch
buf []byte
iter pebbleIterator
prefixIter pebbleIterator
normalIter pebbleIterator
closed bool
isDistinct bool
distinctOpen bool
Expand All @@ -45,9 +46,15 @@ func newPebbleBatch(db *pebble.DB, batch *pebble.Batch) *pebbleBatch {
db: db,
batch: batch,
buf: pb.buf,
iter: pebbleIterator{
lowerBoundBuf: pb.iter.lowerBoundBuf,
upperBoundBuf: pb.iter.upperBoundBuf,
prefixIter: pebbleIterator{
lowerBoundBuf: pb.prefixIter.lowerBoundBuf,
upperBoundBuf: pb.prefixIter.upperBoundBuf,
reusable: true,
},
normalIter: pebbleIterator{
lowerBoundBuf: pb.normalIter.lowerBoundBuf,
upperBoundBuf: pb.normalIter.upperBoundBuf,
reusable: true,
},
}
return pb
Expand All @@ -59,14 +66,19 @@ func (p *pebbleBatch) Close() {
panic("closing an already-closed pebbleBatch")
}
p.closed = true

// Destroy the iterators before closing the batch.
p.prefixIter.destroy()
p.normalIter.destroy()

if !p.isDistinct {
_ = p.batch.Close()
p.batch = nil
} else {
p.parentBatch.distinctOpen = false
p.isDistinct = false
}
p.iter.destroy()

pebbleBatchPool.Put(p)
}

Expand Down Expand Up @@ -157,20 +169,24 @@ func (p *pebbleBatch) NewIterator(opts IterOptions) Iterator {
return newPebbleIterator(p.batch, opts)
}

if p.iter.inuse {
iter := &p.normalIter
if opts.Prefix {
iter = &p.prefixIter
}
if iter.inuse {
panic("iterator already in use")
}
p.iter.inuse = true
p.iter.reusable = true

if p.iter.iter != nil {
p.iter.setOptions(opts)
if iter.iter != nil {
iter.setOptions(opts)
} else if p.batch.Indexed() {
p.iter.init(p.batch, opts)
iter.init(p.batch, opts)
} else {
p.iter.init(p.db, opts)
iter.init(p.db, opts)
}
return &p.iter

iter.inuse = true
return iter
}

// NewIterator implements the Batch interface.
Expand Down Expand Up @@ -322,12 +338,10 @@ func (p *pebbleBatch) Distinct() ReadWriter {
// optimization. In Pebble we're still using the same underlying batch and if
// it is indexed we'll still be indexing it as we Go.
p.distinctOpen = true
return &pebbleBatch{
db: p.db,
batch: p.batch,
parentBatch: p,
isDistinct: true,
}
d := newPebbleBatch(p.db, p.batch)
d.parentBatch = p
d.isDistinct = true
return d
}

// Empty implements the Batch interface.
Expand Down
15 changes: 11 additions & 4 deletions pkg/storage/engine/pebble_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func (p *pebbleIterator) init(handle pebble.Reader, opts IterOptions) {
lowerBoundBuf: p.lowerBoundBuf,
upperBoundBuf: p.upperBoundBuf,
prefix: opts.Prefix,
reusable: p.reusable,
}

if !opts.Prefix && len(opts.UpperBound) == 0 && len(opts.LowerBound) == 0 {
Expand Down Expand Up @@ -125,6 +126,8 @@ func (p *pebbleIterator) init(handle pebble.Reader, opts IterOptions) {
if p.iter == nil {
panic("unable to create iterator")
}

p.inuse = true
}

func (p *pebbleIterator) setOptions(opts IterOptions) {
Expand Down Expand Up @@ -156,11 +159,12 @@ func (p *pebbleIterator) setOptions(opts IterOptions) {

// Close implements the Iterator interface.
func (p *pebbleIterator) Close() {
if !p.inuse {
panic("closing idle iterator")
}
p.inuse = false

if p.reusable {
if !p.inuse {
panic("closing idle iterator")
}
p.inuse = false
return
}

Expand Down Expand Up @@ -533,6 +537,9 @@ func (p *pebbleIterator) Stats() IteratorStats {
}

func (p *pebbleIterator) destroy() {
if p.inuse {
panic("iterator still in use")
}
if p.iter != nil {
err := p.iter.Close()
if err != nil {
Expand Down