Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

no cache on iterator, remove unused snapshot #389

Merged
merged 1 commit into from
Jul 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions partition_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ func newPartitionTable(topic string,
builder storage.Builder,
log logger,
backoff Backoff,
backoffResetTimeout time.Duration) *PartitionTable {

backoffResetTimeout time.Duration,
) *PartitionTable {
pt := &PartitionTable{
partition: partition,
state: newPartitionTableState(),
Expand All @@ -107,7 +107,6 @@ func newPartitionTable(topic string,

// SetupAndRecover sets up the partition storage and recovers to HWM
func (p *PartitionTable) SetupAndRecover(ctx context.Context, restartOnError bool) error {

err := p.setup(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -405,7 +404,6 @@ func (p *PartitionTable) markRecovered(ctx context.Context) error {
}

func (p *PartitionTable) drainConsumer(cons sarama.PartitionConsumer) error {

timeoutCtx, cancel := context.WithTimeout(context.Background(), consumerDrainTimeout)
defer cancel()

Expand Down Expand Up @@ -448,7 +446,6 @@ func (p *PartitionTable) drainConsumer(cons sarama.PartitionConsumer) error {
}

func (p *PartitionTable) loadMessages(ctx context.Context, cons sarama.PartitionConsumer, partitionHwm int64, stopAfterCatchup bool) error {

stallTicker := time.NewTicker(p.stallPeriod)
defer stallTicker.Stop()

Expand Down Expand Up @@ -527,7 +524,6 @@ func (p *PartitionTable) enqueueStatsUpdate(ctx context.Context, updater func())
// recover/catchup mechanism so clients can always request stats even if the partition table is not
// running (like a processor table after it's recovered).
func (p *PartitionTable) RunStatsLoop(ctx context.Context) {

updateHwmStatsTicker := time.NewTicker(statsHwmUpdateInterval)
defer updateHwmStatsTicker.Stop()
for {
Expand Down
3 changes: 0 additions & 3 deletions storage/iterator.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
package storage

import (
"github.com/syndtr/goleveldb/leveldb"
ldbiter "github.com/syndtr/goleveldb/leveldb/iterator"
)

// iterator wraps an Iterator implementation and handles the value decoding and
// offset key skipping.
type iterator struct {
iter ldbiter.Iterator
snap *leveldb.Snapshot
}

func (i *iterator) Next() bool {
Expand Down Expand Up @@ -40,7 +38,6 @@ func (i *iterator) Value() ([]byte, error) {

func (i *iterator) Release() {
i.iter.Release()
i.snap.Release()
}

func (i *iterator) Seek(key []byte) bool {
Expand Down
23 changes: 5 additions & 18 deletions storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/util"
)

Expand Down Expand Up @@ -44,7 +45,6 @@ type Iterator interface {
// Implementations of this interface must be safe for any number of concurrent
// readers with one writer.
type Storage interface {

// Opens/Initialize the storage
Open() error

Expand Down Expand Up @@ -95,7 +95,6 @@ type storage struct {

// New creates a new Storage backed by LevelDB.
func New(db *leveldb.DB) (Storage, error) {

return &storage{
db: db,
recovered: make(chan struct{}),
Expand All @@ -106,35 +105,23 @@ func New(db *leveldb.DB) (Storage, error) {

// Iterator returns an iterator that traverses over a snapshot of the storage.
func (s *storage) Iterator() (Iterator, error) {
snap, err := s.db.GetSnapshot()
if err != nil {
return nil, err
}

return &iterator{
iter: s.db.NewIterator(nil, nil),
snap: snap,
iter: s.db.NewIterator(nil, &opt.ReadOptions{
DontFillCache: true,
}),
}, nil
}

// Iterator returns an iterator that traverses over a snapshot of the storage.
func (s *storage) IteratorWithRange(start, limit []byte) (Iterator, error) {
snap, err := s.db.GetSnapshot()
if err != nil {
return nil, err
}

if limit != nil && len(limit) > 0 {
if len(limit) > 0 {
return &iterator{
iter: s.db.NewIterator(&util.Range{Start: start, Limit: limit}, nil),
snap: snap,
}, nil
}
return &iterator{
iter: s.db.NewIterator(util.BytesPrefix(start), nil),
snap: snap,
}, nil

}

func (s *storage) Has(key string) (bool, error) {
Expand Down