Commit 2bdb909

Merge pull request #7756 from fpetkovski/generalize-pool

Generalize the bucketed bytes pool

fpetkovski authored Sep 18, 2024
2 parents 9dd7905 + 95c9bcf commit 2bdb909
Showing 5 changed files with 52 additions and 48 deletions.
67 changes: 34 additions & 33 deletions pkg/pool/pool.go
@@ -9,51 +9,52 @@ import (
"github.com/pkg/errors"
)

// Bytes is a pool of bytes that can be reused.
type Bytes interface {
// Get returns a new byte slices that fits the given size.
Get(sz int) (*[]byte, error)
// Put returns a byte slice to the right bucket in the pool.
Put(b *[]byte)
// Pool is a pool for slices of type T that can be reused.
type Pool[T any] interface {
// Get returns a new T slice that fits the given size.
Get(sz int) (*[]T, error)
// Put returns a T slice to the right bucket in the pool.
Put(b *[]T)
}

// NoopBytes is pool that always allocated required slice on heap and ignore puts.
type NoopBytes struct{}
// NoopPool is pool that always allocated required slice on heap and ignore puts.
type NoopPool[T any] struct{}

func (p NoopBytes) Get(sz int) (*[]byte, error) {
b := make([]byte, 0, sz)
func (p NoopPool[T]) Get(sz int) (*[]T, error) {
b := make([]T, 0, sz)
return &b, nil
}

func (p NoopBytes) Put(*[]byte) {}
func (p NoopPool[T]) Put(*[]T) {}

// BucketedBytes is a bucketed pool for variably sized byte slices. It can be configured to not allow
// more than a maximum number of bytes being used at a given time.
// Every byte slice obtained from the pool must be returned.
type BucketedBytes struct {
// BucketedPool is a bucketed pool for variably sized T slices. It can be
// configured to not allow more than a maximum number of T items being used at a
// given time. Every slice obtained from the pool must be returned.
type BucketedPool[T any] struct {
buckets []sync.Pool
sizes []int
maxTotal uint64
usedTotal uint64
mtx sync.RWMutex

new func(s int) *[]byte
new func(s int) *[]T
}

// MustNewBucketedBytes is like NewBucketedBytes but panics if construction fails.
// MustNewBucketedPool is like NewBucketedPool but panics if construction fails.
// Useful for package internal pools.
func MustNewBucketedBytes(minSize, maxSize int, factor float64, maxTotal uint64) *BucketedBytes {
pool, err := NewBucketedBytes(minSize, maxSize, factor, maxTotal)
func MustNewBucketedPool[T any](minSize, maxSize int, factor float64, maxTotal uint64) *BucketedPool[T] {
pool, err := NewBucketedPool[T](minSize, maxSize, factor, maxTotal)
if err != nil {
panic(err)
}
return pool
}

// NewBucketedBytes returns a new Bytes with size buckets for minSize to maxSize
// increasing by the given factor and maximum number of used bytes.
// No more than maxTotal bytes can be used at any given time unless maxTotal is set to 0.
func NewBucketedBytes(minSize, maxSize int, factor float64, maxTotal uint64) (*BucketedBytes, error) {
// NewBucketedPool returns a new BucketedPool with size buckets for minSize to
// maxSize increasing by the given factor and maximum number of used items. No
// more than maxTotal items can be used at any given time unless maxTotal is set
// to 0.
func NewBucketedPool[T any](minSize, maxSize int, factor float64, maxTotal uint64) (*BucketedPool[T], error) {
if minSize < 1 {
return nil, errors.New("invalid minimum pool size")
}
@@ -69,23 +70,23 @@ func NewBucketedBytes(minSize, maxSize int, factor float64, maxTotal uint64) (*B
 	for s := minSize; s <= maxSize; s = int(float64(s) * factor) {
 		sizes = append(sizes, s)
 	}
-	p := &BucketedBytes{
+	p := &BucketedPool[T]{
 		buckets:  make([]sync.Pool, len(sizes)),
 		sizes:    sizes,
 		maxTotal: maxTotal,
-		new: func(sz int) *[]byte {
-			s := make([]byte, 0, sz)
+		new: func(sz int) *[]T {
+			s := make([]T, 0, sz)
 			return &s
 		},
 	}
 	return p, nil
 }
 
-// ErrPoolExhausted is returned if a pool cannot provide the request bytes.
+// ErrPoolExhausted is returned if a pool cannot provide the requested slice.
 var ErrPoolExhausted = errors.New("pool exhausted")
 
-// Get returns a new byte slice that fits the given size.
-func (p *BucketedBytes) Get(sz int) (*[]byte, error) {
+// Get returns a slice from the right bucket that fits the given size.
+func (p *BucketedPool[T]) Get(sz int) (*[]T, error) {
 	p.mtx.Lock()
 	defer p.mtx.Unlock()
 
@@ -97,7 +98,7 @@ func (p *BucketedBytes) Get(sz int) (*[]byte, error) {
 		if sz > bktSize {
 			continue
 		}
-		b, ok := p.buckets[i].Get().(*[]byte)
+		b, ok := p.buckets[i].Get().(*[]T)
 		if !ok {
 			b = p.new(bktSize)
 		}
@@ -111,8 +112,8 @@ func (p *BucketedBytes) Get(sz int) (*[]byte, error) {
 	return p.new(sz), nil
 }
 
-// Put returns a byte slice to the right bucket in the pool.
-func (p *BucketedBytes) Put(b *[]byte) {
+// Put returns a slice to the right bucket in the pool.
+func (p *BucketedPool[T]) Put(b *[]T) {
 	if b == nil {
 		return
 	}
@@ -138,7 +139,7 @@ func (p *BucketedBytes) Put(b *[]byte) {
 	}
 }
 
-func (p *BucketedBytes) UsedBytes() uint64 {
+func (p *BucketedPool[T]) UsedBytes() uint64 {
 	p.mtx.RLock()
 	defer p.mtx.RUnlock()
 
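The change above is the heart of the PR: the bucketed allocator is now generic over the element type, and the old byte pool becomes just its [byte] instantiation. A minimal usage sketch under the new API — the sample element type and the bucket sizes here are hypothetical, chosen only to show a non-byte instantiation:

package main

import (
	"fmt"

	"github.com/thanos-io/thanos/pkg/pool"
)

// sample is a hypothetical element type; any T works.
type sample struct {
	ts  int64
	val float64
}

func main() {
	// Bucket capacities 8, 16, 32, 64; at most 1024 pooled items in use at once.
	p, err := pool.NewBucketedPool[sample](8, 64, 2, 1024)
	if err != nil {
		panic(err)
	}

	s, err := p.Get(10) // served from the 16-capacity bucket
	if err != nil {
		panic(err)
	}
	*s = append(*s, sample{ts: 1, val: 42})
	fmt.Println(cap(*s)) // 16

	p.Put(s) // every slice obtained from the pool must be returned
}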
4 changes: 2 additions & 2 deletions pkg/pool/pool_test.go
@@ -20,7 +20,7 @@ func TestMain(m *testing.M) {
 }
 
 func TestBytesPool(t *testing.T) {
-	chunkPool, err := NewBucketedBytes(10, 100, 2, 1000)
+	chunkPool, err := NewBucketedPool[byte](10, 100, 2, 1000)
 	testutil.Ok(t, err)
 
 	testutil.Equals(t, []int{10, 20, 40, 80}, chunkPool.sizes)
@@ -65,7 +65,7 @@ func TestBytesPool(t *testing.T) {
 }
 
 func TestRacePutGet(t *testing.T) {
-	chunkPool, err := NewBucketedBytes(3, 100, 2, 5000)
+	chunkPool, err := NewBucketedPool[byte](3, 100, 2, 5000)
 	testutil.Ok(t, err)
 
 	s := sync.WaitGroup{}
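Both updated tests still instantiate the pool with byte, so other element types are only covered by the compiler. A sketch of what a dedicated in-package test could look like — TestGenericPool is hypothetical, not part of this PR:

func TestGenericPool(t *testing.T) {
	// Same bucket layout as TestBytesPool, but pooling float64 slices.
	p, err := NewBucketedPool[float64](10, 100, 2, 1000)
	testutil.Ok(t, err)
	testutil.Equals(t, []int{10, 20, 40, 80}, p.sizes)

	// A request for 15 items lands in the 20-capacity bucket.
	s, err := p.Get(15)
	testutil.Ok(t, err)
	testutil.Equals(t, 20, cap(*s))
	p.Put(s)
}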
15 changes: 8 additions & 7 deletions pkg/store/bucket.go
@@ -44,6 +44,7 @@ import (
"google.golang.org/protobuf/types/known/durationpb"

"github.com/thanos-io/objstore"

"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/indexheader"
"github.com/thanos-io/thanos/pkg/block/metadata"
@@ -382,7 +383,7 @@ type BucketStore struct {
 	indexCache      storecache.IndexCache
 	indexReaderPool *indexheader.ReaderPool
 	buffers         sync.Pool
-	chunkPool       pool.Bytes
+	chunkPool       pool.Pool[byte]
 	seriesBatchSize int
 
 	// Sets of blocks that have the same labels. They are indexed by a hash over their label set.
@@ -504,7 +505,7 @@ func WithQueryGate(queryGate gate.Gate) BucketStoreOption {
 }
 
 // WithChunkPool sets a pool.Bytes to use for chunks.
-func WithChunkPool(chunkPool pool.Bytes) BucketStoreOption {
+func WithChunkPool(chunkPool pool.Pool[byte]) BucketStoreOption {
 	return func(s *BucketStore) {
 		s.chunkPool = chunkPool
 	}
@@ -600,7 +601,7 @@ func NewBucketStore(
 			b := make([]byte, 0, initialBufSize)
 			return &b
 		}},
-		chunkPool:            pool.NoopBytes{},
+		chunkPool:            pool.NoopPool[byte]{},
 		blocks:               map[ulid.ULID]*bucketBlock{},
 		blockSets:            map[uint64]*bucketBlockSet{},
 		blockSyncConcurrency: blockSyncConcurrency,
@@ -2321,7 +2322,7 @@ type bucketBlock struct {
 	meta       *metadata.Meta
 	dir        string
 	indexCache storecache.IndexCache
-	chunkPool  pool.Bytes
+	chunkPool  pool.Pool[byte]
 	extLset    labels.Labels
 
 	indexHeaderReader indexheader.Reader
@@ -2347,7 +2348,7 @@ func newBucketBlock(
 	bkt objstore.BucketReader,
 	dir string,
 	indexCache storecache.IndexCache,
-	chunkPool pool.Bytes,
+	chunkPool pool.Pool[byte],
 	indexHeadReader indexheader.Reader,
 	p Partitioner,
 	maxSeriesSizeFunc BlockEstimator,
@@ -3874,6 +3875,6 @@ func (s *queryStats) toHints() *hintspb.QueryStats {
 }
 
 // NewDefaultChunkBytesPool returns a chunk bytes pool with default settings.
-func NewDefaultChunkBytesPool(maxChunkPoolBytes uint64) (pool.Bytes, error) {
-	return pool.NewBucketedBytes(chunkBytesPoolMinSize, chunkBytesPoolMaxSize, 2, maxChunkPoolBytes)
+func NewDefaultChunkBytesPool(maxChunkPoolBytes uint64) (pool.Pool[byte], error) {
+	return pool.NewBucketedPool[byte](chunkBytesPoolMinSize, chunkBytesPoolMaxSize, 2, maxChunkPoolBytes)
 }
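For callers of the store package nothing changes shape: the chunk pool is now simply the [byte] instantiation of the generic interface. A minimal sketch of constructing and injecting one — the 2 GiB limit is an illustrative value, and the remaining NewBucketStore arguments are elided:

package main

import (
	"log"

	"github.com/thanos-io/thanos/pkg/pool"
	"github.com/thanos-io/thanos/pkg/store"
)

func buildChunkPool() (pool.Pool[byte], error) {
	// Default bucket layout, with a 2 GiB ceiling on bytes in use at any one time.
	return store.NewDefaultChunkBytesPool(2 << 30)
}

func main() {
	chunkPool, err := buildChunkPool()
	if err != nil {
		log.Fatal(err)
	}
	// WithChunkPool now accepts the generic pool.Pool[byte] interface.
	opt := store.WithChunkPool(chunkPool)
	_ = opt // passed to store.NewBucketStore alongside the other options (elided here)
}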
9 changes: 5 additions & 4 deletions pkg/store/bucket_test.go
@@ -49,6 +49,7 @@ import (

"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/providers/filesystem"

"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/indexheader"
"github.com/thanos-io/thanos/pkg/block/metadata"
@@ -1492,7 +1493,7 @@ func benchBucketSeries(t testutil.TB, sampleType chunkenc.ValueType, skipChunk,
 	f, err := block.NewRawMetaFetcher(logger, ibkt, baseBlockIDsFetcher)
 	testutil.Ok(t, err)
 
-	chunkPool, err := pool.NewBucketedBytes(chunkBytesPoolMinSize, chunkBytesPoolMaxSize, 2, 1e9) // 1GB.
+	chunkPool, err := pool.NewBucketedPool[byte](chunkBytesPoolMinSize, chunkBytesPoolMaxSize, 2, 1e9) // 1GB.
 	testutil.Ok(t, err)
 
 	st, err := NewBucketStore(
@@ -1599,7 +1600,7 @@ func (m fakePool) Get(sz int) (*[]byte, error) {
 func (m fakePool) Put(_ *[]byte) {}
 
 type mockedPool struct {
-	parent  pool.Bytes
+	parent  pool.Pool[byte]
 	balance atomic.Uint64
 	gets    atomic.Uint64
 }
@@ -1634,7 +1635,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
 		Source: metadata.TestSource,
 	}
 
-	chunkPool, err := pool.NewBucketedBytes(chunkBytesPoolMinSize, chunkBytesPoolMaxSize, 2, 100e7)
+	chunkPool, err := pool.NewBucketedPool[byte](chunkBytesPoolMinSize, chunkBytesPoolMaxSize, 2, 100e7)
 	testutil.Ok(t, err)
 
 	indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(logger, nil, nil, storecache.InMemoryIndexCacheConfig{
@@ -2714,7 +2715,7 @@ func BenchmarkBucketBlock_readChunkRange(b *testing.B) {
 	testutil.Ok(b, block.Upload(context.Background(), logger, bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc))
 
 	// Create a chunk pool with buckets between 8B and 32KB.
-	chunkPool, err := pool.NewBucketedBytes(8, 32*1024, 2, 1e10)
+	chunkPool, err := pool.NewBucketedPool[byte](8, 32*1024, 2, 1e10)
 	testutil.Ok(b, err)
 
 	// Create a bucket block with only the dependencies we need for the benchmark.
5 changes: 3 additions & 2 deletions pkg/store/postings_codec.go
@@ -16,6 +16,7 @@ import (
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/encoding"
"github.com/prometheus/prometheus/tsdb/index"

extsnappy "github.com/thanos-io/thanos/pkg/extgrpc/snappy"
"github.com/thanos-io/thanos/pkg/pool"
)
@@ -192,7 +193,7 @@ func maximumDecodedLenSnappyStreamed(in []byte) (int, error) {
 	return maxDecodedLen, nil
 }
 
-var decodedBufPool = pool.MustNewBucketedBytes(1024, 65536, 2, 0)
+var decodedBufPool = pool.MustNewBucketedPool[byte](1024, 65536, 2, 0)
 
 func newStreamedDiffVarintPostings(input []byte, disablePooling bool) (closeablePostings, error) {
 	// We can't use the regular s2.Reader because it assumes a stream.
@@ -449,7 +450,7 @@ func diffVarintEncodeNoHeader(p index.Postings, length int) ([]byte, error) {
 }
 
 // Creating 15 buckets from 1k to 32mb.
-var snappyDecodePool = pool.MustNewBucketedBytes(1024, 32*1024*1024, 2, 0)
+var snappyDecodePool = pool.MustNewBucketedPool[byte](1024, 32*1024*1024, 2, 0)
 
 type closeablePostings interface {
 	index.Postings
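Both pools here pass maxTotal = 0, which per the constructor's contract disables the usage ceiling entirely. The bucket layout comes from the factor-2 progression in NewBucketedPool; a standalone sketch of the same sizing loop makes the resulting capacities easy to check:

package main

import "fmt"

func main() {
	// Mirror of the constructor's sizing loop: minSize 1024, maxSize 32 MiB, factor 2.
	var sizes []int
	for s := 1024; s <= 32*1024*1024; s = int(float64(s) * 2) {
		sizes = append(sizes, s)
	}
	fmt.Println(len(sizes), sizes) // 16 sizes: 1024, 2048, ..., 33554432
}

Since the loop bound is inclusive, the 1 KiB to 32 MiB range actually yields 16 buckets, one more than the "15 buckets" comment above suggests.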
