Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Find/glob cache fixes #643

Merged
merged 1 commit into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion carbon/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,9 +501,12 @@ func (app *App) Start() (err error) {
carbonserver.SetIdleTimeout(conf.Carbonserver.IdleTimeout.Value())
carbonserver.SetWriteTimeout(conf.Carbonserver.WriteTimeout.Value())
carbonserver.SetQueryCacheEnabled(conf.Carbonserver.QueryCacheEnabled)
carbonserver.SetQueryCacheSizeMB(conf.Carbonserver.QueryCacheSizeMB)
carbonserver.SetStreamingQueryCacheEnabled(conf.Carbonserver.StreamingQueryCacheEnabled)
carbonserver.SetFindCacheEnabled(conf.Carbonserver.FindCacheEnabled)
carbonserver.SetQueryCacheSizeMB(conf.Carbonserver.QueryCacheSizeMB)
carbonserver.SetFindCacheSizeMB(conf.Carbonserver.FindCacheSizeMB)
carbonserver.SetGlobCacheEnabled(conf.Carbonserver.GlobCacheEnabled)
carbonserver.SetGlobCacheSizeMB(conf.Carbonserver.GlobCacheSizeMB)
carbonserver.SetTrigramIndex(conf.Carbonserver.TrigramIndex)
carbonserver.SetTrieIndex(conf.Carbonserver.TrieIndex)
carbonserver.SetConcurrentIndex(conf.Carbonserver.ConcurrentIndex)
Expand Down
6 changes: 6 additions & 0 deletions carbon/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ type carbonserverConfig struct {
StreamingQueryCacheEnabled bool `toml:"streaming-query-cache-enabled"`
QueryCacheSizeMB int `toml:"query-cache-size-mb"`
FindCacheEnabled bool `toml:"find-cache-enabled"`
FindCacheSizeMB int `toml:"find-cache-size-mb"`
GlobCacheEnabled bool `toml:"glob-cache-enabled"`
GlobCacheSizeMB int `toml:"glob-cache-size-mb"`
Buckets int `toml:"buckets"`
MaxGlobs int `toml:"max-globs"`
FailOnMaxGlobs bool `toml:"fail-on-max-globs"`
Expand Down Expand Up @@ -273,6 +276,9 @@ func NewConfig() *Config {
QueryCacheEnabled: true,
QueryCacheSizeMB: 0,
FindCacheEnabled: true,
FindCacheSizeMB: 0,
GlobCacheEnabled: true,
GlobCacheSizeMB: 0,
TrigramIndex: true,
CacheScan: false,
MaxMetricsGlobbed: 10_000_000,
Expand Down
51 changes: 38 additions & 13 deletions carbonserver/carbonserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,11 @@ type CarbonserverListener struct {
queryCacheSizeMB int
queryCache expireCache
findCacheEnabled bool
findCacheSizeMB int
findCache expireCache
expandedGlobsCache expireCache
globCacheEnabled bool
globCacheSizeMB int
globCache expireCache
trigramIndex bool
trieIndex bool
concurrentIndex bool
Expand Down Expand Up @@ -475,15 +478,15 @@ type fileIndex struct {
func NewCarbonserverListener(cacheGetFunc func(key string) []points.Point) *CarbonserverListener {
return &CarbonserverListener{
// Config variables
metrics: &metricStruct{},
metricsAsCounters: false,
cacheGet: cacheGetFunc,
logger: zapwriter.Logger("carbonserver"),
accessLogger: zapwriter.Logger("access"),
findCache: expireCache{ec: expirecache.New(0)},
expandedGlobsCache: expireCache{ec: expirecache.New(0)},
trigramIndex: true,
percentiles: []int{100, 99, 98, 95, 75, 50},
metrics: &metricStruct{},
metricsAsCounters: false,
cacheGet: cacheGetFunc,
logger: zapwriter.Logger("carbonserver"),
accessLogger: zapwriter.Logger("access"),
findCache: expireCache{ec: expirecache.New(0)},
globCache: expireCache{ec: expirecache.New(0)},
trigramIndex: true,
percentiles: []int{100, 99, 98, 95, 75, 50},
prometheus: prometheus{
request: func(string, int) {},
duration: func(time.Duration) {},
Expand Down Expand Up @@ -565,15 +568,24 @@ func (listener *CarbonserverListener) SetMetricsAsCounters(metricsAsCounters boo
func (listener *CarbonserverListener) SetQueryCacheEnabled(enabled bool) {
listener.queryCacheEnabled = enabled
}
func (listener *CarbonserverListener) SetStreamingQueryCacheEnabled(enabled bool) {
listener.streamingQueryCacheEnabled = enabled
}
func (listener *CarbonserverListener) SetQueryCacheSizeMB(size int) {
listener.queryCacheSizeMB = size
}
func (listener *CarbonserverListener) SetStreamingQueryCacheEnabled(enabled bool) {
listener.streamingQueryCacheEnabled = enabled
}
func (listener *CarbonserverListener) SetFindCacheEnabled(enabled bool) {
listener.findCacheEnabled = enabled
}
func (listener *CarbonserverListener) SetFindCacheSizeMB(size int) {
listener.findCacheSizeMB = size
}
func (listener *CarbonserverListener) SetGlobCacheEnabled(enabled bool) {
listener.globCacheEnabled = enabled
}
func (listener *CarbonserverListener) SetGlobCacheSizeMB(size int) {
listener.globCacheSizeMB = size
}
func (listener *CarbonserverListener) SetTrigramIndex(enabled bool) {
listener.trigramIndex = enabled
}
Expand Down Expand Up @@ -1791,6 +1803,12 @@ func (listener *CarbonserverListener) Listen(listen string) error {
}

listener.queryCache = expireCache{ec: expirecache.New(uint64(listener.queryCacheSizeMB))}
if listener.findCacheEnabled {
listener.findCache = expireCache{ec: expirecache.New(uint64(listener.findCacheSizeMB))}
}
if listener.globCacheEnabled {
listener.globCache = expireCache{ec: expirecache.New(uint64(listener.globCacheSizeMB))}
}

// +1 to track every over the number of buckets we track
listener.timeBuckets = make([]uint64, listener.buckets+1)
Expand Down Expand Up @@ -1924,7 +1942,14 @@ func (listener *CarbonserverListener) Listen(listen string) error {
}
}

// cache cleaners
go listener.queryCache.ec.StoppableApproximateCleaner(10*time.Second, listener.exitChan)
if listener.findCacheEnabled {
go listener.findCache.ec.StoppableApproximateCleaner(60*time.Second, listener.exitChan)
}
if listener.globCacheEnabled {
go listener.globCache.ec.StoppableApproximateCleaner(60*time.Second, listener.exitChan)
}

srv := &http.Server{
Handler: gziphandler.GzipHandler(carbonserverMux),
Expand Down
15 changes: 11 additions & 4 deletions carbonserver/find.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,10 +375,17 @@ GATHER:
func (listener *CarbonserverListener) getExpandedGlobsWithCache(ctx context.Context, logger *zap.Logger, handler string, queries []string) ([]globs, bool, error) {
key := strings.Join(queries, "&")
size := uint64(100 * 1024 * 1024)
expandedGlobs, isCacheHit, err := getWithCache(logger, listener.expandedGlobsCache, key, size, 300,
func() (interface{}, error) {
return listener.getExpandedGlobs(ctx, logger, time.Now(), queries)
})
var expandedGlobs interface{}
var err error
isCacheHit := false
if listener.globCacheEnabled {
expandedGlobs, isCacheHit, err = getWithCache(logger, listener.globCache, key, size, 300,
func() (interface{}, error) {
return listener.getExpandedGlobs(ctx, logger, time.Now(), queries)
})
} else {
expandedGlobs, err = listener.getExpandedGlobs(ctx, logger, time.Now(), queries)
}

var expandedGlobsCasted []globs
if err == nil {
Expand Down
6 changes: 6 additions & 0 deletions go-carbon.conf.example
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,12 @@ streaming-query-cache-enabled = false
query-cache-size-mb = 0
# Enable /metrics/find cache, it will cache the result for 5 minutes
find-cache-enabled = true
# 0 for unlimited
find-cache-size-mb = 0
# Enable expanded globs cache, it will cache the glob expansion result for 5 minutes
glob-cache-enabled = true
# 0 for unlimited
glob-cache-size-mb = 0
# Control trigram index
# This index is used to speed-up /find requests
# However, it will lead to increased memory consumption
Expand Down
Loading