From a03e55f263665b13eccba039ab770fa5127007ee Mon Sep 17 00:00:00 2001 From: Vytenis Darulis Date: Sun, 7 Mar 2021 20:12:19 -0500 Subject: [PATCH] [pool] Add support for dynamic, sync.Pool backed, object pools (#3334) --- src/cmd/services/m3dbnode/config/pooling.go | 31 +++++++----- src/dbnode/client/session.go | 2 +- .../multi_reader_iterator_array_pool.go | 2 +- .../encoding/mutable_series_iterators_pool.go | 2 +- src/dbnode/server/server.go | 14 ++++-- src/msg/consumer/config.go | 2 +- src/msg/protocol/proto/roundtrip_test.go | 2 +- src/query/ts/m3db/options.go | 2 +- src/x/pool/bucketized.go | 14 ++++-- src/x/pool/bytes_test.go | 2 +- src/x/pool/checked_bytes_test.go | 2 +- src/x/pool/config.go | 48 ++++++++++++++++--- src/x/pool/config_test.go | 40 +++++++++++++++- src/x/pool/floats_test.go | 2 +- src/x/pool/object.go | 47 ++++++++++++++++-- src/x/pool/object_test.go | 35 ++++++++++++++ src/x/pool/options.go | 29 ++++++++--- src/x/pool/types.go | 8 +++- 18 files changed, 237 insertions(+), 47 deletions(-) diff --git a/src/cmd/services/m3dbnode/config/pooling.go b/src/cmd/services/m3dbnode/config/pooling.go index 584397a86b..5f92617446 100644 --- a/src/cmd/services/m3dbnode/config/pooling.go +++ b/src/cmd/services/m3dbnode/config/pooling.go @@ -20,7 +20,11 @@ package config -import "fmt" +import ( + "fmt" + + "github.com/m3db/m3/src/x/pool" +) // PoolingType is a type of pooling, using runtime or mmap'd bytes pooling. type PoolingType string @@ -39,7 +43,7 @@ const ( ) type poolPolicyDefault struct { - size int + size pool.Size refillLowWaterMark float64 refillHighWaterMark float64 @@ -214,7 +218,7 @@ var ( { Capacity: intPtr(16), PoolPolicy: PoolPolicy{ - Size: intPtr(524288), + Size: poolSizePtr(524288), RefillLowWaterMark: &defaultRefillLowWaterMark, RefillHighWaterMark: &defaultRefillHighWaterMark, }, @@ -222,7 +226,7 @@ var ( { Capacity: intPtr(32), PoolPolicy: PoolPolicy{ - Size: intPtr(262144), + Size: poolSizePtr(262144), RefillLowWaterMark: &defaultRefillLowWaterMark, RefillHighWaterMark: &defaultRefillHighWaterMark, }, @@ -230,7 +234,7 @@ var ( { Capacity: intPtr(64), PoolPolicy: PoolPolicy{ - Size: intPtr(131072), + Size: poolSizePtr(131072), RefillLowWaterMark: &defaultRefillLowWaterMark, RefillHighWaterMark: &defaultRefillHighWaterMark, }, @@ -238,7 +242,7 @@ var ( { Capacity: intPtr(128), PoolPolicy: PoolPolicy{ - Size: intPtr(65536), + Size: poolSizePtr(65536), RefillLowWaterMark: &defaultRefillLowWaterMark, RefillHighWaterMark: &defaultRefillHighWaterMark, }, @@ -246,7 +250,7 @@ var ( { Capacity: intPtr(256), PoolPolicy: PoolPolicy{ - Size: intPtr(65536), + Size: poolSizePtr(65536), RefillLowWaterMark: &defaultRefillLowWaterMark, RefillHighWaterMark: &defaultRefillHighWaterMark, }, @@ -254,7 +258,7 @@ var ( { Capacity: intPtr(1440), PoolPolicy: PoolPolicy{ - Size: intPtr(16384), + Size: poolSizePtr(16384), RefillLowWaterMark: &defaultRefillLowWaterMark, RefillHighWaterMark: &defaultRefillHighWaterMark, }, @@ -262,7 +266,7 @@ var ( { Capacity: intPtr(4096), PoolPolicy: PoolPolicy{ - Size: intPtr(8192), + Size: poolSizePtr(8192), RefillLowWaterMark: &defaultRefillLowWaterMark, RefillHighWaterMark: &defaultRefillHighWaterMark, }, @@ -486,7 +490,7 @@ func (p *PoolingPolicy) TypeOrDefault() PoolingType { // PoolPolicy specifies a single pool policy. type PoolPolicy struct { // The size of the pool. - Size *int `yaml:"size"` + Size *pool.Size `yaml:"size"` // The low watermark to start refilling the pool, if zero none. RefillLowWaterMark *float64 `yaml:"lowWatermark"` @@ -525,7 +529,7 @@ func (p *PoolPolicy) initDefaultsAndValidate(poolName string) error { } // SizeOrDefault returns the configured size if present, or a default value otherwise. -func (p *PoolPolicy) SizeOrDefault() int { +func (p *PoolPolicy) SizeOrDefault() pool.Size { return *p.Size } @@ -668,3 +672,8 @@ type WriteBatchPoolPolicy struct { func intPtr(x int) *int { return &x } + +func poolSizePtr(x int) *pool.Size { + sz := pool.Size(x) + return &sz +} diff --git a/src/dbnode/client/session.go b/src/dbnode/client/session.go index 6ffa119ad6..fc3bbf1708 100644 --- a/src/dbnode/client/session.go +++ b/src/dbnode/client/session.go @@ -1005,7 +1005,7 @@ func (s *session) setTopologyWithLock(topoMap topology.Map, queues []hostQueue, s.pools.multiReaderIteratorArray = encoding.NewMultiReaderIteratorArrayPool([]pool.Bucket{ { Capacity: replicas, - Count: s.opts.SeriesIteratorPoolSize(), + Count: pool.Size(s.opts.SeriesIteratorPoolSize()), }, }) s.pools.multiReaderIteratorArray.Init() diff --git a/src/dbnode/encoding/multi_reader_iterator_array_pool.go b/src/dbnode/encoding/multi_reader_iterator_array_pool.go index cd0330962c..c5ac88c36d 100644 --- a/src/dbnode/encoding/multi_reader_iterator_array_pool.go +++ b/src/dbnode/encoding/multi_reader_iterator_array_pool.go @@ -66,7 +66,7 @@ func (p *multiReaderIteratorArrayPool) Init() { for i := range p.sizesAsc { buckets[i].capacity = p.sizesAsc[i].Capacity buckets[i].values = make(chan []MultiReaderIterator, p.sizesAsc[i].Count) - for j := 0; j < p.sizesAsc[i].Count; j++ { + for j := 0; pool.Size(j) < p.sizesAsc[i].Count; j++ { buckets[i].values <- p.alloc(p.sizesAsc[i].Capacity) } } diff --git a/src/dbnode/encoding/mutable_series_iterators_pool.go b/src/dbnode/encoding/mutable_series_iterators_pool.go index 914b2c5b0e..0b9a2f5bc8 100644 --- a/src/dbnode/encoding/mutable_series_iterators_pool.go +++ b/src/dbnode/encoding/mutable_series_iterators_pool.go @@ -59,7 +59,7 @@ func (p *seriesIteratorsPool) Init() { for i := range p.sizesAsc { buckets[i].capacity = p.sizesAsc[i].Capacity buckets[i].values = make(chan MutableSeriesIterators, p.sizesAsc[i].Count) - for j := 0; j < p.sizesAsc[i].Count; j++ { + for j := 0; pool.Size(j) < p.sizesAsc[i].Count; j++ { buckets[i].values <- p.alloc(p.sizesAsc[i].Capacity) } } diff --git a/src/dbnode/server/server.go b/src/dbnode/server/server.go index 48f38856d1..f57dde732c 100644 --- a/src/dbnode/server/server.go +++ b/src/dbnode/server/server.go @@ -1533,7 +1533,7 @@ func withEncodingAndPoolingOptions( logger.Info("bytes pool configured", zap.Int("capacity", bucket.CapacityOrDefault()), - zap.Int("size", bucket.SizeOrDefault()), + zap.Int("size", int(bucket.SizeOrDefault())), zap.Float64("refillLowWaterMark", bucket.RefillLowWaterMarkOrDefault()), zap.Float64("refillHighWaterMark", bucket.RefillHighWaterMarkOrDefault())) } @@ -1920,7 +1920,7 @@ func poolOptions( ) if size > 0 { - opts = opts.SetSize(size) + opts = opts.SetSize(int(size)) if refillLowWaterMark > 0 && refillHighWaterMark > 0 && refillHighWaterMark > refillLowWaterMark { @@ -1929,6 +1929,8 @@ func poolOptions( SetRefillHighWatermark(refillHighWaterMark) } } + opts = opts.SetDynamic(size.IsDynamic()) + if scope != nil { opts = opts.SetInstrumentOptions(opts.InstrumentOptions(). SetMetricsScope(scope)) @@ -1948,7 +1950,7 @@ func capacityPoolOptions( ) if size > 0 { - opts = opts.SetSize(size) + opts = opts.SetSize(int(size)) if refillLowWaterMark > 0 && refillHighWaterMark > 0 && refillHighWaterMark > refillLowWaterMark { @@ -1956,6 +1958,8 @@ func capacityPoolOptions( opts = opts.SetRefillHighWatermark(refillHighWaterMark) } } + opts = opts.SetDynamic(size.IsDynamic()) + if scope != nil { opts = opts.SetInstrumentOptions(opts.InstrumentOptions(). SetMetricsScope(scope)) @@ -1975,7 +1979,7 @@ func maxCapacityPoolOptions( ) if size > 0 { - opts = opts.SetSize(size) + opts = opts.SetSize(int(size)) if refillLowWaterMark > 0 && refillHighWaterMark > 0 && refillHighWaterMark > refillLowWaterMark { @@ -1983,6 +1987,8 @@ func maxCapacityPoolOptions( opts = opts.SetRefillHighWatermark(refillHighWaterMark) } } + opts = opts.SetDynamic(size.IsDynamic()) + if scope != nil { opts = opts.SetInstrumentOptions(opts.InstrumentOptions(). SetMetricsScope(scope)) diff --git a/src/msg/consumer/config.go b/src/msg/consumer/config.go index 9ef1c99620..20babac82f 100644 --- a/src/msg/consumer/config.go +++ b/src/msg/consumer/config.go @@ -44,7 +44,7 @@ type Configuration struct { // options, which extends the default object pool configuration. type MessagePoolConfiguration struct { // Size is the size of the pool. - Size int `yaml:"size"` + Size pool.Size `yaml:"size"` // Watermark is the object pool watermark configuration. Watermark pool.WatermarkConfiguration `yaml:"watermark"` diff --git a/src/msg/protocol/proto/roundtrip_test.go b/src/msg/protocol/proto/roundtrip_test.go index de5202c6c8..cdb591b410 100644 --- a/src/msg/protocol/proto/roundtrip_test.go +++ b/src/msg/protocol/proto/roundtrip_test.go @@ -197,7 +197,7 @@ func getBytesPool(bucketSizes int, bucketCaps []int) pool.BytesPool { buckets := make([]pool.Bucket, len(bucketCaps)) for i, cap := range bucketCaps { buckets[i] = pool.Bucket{ - Count: bucketSizes, + Count: pool.Size(bucketSizes), Capacity: cap, } } diff --git a/src/query/ts/m3db/options.go b/src/query/ts/m3db/options.go index 153df2aac7..03606790cd 100644 --- a/src/query/ts/m3db/options.go +++ b/src/query/ts/m3db/options.go @@ -70,7 +70,7 @@ type encodedBlockOptions struct { func NewOptions() Options { bytesPool := pool.NewCheckedBytesPool([]pool.Bucket{{ Capacity: defaultCapacity, - Count: defaultCount, + Count: pool.Size(defaultCount), }}, nil, func(s []pool.Bucket) pool.BytesPool { return pool.NewBytesPool(s, nil) }) diff --git a/src/x/pool/bucketized.go b/src/x/pool/bucketized.go index a564302cc4..84c575bd4b 100644 --- a/src/x/pool/bucketized.go +++ b/src/x/pool/bucketized.go @@ -21,8 +21,8 @@ package pool import ( - "fmt" "sort" + "strconv" "github.com/uber-go/tally" ) @@ -77,13 +77,21 @@ func (p *bucketizedObjectPool) Init(alloc BucketizedAllocator) { opts = perBucketOpts } - opts = opts.SetSize(size) + if size > 0 { + opts = opts.SetSize(int(size)) + } + opts = opts.SetDynamic(size.IsDynamic()) + iopts := opts.InstrumentOptions() if iopts.MetricsScope() != nil { + sz := strconv.Itoa(capacity) + if capacity <= 0 { + sz = "dynamic" + } opts = opts.SetInstrumentOptions(iopts.SetMetricsScope( iopts.MetricsScope().Tagged(map[string]string{ - "bucket-capacity": fmt.Sprintf("%d", capacity), + "bucket-capacity": sz, }))) } diff --git a/src/x/pool/bytes_test.go b/src/x/pool/bytes_test.go index 39a3c0c620..f151b039ac 100644 --- a/src/x/pool/bytes_test.go +++ b/src/x/pool/bytes_test.go @@ -89,7 +89,7 @@ func getBytesPool(bucketSizes int, bucketCaps []int) *bytesPool { buckets := make([]Bucket, len(bucketCaps)) for i, cap := range bucketCaps { buckets[i] = Bucket{ - Count: bucketSizes, + Count: Size(bucketSizes), Capacity: cap, } } diff --git a/src/x/pool/checked_bytes_test.go b/src/x/pool/checked_bytes_test.go index 7cecaf8158..dabb1f0a91 100644 --- a/src/x/pool/checked_bytes_test.go +++ b/src/x/pool/checked_bytes_test.go @@ -109,7 +109,7 @@ func getCheckedBytesPool( buckets := make([]Bucket, len(bucketCaps)) for i, cap := range bucketCaps { buckets[i] = Bucket{ - Count: bucketSizes, + Count: Size(bucketSizes), Capacity: cap, } } diff --git a/src/x/pool/config.go b/src/x/pool/config.go index 6e08286e98..fa967d7b41 100644 --- a/src/x/pool/config.go +++ b/src/x/pool/config.go @@ -20,12 +20,17 @@ package pool -import "github.com/m3db/m3/src/x/instrument" +import ( + "errors" + "strconv" + + "github.com/m3db/m3/src/x/instrument" +) // ObjectPoolConfiguration contains configuration for object pools. type ObjectPoolConfiguration struct { // The size of the pool. - Size int `yaml:"size"` + Size Size `yaml:"size"` // The watermark configuration. Watermark WatermarkConfiguration `yaml:"watermark"` @@ -35,13 +40,15 @@ type ObjectPoolConfiguration struct { func (c *ObjectPoolConfiguration) NewObjectPoolOptions( instrumentOpts instrument.Options, ) ObjectPoolOptions { - size := defaultSize - if c.Size != 0 { - size = c.Size + size := _defaultSize + if c.Size > 0 { + size = int(c.Size) } + return NewObjectPoolOptions(). SetInstrumentOptions(instrumentOpts). SetSize(size). + SetDynamic(c.Size.IsDynamic()). SetRefillLowWatermark(c.Watermark.RefillLowWatermark). SetRefillHighWatermark(c.Watermark.RefillHighWatermark) } @@ -78,7 +85,7 @@ func (c *BucketizedPoolConfiguration) NewBuckets() []Bucket { // BucketConfiguration contains configuration for a pool bucket. type BucketConfiguration struct { // The count of the items in the bucket. - Count int `yaml:"count"` + Count Size `yaml:"count"` // The capacity of each item in the bucket. Capacity int `yaml:"capacity"` @@ -100,3 +107,32 @@ type WatermarkConfiguration struct { // The high watermark to stop refilling the pool, if zero none. RefillHighWatermark float64 `yaml:"high" validate:"min=0.0,max=1.0"` } + +// Size stores pool capacity for pools that can be either dynamic or pre-allocated +type Size int + +// UnmarshalText unmarshals Size. +func (s *Size) UnmarshalText(b []byte) error { + if string(b) == "dynamic" { + *s = _dynamicPoolSize + return nil + } + + i, err := strconv.ParseInt(string(b), 10, 64) + if err != nil { + return err + } + + if i < 0 { + return errors.New("pool capacity cannot be negative") + } + + *s = Size(i) + + return nil +} + +// IsDynamic returns whether the pool should be fixed size or not. +func (s Size) IsDynamic() bool { + return s == _dynamicPoolSize +} diff --git a/src/x/pool/config_test.go b/src/x/pool/config_test.go index 7bbe5a08b5..0c0135f46d 100644 --- a/src/x/pool/config_test.go +++ b/src/x/pool/config_test.go @@ -23,9 +23,10 @@ package pool import ( "testing" - "github.com/m3db/m3/src/x/instrument" - "github.com/stretchr/testify/require" + "gopkg.in/yaml.v2" + + "github.com/m3db/m3/src/x/instrument" ) func TestObjectPoolConfiguration(t *testing.T) { @@ -38,10 +39,39 @@ func TestObjectPoolConfiguration(t *testing.T) { } opts := cfg.NewObjectPoolOptions(instrument.NewOptions()).(*objectPoolOptions) require.Equal(t, 1, opts.size) + require.False(t, opts.Dynamic()) require.Equal(t, 0.1, opts.refillLowWatermark) require.Equal(t, 0.5, opts.refillHighWatermark) } +func TestDynamicObjectPoolConfiguration(t *testing.T) { + cfg := ObjectPoolConfiguration{ + Size: _dynamicPoolSize, + } + opts := cfg.NewObjectPoolOptions(instrument.NewOptions()).(*objectPoolOptions) + require.Equal(t, _dynamicPoolSize, opts.Size()) + require.True(t, opts.Dynamic()) + + cfg, err := cfgFromStr(` +size: dynamic +`) + require.NoError(t, err) + + opts = cfg.NewObjectPoolOptions(instrument.NewOptions()).(*objectPoolOptions) + require.Equal(t, _dynamicPoolSize, opts.Size()) + require.True(t, opts.Dynamic()) + + _, err = cfgFromStr(` +size: invalid +`) + require.Error(t, err) + + _, err = cfgFromStr(` +size: -10 +`) + require.Error(t, err) +} + func TestBucketizedPoolConfiguration(t *testing.T) { cfg := BucketizedPoolConfiguration{ Buckets: []BucketConfiguration{ @@ -62,3 +92,9 @@ func TestBucketizedPoolConfiguration(t *testing.T) { require.Equal(t, 0.1, opts.refillLowWatermark) require.Equal(t, 0.5, opts.refillHighWatermark) } + +func cfgFromStr(config string) (ObjectPoolConfiguration, error) { + cfg := ObjectPoolConfiguration{} + err := yaml.Unmarshal([]byte(config), &cfg) + return cfg, err +} diff --git a/src/x/pool/floats_test.go b/src/x/pool/floats_test.go index 544a4f065a..aa92e82e9b 100644 --- a/src/x/pool/floats_test.go +++ b/src/x/pool/floats_test.go @@ -53,7 +53,7 @@ func getFloatsPool(bucketSizes int, bucketCaps []int) *floatsPool { buckets := make([]Bucket, len(bucketCaps)) for i, cap := range bucketCaps { buckets[i] = Bucket{ - Count: bucketSizes, + Count: Size(bucketSizes), Capacity: cap, } } diff --git a/src/x/pool/object.go b/src/x/pool/object.go index 2ef9c50b16..ebedf02ff3 100644 --- a/src/x/pool/object.go +++ b/src/x/pool/object.go @@ -23,6 +23,7 @@ package pool import ( "errors" "math" + "sync" "sync/atomic" "github.com/m3db/m3/src/x/unsafe" @@ -65,6 +66,21 @@ func NewObjectPool(opts ObjectPoolOptions) ObjectPool { opts = NewObjectPoolOptions() } + uninitializedAllocatorFn := func() interface{} { + fn := opts.OnPoolAccessErrorFn() + fn(errPoolAccessBeforeInitialized) + return nil + } + + if opts.Dynamic() { + return &dynamicPool{ + pool: sync.Pool{ + New: uninitializedAllocatorFn, + }, + onPoolAccessErrorFn: opts.OnPoolAccessErrorFn(), + } + } + m := opts.InstrumentOptions().MetricsScope() p := &objectPool{ @@ -80,11 +96,7 @@ func NewObjectPool(opts ObjectPoolOptions) ObjectPool { putOnFull: m.Counter("put-on-full"), }, onPoolAccessErrorFn: opts.OnPoolAccessErrorFn(), - alloc: func() interface{} { - fn := opts.OnPoolAccessErrorFn() - fn(errPoolAccessBeforeInitialized) - return nil - }, + alloc: uninitializedAllocatorFn, } p.setGauges() @@ -167,3 +179,28 @@ func (p *objectPool) tryFill() { } }() } + +var _ ObjectPool = (*dynamicPool)(nil) + +type dynamicPool struct { + pool sync.Pool + onPoolAccessErrorFn OnPoolAccessErrorFn + initialized int32 +} + +func (d *dynamicPool) Init(alloc Allocator) { + if !atomic.CompareAndSwapInt32(&d.initialized, 0, 1) { + d.onPoolAccessErrorFn(errPoolAlreadyInitialized) + return + } + + d.pool.New = alloc +} + +func (d *dynamicPool) Get() interface{} { + return d.pool.Get() +} + +func (d *dynamicPool) Put(x interface{}) { + d.pool.Put(x) +} diff --git a/src/x/pool/object_test.go b/src/x/pool/object_test.go index bf039943c4..3992b5420d 100644 --- a/src/x/pool/object_test.go +++ b/src/x/pool/object_test.go @@ -21,6 +21,7 @@ package pool import ( + "runtime" "strconv" "testing" "time" @@ -192,6 +193,40 @@ func BenchmarkObjectPoolParallelGetMultiPutContended(b *testing.B) { o := bufs[i] buf := *o buf = strconv.AppendInt(buf[:0], 12344321, 10) + runtime.KeepAlive(buf) + p.Put(o) + } + } + }) +} + +//nolint:dupl +func BenchmarkObjectPoolParallelGetMultiPutContendedDynamic(b *testing.B) { + opts := NewObjectPoolOptions(). + SetDynamic(true) + + p := NewObjectPool(opts) + p.Init(func() interface{} { + b := make([]byte, 0, 64) + return &b + }) + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + bufs := make([]*[]byte, 16) + for i := 0; i < len(bufs); i++ { + o, ok := p.Get().(*[]byte) + if !ok { + b.Fail() + } + bufs[i] = o + } + for i := 0; i < len(bufs); i++ { + o := bufs[i] + buf := *o + buf = strconv.AppendInt(buf[:0], 12344321, 10) + runtime.KeepAlive(buf) p.Put(o) } } diff --git a/src/x/pool/options.go b/src/x/pool/options.go index fd70325d9c..de2440c9a4 100644 --- a/src/x/pool/options.go +++ b/src/x/pool/options.go @@ -23,9 +23,11 @@ package pool import "github.com/m3db/m3/src/x/instrument" const ( - defaultSize = 4096 - defaultRefillLowWatermark = 0.0 - defaultRefillHighWatermark = 0.0 + _defaultSize = 4096 + _defaultRefillLowWatermark = 0.0 + _defaultRefillHighWatermark = 0.0 + + _dynamicPoolSize = -1 ) type objectPoolOptions struct { @@ -34,14 +36,16 @@ type objectPoolOptions struct { refillHighWatermark float64 instrumentOpts instrument.Options onPoolAccessErrorFn OnPoolAccessErrorFn + dynamic bool } // NewObjectPoolOptions creates a new set of object pool options func NewObjectPoolOptions() ObjectPoolOptions { return &objectPoolOptions{ - size: defaultSize, - refillLowWatermark: defaultRefillLowWatermark, - refillHighWatermark: defaultRefillHighWatermark, + size: _defaultSize, + dynamic: false, + refillLowWatermark: _defaultRefillLowWatermark, + refillHighWatermark: _defaultRefillHighWatermark, instrumentOpts: instrument.NewOptions(), onPoolAccessErrorFn: func(err error) { panic(err) }, } @@ -54,9 +58,22 @@ func (o *objectPoolOptions) SetSize(value int) ObjectPoolOptions { } func (o *objectPoolOptions) Size() int { + if o.dynamic { + return _dynamicPoolSize + } return o.size } +func (o *objectPoolOptions) SetDynamic(dynamic bool) ObjectPoolOptions { + opts := *o + opts.dynamic = dynamic + return &opts +} + +func (o *objectPoolOptions) Dynamic() bool { + return o.dynamic +} + func (o *objectPoolOptions) SetRefillLowWatermark(value float64) ObjectPoolOptions { opts := *o opts.refillLowWatermark = value diff --git a/src/x/pool/types.go b/src/x/pool/types.go index 872189a619..4926139247 100644 --- a/src/x/pool/types.go +++ b/src/x/pool/types.go @@ -53,6 +53,12 @@ type ObjectPoolOptions interface { // Size returns the size of the object pool. Size() int + // SetDynamic creates a dynamically-sized, non-preallocated pool. + SetDynamic(value bool) ObjectPoolOptions + + // Dynamic returns if the pool is dynamic. + Dynamic() bool + // SetRefillLowWatermark sets the refill low watermark value between [0, 1), // if zero then no refills occur. SetRefillLowWatermark(value float64) ObjectPoolOptions @@ -90,7 +96,7 @@ type Bucket struct { Capacity int // Count is the number of fixed elements in the bucket. - Count int + Count Size // Options is an optional override to specify options to use for a bucket, // specify nil to use the options specified to the bucketized pool