Skip to content

Commit

Permalink
Make slab sizes and capacity for fixed size memory pool configurable
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum committed May 27, 2024
1 parent e49f84b commit cb7d00d
Show file tree
Hide file tree
Showing 9 changed files with 281 additions and 15 deletions.
12 changes: 12 additions & 0 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -5378,6 +5378,18 @@ bloom_shipper:
# cache before they get purged.
# CLI flag: -bloom.metas-lru-cache.ttl
[ttl: <duration> | default = 1h]

memory_management:
# One of: simple (simple heap allocations using Go's make([]byte, n) and no
# re-cycling of buffers), dynamic (a buffer pool with variable sized buckets
# and best effort re-cycling of buffers using Go's sync.Pool), fixed (a
# fixed size memory pool with configurable slab sizes, see mem-pool-buckets)
# CLI flag: -bloom.memory-management.alloc-type
[bloom_page_alloc_type: <string> | default = "dynamic"]

# Comma separated list of buckets in the format {size}x{bytes}
# CLI flag: -bloom.memory-management.mem-pool-buckets
[bloom_page_mem_pool_buckets: <list of Buckets> | default = 128x64KB,512x2MB,128x8MB,32x32MB,8x128MB]
```
### swift_storage_config
Expand Down
14 changes: 14 additions & 0 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import (

"github.com/grafana/loki/v3/pkg/bloomcompactor"
"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/bloom/v1/mempool"
"github.com/grafana/loki/v3/pkg/storage/types"

"github.com/grafana/loki/v3/pkg/analytics"
Expand Down Expand Up @@ -730,6 +732,18 @@ func (t *Loki) initBloomStore() (services.Service, error) {
reg := prometheus.DefaultRegisterer
bsCfg := t.Cfg.StorageConfig.BloomShipperConfig

switch bsCfg.MemoryManagement.BloomPageAllocationType {
case "simple":
bloomshipper.BloomPageAllocator = v1.HeapAllocator
case "dynamic":
bloomshipper.BloomPageAllocator = v1.BloomPagePool
case "fixed":
bloomshipper.BloomPageAllocator = mempool.New(bsCfg.MemoryManagement.BloomPageMemPoolBuckets)
default:
// unknown allocation type: leave the allocator unset (nil)
bloomshipper.BloomPageAllocator = nil
}

var metasCache cache.Cache
if t.Cfg.isTarget(IndexGateway) && cache.IsCacheConfigured(bsCfg.MetasCache) {
metasCache, err = cache.New(bsCfg.MetasCache, reg, logger, stats.BloomMetasCache, constants.Loki)
Expand Down
48 changes: 47 additions & 1 deletion pkg/storage/bloom/v1/mempool/bucket.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,51 @@
package mempool

import (
"errors"
"fmt"
"strconv"
"strings"

"github.com/c2h5oh/datasize"
)

// Bucket describes one bucket of the memory pool: Size is the number of
// buffers in the bucket and Capacity is the size of each buffer in bytes.
type Bucket struct {
Size, Capacity int
Size int
Capacity uint64
}

// Parse parses a single bucket definition in the format {count}x{bytes},
// for example "128x64KB", and returns the resulting Bucket.
//
// The receiver is not used; the method exists on the value type so that it
// can be invoked on a zero Bucket.
//
// An error is returned when s does not consist of exactly two parts separated
// by "x", when the count is not a valid integer, or when the byte size cannot
// be parsed.
func (b Bucket) Parse(s string) (any, error) {
	parts := strings.Split(s, "x")
	if len(parts) != 2 {
		return nil, errors.New("bucket must be in format {count}x{bytes}")
	}

	size, err := strconv.Atoi(parts[0])
	if err != nil {
		return nil, fmt.Errorf("parsing bucket count %q: %w", parts[0], err)
	}

	capacity, err := datasize.ParseString(parts[1])
	if err != nil {
		// A malformed size is a configuration error and must be reported to
		// the caller rather than crashing the process.
		return nil, fmt.Errorf("parsing bucket size %q: %w", parts[1], err)
	}

	return Bucket{
		Size:     size,
		Capacity: uint64(capacity),
	}, nil
}

// String renders the bucket as {count}x{bytes}, with the byte size formatted
// by the datasize package.
func (b Bucket) String() string {
	capacity := datasize.ByteSize(b.Capacity)
	return fmt.Sprintf("%dx%s", b.Size, capacity.String())
}

// Buckets is a list of Bucket definitions.
type Buckets []Bucket

// String renders every bucket with Bucket.String and joins the results with
// commas.
func (b Buckets) String() string {
	var sb strings.Builder
	for i, bucket := range b {
		if i > 0 {
			sb.WriteByte(',')
		}
		sb.WriteString(bucket.String())
	}
	return sb.String()
}
2 changes: 1 addition & 1 deletion pkg/storage/bloom/v1/mempool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func New(buckets []Bucket) *MemPool {
slabs: make([]*slab, 0, len(buckets)),
}
for _, b := range buckets {
a.slabs = append(a.slabs, newSlab(b.Capacity, b.Size))
a.slabs = append(a.slabs, newSlab(int(b.Capacity), b.Size))
}
return a
}
Expand Down
11 changes: 0 additions & 11 deletions pkg/storage/bloom/v1/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"sync"

"github.com/prometheus/prometheus/util/pool"

"github.com/grafana/loki/v3/pkg/storage/bloom/v1/mempool"
)

const (
Expand Down Expand Up @@ -55,15 +53,6 @@ var (
}

HeapAllocator = &simpleHeapAllocator{}

BloomPageMemPool = mempool.New([]mempool.Bucket{
{Size: 128, Capacity: 64 << 10}, // 8MB -- for tests
{Size: 256, Capacity: 2 << 20}, // 512MB
{Size: 128, Capacity: 8 << 20}, // 1024MB
{Size: 32, Capacity: 32 << 20}, // 1024MB
{Size: 8, Capacity: 128 << 20}, // 1024MB
{Size: 2, Capacity: 512 << 20}, // 1024MB
})
)

// Allocator handles byte slices for bloom queriers.
Expand Down
6 changes: 4 additions & 2 deletions pkg/storage/stores/shipper/bloomshipper/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"github.com/grafana/loki/v3/pkg/util"
)

// BloomPageAllocator is the allocator used by BlockQuerier when it is called
// with usePool set. When it is nil, BlockQuerier falls back to
// v1.HeapAllocator.
var BloomPageAllocator v1.Allocator

type CloseableBlockQuerier struct {
BlockRef
*v1.BlockQuerier
Expand Down Expand Up @@ -166,8 +168,8 @@ func (b BlockDirectory) BlockQuerier(
) *CloseableBlockQuerier {

var alloc v1.Allocator
if usePool {
alloc = v1.BloomPageMemPool
if usePool && BloomPageAllocator != nil {
alloc = BloomPageAllocator
} else {
alloc = v1.HeapAllocator
}
Expand Down
63 changes: 63 additions & 0 deletions pkg/storage/stores/shipper/bloomshipper/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,16 @@ package config
import (
"errors"
"flag"
"fmt"
"slices"
"strings"
"time"

"github.com/grafana/dskit/flagext"

"github.com/grafana/loki/v3/pkg/storage/bloom/v1/mempool"
"github.com/grafana/loki/v3/pkg/storage/chunk/cache"
lokiflagext "github.com/grafana/loki/v3/pkg/util/flagext"
)

type Config struct {
Expand All @@ -18,6 +23,7 @@ type Config struct {
BlocksCache BlocksCacheConfig `yaml:"blocks_cache"`
MetasCache cache.Config `yaml:"metas_cache"`
MetasLRUCache cache.EmbeddedCacheConfig `yaml:"metas_lru_cache"`
MemoryManagement MemoryManagementConfig `yaml:"memory_management"`

// This will always be set to true when flags are registered.
// In tests, where config is created as literal, it can be set manually.
Expand All @@ -34,6 +40,7 @@ func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
c.BlocksCache.RegisterFlagsWithPrefixAndDefaults(prefix+"blocks-cache.", "Cache for bloom blocks. ", f, 24*time.Hour)
c.MetasCache.RegisterFlagsWithPrefix(prefix+"metas-cache.", "Cache for bloom metas. ", f)
c.MetasLRUCache.RegisterFlagsWithPrefix(prefix+"metas-lru-cache.", "In-memory LRU cache for bloom metas. ", f)
c.MemoryManagement.RegisterFlagsWithPrefix(prefix+"memory-management.", f)

// always cache LIST operations
c.CacheListOps = true
Expand All @@ -43,6 +50,9 @@ func (c *Config) Validate() error {
if len(c.WorkingDirectory) == 0 {
return errors.New("at least one working directory must be specified")
}
if err := c.MemoryManagement.Validate(); err != nil {
return err
}
return nil
}

Expand Down Expand Up @@ -81,3 +91,56 @@ func (cfg *BlocksCacheConfig) Validate() error {
}
return nil
}

var (
// defaultMemPoolBuckets is the default value of the mem-pool-buckets flag.
// The trailing comment on each entry is the total memory of that bucket
// (Size * Capacity).
defaultMemPoolBuckets = mempool.Buckets{
{Size: 128, Capacity: 64 << 10}, // 8MiB -- for tests
{Size: 512, Capacity: 2 << 20}, // 1024MiB
{Size: 128, Capacity: 8 << 20}, // 1024MiB
{Size: 32, Capacity: 32 << 20}, // 1024MiB
{Size: 8, Capacity: 128 << 20}, // 1024MiB
}
// types lists the supported allocation types as a flat sequence of
// alternating entries: each name is immediately followed by its
// description.
types = supportedAllocationTypes{
"simple", "simple heap allocations using Go's make([]byte, n) and no re-cycling of buffers",
"dynamic", "a buffer pool with variable sized buckets and best effort re-cycling of buffers using Go's sync.Pool",
"fixed", "a fixed size memory pool with configurable slab sizes, see mem-pool-buckets",
}
)

// MemoryManagementConfig holds the settings that select and configure the
// allocator for bloom pages.
type MemoryManagementConfig struct {
// BloomPageAllocationType names the allocation type; Validate only accepts
// the names listed in the package-level types variable.
BloomPageAllocationType string `yaml:"bloom_page_alloc_type"`
// BloomPageMemPoolBuckets is the list of memory pool buckets, given as a
// comma separated string on the command line and in YAML.
BloomPageMemPoolBuckets lokiflagext.CSV[mempool.Bucket] `yaml:"bloom_page_mem_pool_buckets"`
}

// RegisterFlagsWithPrefix registers the memory management flags on f. Every
// flag name is prepended with prefix.
func (cfg *MemoryManagementConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
	allocTypeUsage := fmt.Sprintf("One of: %s", strings.Join(types.descriptions(), ", "))
	f.StringVar(&cfg.BloomPageAllocationType, prefix+"alloc-type", "dynamic", allocTypeUsage)

	// Seed the buckets with the defaults before the flag is registered. The
	// error is deliberately ignored: the input is rendered from the
	// package-level default literal, not from user input.
	defaults := defaultMemPoolBuckets.String()
	_ = cfg.BloomPageMemPoolBuckets.Set(defaults)
	f.Var(&cfg.BloomPageMemPoolBuckets, prefix+"mem-pool-buckets", "Comma separated list of buckets in the format {size}x{bytes}")
}

// Validate returns an error when the configured allocation type is not one of
// the supported type names; the error lists every supported type together
// with its description.
func (cfg *MemoryManagementConfig) Validate() error {
	if slices.Contains(types.names(), cfg.BloomPageAllocationType) {
		return nil
	}
	return fmt.Errorf("bloom_page_alloc_type must be one of: %s", strings.Join(types.descriptions(), ", "))
}

// supportedAllocationTypes is a flat list of alternating entries: the element
// at every even index is a type name and the element right after it is the
// description of that type.
type supportedAllocationTypes []string

// names returns the elements at the even indexes, i.e. the type names.
func (t supportedAllocationTypes) names() []string {
	out := make([]string, 0, len(t)/2)
	for i, entry := range t {
		if i%2 == 0 {
			out = append(out, entry)
		}
	}
	return out
}

// descriptions returns one "name (description)" string per name/description
// pair. The list must hold complete pairs.
func (t supportedAllocationTypes) descriptions() []string {
	out := make([]string, 0, len(t)/2)
	for i := range t {
		if i%2 != 0 {
			continue
		}
		out = append(out, fmt.Sprintf("%s (%s)", t[i], t[i+1]))
	}
	return out
}
61 changes: 61 additions & 0 deletions pkg/util/flagext/csv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package flagext

import (
"strings"
)

// ListValue is implemented by element types that can be stored in a CSV list.
type ListValue interface {
	// String returns the textual form of the element.
	String() string
	// Parse converts s into an element. CSV.Set calls Parse on the zero value
	// of the element type and type-asserts the result back to that type, so
	// implementations must return a value of their own type.
	Parse(s string) (any, error)
}

// CSV is a list of values that is parsed from a comma-separated string.
// It implements flag.Value and the yaml marshaling and unmarshaling methods.
type CSV[T ListValue] []T

// String implements flag.Value. It joins the textual form of every element
// with commas.
func (v CSV[T]) String() string {
	s := make([]string, 0, len(v))
	for i := range v {
		s = append(s, v[i].String())
	}
	return strings.Join(s, ",")
}

// Set implements flag.Value. It replaces the contents of the list with the
// elements parsed from the comma-separated string s; an empty string resets
// the list to nil.
//
// If any element fails to parse, the error is returned and the list is left
// unchanged.
func (v *CSV[T]) Set(s string) error {
	if len(s) == 0 {
		*v = nil
		return nil
	}

	var zero T
	values := strings.Split(s, ",")

	// Parse into a fresh slice instead of appending to *v: values that were
	// set earlier (for example defaults) must be replaced, not extended, and
	// a parse error must not leave a half-updated list behind.
	parsed := make(CSV[T], 0, len(values))
	for _, val := range values {
		el, err := zero.Parse(val)
		if err != nil {
			return err
		}
		// Parse is required to return a T, see ListValue.
		parsed = append(parsed, el.(T))
	}
	*v = parsed
	return nil
}

// Get returns the list as a plain slice of its element type.
func (v CSV[T]) Get() []T {
	return v
}

// UnmarshalYAML implements yaml.Unmarshaler. The YAML value is read as a
// single string and handed to Set.
func (v *CSV[T]) UnmarshalYAML(unmarshal func(any) error) error {
	var s string
	if err := unmarshal(&s); err != nil {
		return err
	}

	return v.Set(s)
}

// MarshalYAML implements yaml.Marshaler. The list is emitted as the
// comma-separated string produced by String.
func (v CSV[T]) MarshalYAML() (any, error) {
	return v.String(), nil
}
79 changes: 79 additions & 0 deletions pkg/util/flagext/csv_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package flagext

import (
"strconv"
"testing"

"github.com/stretchr/testify/require"
)

// customType is an integer element type used to exercise CSV in tests.
type customType int

// Parse implements ListValue. It reads s as a decimal integer and yields a
// zero customType together with the error when s is not a valid integer.
func (l customType) Parse(s string) (any, error) {
	n, err := strconv.Atoi(s)
	if err == nil {
		return customType(n), nil
	}
	return customType(0), err
}

// String implements ListValue. It formats the value as a decimal integer.
func (l customType) String() string {
	n := int(l)
	return strconv.Itoa(n)
}

var _ ListValue = customType(0)

// Test_CSV feeds comma-separated inputs to CSV.Set and checks that valid
// lists produce the expected elements while lists with an empty element are
// rejected.
func Test_CSV(t *testing.T) {
	type testCase struct {
		in  string
		err bool
		out []customType
	}

	cases := []testCase{
		{in: "", err: false, out: nil},
		{in: ",", err: true, out: []customType{}},
		{in: "1", err: false, out: []customType{1}},
		{in: "1,2", err: false, out: []customType{1, 2}},
		{in: "1,", err: true, out: []customType{}},
		{in: ",1", err: true, out: []customType{}},
	}

	for _, tc := range cases {
		t.Run(tc.in, func(t *testing.T) {
			var v CSV[customType]

			err := v.Set(tc.in)
			if tc.err {
				// The parsed value is not inspected when an error is expected.
				require.NotNil(t, err)
				return
			}

			require.Nil(t, err)
			require.Equal(t, tc.out, v.Get())
		})
	}
}

0 comments on commit cb7d00d

Please sign in to comment.