Skip to content

Commit

Permalink
[pool] Add support for dynamic, sync.Pool backed, object pools (#3334)
Browse files Browse the repository at this point in the history
  • Loading branch information
vdarulis authored Mar 8, 2021
1 parent 689c990 commit a03e55f
Show file tree
Hide file tree
Showing 18 changed files with 237 additions and 47 deletions.
31 changes: 20 additions & 11 deletions src/cmd/services/m3dbnode/config/pooling.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@

package config

import "fmt"
import (
"fmt"

"github.com/m3db/m3/src/x/pool"
)

// PoolingType is a type of pooling, using runtime or mmap'd bytes pooling.
type PoolingType string
Expand All @@ -39,7 +43,7 @@ const (
)

type poolPolicyDefault struct {
size int
size pool.Size
refillLowWaterMark float64
refillHighWaterMark float64

Expand Down Expand Up @@ -214,55 +218,55 @@ var (
{
Capacity: intPtr(16),
PoolPolicy: PoolPolicy{
Size: intPtr(524288),
Size: poolSizePtr(524288),
RefillLowWaterMark: &defaultRefillLowWaterMark,
RefillHighWaterMark: &defaultRefillHighWaterMark,
},
},
{
Capacity: intPtr(32),
PoolPolicy: PoolPolicy{
Size: intPtr(262144),
Size: poolSizePtr(262144),
RefillLowWaterMark: &defaultRefillLowWaterMark,
RefillHighWaterMark: &defaultRefillHighWaterMark,
},
},
{
Capacity: intPtr(64),
PoolPolicy: PoolPolicy{
Size: intPtr(131072),
Size: poolSizePtr(131072),
RefillLowWaterMark: &defaultRefillLowWaterMark,
RefillHighWaterMark: &defaultRefillHighWaterMark,
},
},
{
Capacity: intPtr(128),
PoolPolicy: PoolPolicy{
Size: intPtr(65536),
Size: poolSizePtr(65536),
RefillLowWaterMark: &defaultRefillLowWaterMark,
RefillHighWaterMark: &defaultRefillHighWaterMark,
},
},
{
Capacity: intPtr(256),
PoolPolicy: PoolPolicy{
Size: intPtr(65536),
Size: poolSizePtr(65536),
RefillLowWaterMark: &defaultRefillLowWaterMark,
RefillHighWaterMark: &defaultRefillHighWaterMark,
},
},
{
Capacity: intPtr(1440),
PoolPolicy: PoolPolicy{
Size: intPtr(16384),
Size: poolSizePtr(16384),
RefillLowWaterMark: &defaultRefillLowWaterMark,
RefillHighWaterMark: &defaultRefillHighWaterMark,
},
},
{
Capacity: intPtr(4096),
PoolPolicy: PoolPolicy{
Size: intPtr(8192),
Size: poolSizePtr(8192),
RefillLowWaterMark: &defaultRefillLowWaterMark,
RefillHighWaterMark: &defaultRefillHighWaterMark,
},
Expand Down Expand Up @@ -486,7 +490,7 @@ func (p *PoolingPolicy) TypeOrDefault() PoolingType {
// PoolPolicy specifies a single pool policy.
type PoolPolicy struct {
// The size of the pool.
Size *int `yaml:"size"`
Size *pool.Size `yaml:"size"`

// The low watermark to start refilling the pool, if zero none.
RefillLowWaterMark *float64 `yaml:"lowWatermark"`
Expand Down Expand Up @@ -525,7 +529,7 @@ func (p *PoolPolicy) initDefaultsAndValidate(poolName string) error {
}

// SizeOrDefault returns the configured size if present, or a default value otherwise.
func (p *PoolPolicy) SizeOrDefault() int {
func (p *PoolPolicy) SizeOrDefault() pool.Size {
return *p.Size
}

Expand Down Expand Up @@ -668,3 +672,8 @@ type WriteBatchPoolPolicy struct {
func intPtr(x int) *int {
return &x
}

func poolSizePtr(x int) *pool.Size {
sz := pool.Size(x)
return &sz
}
2 changes: 1 addition & 1 deletion src/dbnode/client/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1005,7 +1005,7 @@ func (s *session) setTopologyWithLock(topoMap topology.Map, queues []hostQueue,
s.pools.multiReaderIteratorArray = encoding.NewMultiReaderIteratorArrayPool([]pool.Bucket{
{
Capacity: replicas,
Count: s.opts.SeriesIteratorPoolSize(),
Count: pool.Size(s.opts.SeriesIteratorPoolSize()),
},
})
s.pools.multiReaderIteratorArray.Init()
Expand Down
2 changes: 1 addition & 1 deletion src/dbnode/encoding/multi_reader_iterator_array_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (p *multiReaderIteratorArrayPool) Init() {
for i := range p.sizesAsc {
buckets[i].capacity = p.sizesAsc[i].Capacity
buckets[i].values = make(chan []MultiReaderIterator, p.sizesAsc[i].Count)
for j := 0; j < p.sizesAsc[i].Count; j++ {
for j := 0; pool.Size(j) < p.sizesAsc[i].Count; j++ {
buckets[i].values <- p.alloc(p.sizesAsc[i].Capacity)
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/dbnode/encoding/mutable_series_iterators_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (p *seriesIteratorsPool) Init() {
for i := range p.sizesAsc {
buckets[i].capacity = p.sizesAsc[i].Capacity
buckets[i].values = make(chan MutableSeriesIterators, p.sizesAsc[i].Count)
for j := 0; j < p.sizesAsc[i].Count; j++ {
for j := 0; pool.Size(j) < p.sizesAsc[i].Count; j++ {
buckets[i].values <- p.alloc(p.sizesAsc[i].Capacity)
}
}
Expand Down
14 changes: 10 additions & 4 deletions src/dbnode/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1533,7 +1533,7 @@ func withEncodingAndPoolingOptions(

logger.Info("bytes pool configured",
zap.Int("capacity", bucket.CapacityOrDefault()),
zap.Int("size", bucket.SizeOrDefault()),
zap.Int("size", int(bucket.SizeOrDefault())),
zap.Float64("refillLowWaterMark", bucket.RefillLowWaterMarkOrDefault()),
zap.Float64("refillHighWaterMark", bucket.RefillHighWaterMarkOrDefault()))
}
Expand Down Expand Up @@ -1920,7 +1920,7 @@ func poolOptions(
)

if size > 0 {
opts = opts.SetSize(size)
opts = opts.SetSize(int(size))
if refillLowWaterMark > 0 &&
refillHighWaterMark > 0 &&
refillHighWaterMark > refillLowWaterMark {
Expand All @@ -1929,6 +1929,8 @@ func poolOptions(
SetRefillHighWatermark(refillHighWaterMark)
}
}
opts = opts.SetDynamic(size.IsDynamic())

if scope != nil {
opts = opts.SetInstrumentOptions(opts.InstrumentOptions().
SetMetricsScope(scope))
Expand All @@ -1948,14 +1950,16 @@ func capacityPoolOptions(
)

if size > 0 {
opts = opts.SetSize(size)
opts = opts.SetSize(int(size))
if refillLowWaterMark > 0 &&
refillHighWaterMark > 0 &&
refillHighWaterMark > refillLowWaterMark {
opts = opts.SetRefillLowWatermark(refillLowWaterMark)
opts = opts.SetRefillHighWatermark(refillHighWaterMark)
}
}
opts = opts.SetDynamic(size.IsDynamic())

if scope != nil {
opts = opts.SetInstrumentOptions(opts.InstrumentOptions().
SetMetricsScope(scope))
Expand All @@ -1975,14 +1979,16 @@ func maxCapacityPoolOptions(
)

if size > 0 {
opts = opts.SetSize(size)
opts = opts.SetSize(int(size))
if refillLowWaterMark > 0 &&
refillHighWaterMark > 0 &&
refillHighWaterMark > refillLowWaterMark {
opts = opts.SetRefillLowWatermark(refillLowWaterMark)
opts = opts.SetRefillHighWatermark(refillHighWaterMark)
}
}
opts = opts.SetDynamic(size.IsDynamic())

if scope != nil {
opts = opts.SetInstrumentOptions(opts.InstrumentOptions().
SetMetricsScope(scope))
Expand Down
2 changes: 1 addition & 1 deletion src/msg/consumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type Configuration struct {
// options, which extends the default object pool configuration.
type MessagePoolConfiguration struct {
// Size is the size of the pool.
Size int `yaml:"size"`
Size pool.Size `yaml:"size"`

// Watermark is the object pool watermark configuration.
Watermark pool.WatermarkConfiguration `yaml:"watermark"`
Expand Down
2 changes: 1 addition & 1 deletion src/msg/protocol/proto/roundtrip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func getBytesPool(bucketSizes int, bucketCaps []int) pool.BytesPool {
buckets := make([]pool.Bucket, len(bucketCaps))
for i, cap := range bucketCaps {
buckets[i] = pool.Bucket{
Count: bucketSizes,
Count: pool.Size(bucketSizes),
Capacity: cap,
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/query/ts/m3db/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type encodedBlockOptions struct {
func NewOptions() Options {
bytesPool := pool.NewCheckedBytesPool([]pool.Bucket{{
Capacity: defaultCapacity,
Count: defaultCount,
Count: pool.Size(defaultCount),
}}, nil, func(s []pool.Bucket) pool.BytesPool {
return pool.NewBytesPool(s, nil)
})
Expand Down
14 changes: 11 additions & 3 deletions src/x/pool/bucketized.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
package pool

import (
"fmt"
"sort"
"strconv"

"github.com/uber-go/tally"
)
Expand Down Expand Up @@ -77,13 +77,21 @@ func (p *bucketizedObjectPool) Init(alloc BucketizedAllocator) {
opts = perBucketOpts
}

opts = opts.SetSize(size)
if size > 0 {
opts = opts.SetSize(int(size))
}
opts = opts.SetDynamic(size.IsDynamic())

iopts := opts.InstrumentOptions()

if iopts.MetricsScope() != nil {
sz := strconv.Itoa(capacity)
if capacity <= 0 {
sz = "dynamic"
}
opts = opts.SetInstrumentOptions(iopts.SetMetricsScope(
iopts.MetricsScope().Tagged(map[string]string{
"bucket-capacity": fmt.Sprintf("%d", capacity),
"bucket-capacity": sz,
})))
}

Expand Down
2 changes: 1 addition & 1 deletion src/x/pool/bytes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func getBytesPool(bucketSizes int, bucketCaps []int) *bytesPool {
buckets := make([]Bucket, len(bucketCaps))
for i, cap := range bucketCaps {
buckets[i] = Bucket{
Count: bucketSizes,
Count: Size(bucketSizes),
Capacity: cap,
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/x/pool/checked_bytes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func getCheckedBytesPool(
buckets := make([]Bucket, len(bucketCaps))
for i, cap := range bucketCaps {
buckets[i] = Bucket{
Count: bucketSizes,
Count: Size(bucketSizes),
Capacity: cap,
}
}
Expand Down
48 changes: 42 additions & 6 deletions src/x/pool/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,17 @@

package pool

import "github.com/m3db/m3/src/x/instrument"
import (
"errors"
"strconv"

"github.com/m3db/m3/src/x/instrument"
)

// ObjectPoolConfiguration contains configuration for object pools.
type ObjectPoolConfiguration struct {
// The size of the pool.
Size int `yaml:"size"`
Size Size `yaml:"size"`

// The watermark configuration.
Watermark WatermarkConfiguration `yaml:"watermark"`
Expand All @@ -35,13 +40,15 @@ type ObjectPoolConfiguration struct {
func (c *ObjectPoolConfiguration) NewObjectPoolOptions(
instrumentOpts instrument.Options,
) ObjectPoolOptions {
size := defaultSize
if c.Size != 0 {
size = c.Size
size := _defaultSize
if c.Size > 0 {
size = int(c.Size)
}

return NewObjectPoolOptions().
SetInstrumentOptions(instrumentOpts).
SetSize(size).
SetDynamic(c.Size.IsDynamic()).
SetRefillLowWatermark(c.Watermark.RefillLowWatermark).
SetRefillHighWatermark(c.Watermark.RefillHighWatermark)
}
Expand Down Expand Up @@ -78,7 +85,7 @@ func (c *BucketizedPoolConfiguration) NewBuckets() []Bucket {
// BucketConfiguration contains configuration for a pool bucket.
type BucketConfiguration struct {
// The count of the items in the bucket.
Count int `yaml:"count"`
Count Size `yaml:"count"`

// The capacity of each item in the bucket.
Capacity int `yaml:"capacity"`
Expand All @@ -100,3 +107,32 @@ type WatermarkConfiguration struct {
// The high watermark to stop refilling the pool, if zero none.
RefillHighWatermark float64 `yaml:"high" validate:"min=0.0,max=1.0"`
}

// Size stores pool capacity for pools that can be either dynamic or pre-allocated
type Size int

// UnmarshalText unmarshals Size.
func (s *Size) UnmarshalText(b []byte) error {
if string(b) == "dynamic" {
*s = _dynamicPoolSize
return nil
}

i, err := strconv.ParseInt(string(b), 10, 64)
if err != nil {
return err
}

if i < 0 {
return errors.New("pool capacity cannot be negative")
}

*s = Size(i)

return nil
}

// IsDynamic returns whether the pool should be fixed size or not.
func (s Size) IsDynamic() bool {
return s == _dynamicPoolSize
}
Loading

0 comments on commit a03e55f

Please sign in to comment.