From d524b4b0e9c7d05ea0b5d323ee1648cfa5e0f88f Mon Sep 17 00:00:00 2001 From: frairon Date: Fri, 1 Jul 2022 08:49:21 +0200 Subject: [PATCH] no cache on iterator, remove unused snapshot --- partition_table.go | 8 ++------ storage/iterator.go | 3 --- storage/storage.go | 23 +++++------------------ 3 files changed, 7 insertions(+), 27 deletions(-) diff --git a/partition_table.go b/partition_table.go index cf1a0b5f..57e6ea92 100644 --- a/partition_table.go +++ b/partition_table.go @@ -79,8 +79,8 @@ func newPartitionTable(topic string, builder storage.Builder, log logger, backoff Backoff, - backoffResetTimeout time.Duration) *PartitionTable { - + backoffResetTimeout time.Duration, +) *PartitionTable { pt := &PartitionTable{ partition: partition, state: newPartitionTableState(), @@ -107,7 +107,6 @@ func newPartitionTable(topic string, // SetupAndRecover sets up the partition storage and recovers to HWM func (p *PartitionTable) SetupAndRecover(ctx context.Context, restartOnError bool) error { - err := p.setup(ctx) if err != nil { return err @@ -405,7 +404,6 @@ func (p *PartitionTable) markRecovered(ctx context.Context) error { } func (p *PartitionTable) drainConsumer(cons sarama.PartitionConsumer) error { - timeoutCtx, cancel := context.WithTimeout(context.Background(), consumerDrainTimeout) defer cancel() @@ -448,7 +446,6 @@ func (p *PartitionTable) drainConsumer(cons sarama.PartitionConsumer) error { } func (p *PartitionTable) loadMessages(ctx context.Context, cons sarama.PartitionConsumer, partitionHwm int64, stopAfterCatchup bool) error { - stallTicker := time.NewTicker(p.stallPeriod) defer stallTicker.Stop() @@ -527,7 +524,6 @@ func (p *PartitionTable) enqueueStatsUpdate(ctx context.Context, updater func()) // recover/catchup mechanism so clients can always request stats even if the partition table is not // running (like a processor table after it's recovered). func (p *PartitionTable) RunStatsLoop(ctx context.Context) { - updateHwmStatsTicker := time.NewTicker(statsHwmUpdateInterval) defer updateHwmStatsTicker.Stop() for { diff --git a/storage/iterator.go b/storage/iterator.go index 671a1097..dba8c45d 100644 --- a/storage/iterator.go +++ b/storage/iterator.go @@ -1,7 +1,6 @@ package storage import ( - "github.com/syndtr/goleveldb/leveldb" ldbiter "github.com/syndtr/goleveldb/leveldb/iterator" ) @@ -9,7 +8,6 @@ import ( // offset key skipping. type iterator struct { iter ldbiter.Iterator - snap *leveldb.Snapshot } func (i *iterator) Next() bool { @@ -40,7 +38,6 @@ func (i *iterator) Value() ([]byte, error) { func (i *iterator) Release() { i.iter.Release() - i.snap.Release() } func (i *iterator) Seek(key []byte) bool { diff --git a/storage/storage.go b/storage/storage.go index dbd7c60d..0816a1b1 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -8,6 +8,7 @@ import ( "time" "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/opt" "github.com/syndtr/goleveldb/leveldb/util" ) @@ -44,7 +45,6 @@ type Iterator interface { // Implementations of this interface must be safe for any number of concurrent // readers with one writer. type Storage interface { - // Opens/Initialize the storage Open() error @@ -95,7 +95,6 @@ type storage struct { // New creates a new Storage backed by LevelDB. func New(db *leveldb.DB) (Storage, error) { - return &storage{ db: db, recovered: make(chan struct{}), @@ -106,35 +105,23 @@ func New(db *leveldb.DB) (Storage, error) { // Iterator returns an iterator that traverses over a snapshot of the storage. func (s *storage) Iterator() (Iterator, error) { - snap, err := s.db.GetSnapshot() - if err != nil { - return nil, err - } - return &iterator{ - iter: s.db.NewIterator(nil, nil), - snap: snap, + iter: s.db.NewIterator(nil, &opt.ReadOptions{ + DontFillCache: true, + }), }, nil } // Iterator returns an iterator that traverses over a snapshot of the storage. func (s *storage) IteratorWithRange(start, limit []byte) (Iterator, error) { - snap, err := s.db.GetSnapshot() - if err != nil { - return nil, err - } - - if limit != nil && len(limit) > 0 { + if len(limit) > 0 { return &iterator{ iter: s.db.NewIterator(&util.Range{Start: start, Limit: limit}, nil), - snap: snap, }, nil } return &iterator{ iter: s.db.NewIterator(util.BytesPrefix(start), nil), - snap: snap, }, nil - } func (s *storage) Has(key string) (bool, error) {