Skip to content

Commit

Permalink
Merge pull request #643 from deniszh/dzhdanov/cache-fix
Browse files Browse the repository at this point in the history
Find/glob cache fixes
  • Loading branch information
deniszh authored Oct 11, 2024
2 parents a5c9c55 + b7bc077 commit a8c1c8b
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 18 deletions.
5 changes: 4 additions & 1 deletion carbon/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,9 +501,12 @@ func (app *App) Start() (err error) {
carbonserver.SetIdleTimeout(conf.Carbonserver.IdleTimeout.Value())
carbonserver.SetWriteTimeout(conf.Carbonserver.WriteTimeout.Value())
carbonserver.SetQueryCacheEnabled(conf.Carbonserver.QueryCacheEnabled)
carbonserver.SetQueryCacheSizeMB(conf.Carbonserver.QueryCacheSizeMB)
carbonserver.SetStreamingQueryCacheEnabled(conf.Carbonserver.StreamingQueryCacheEnabled)
carbonserver.SetFindCacheEnabled(conf.Carbonserver.FindCacheEnabled)
carbonserver.SetQueryCacheSizeMB(conf.Carbonserver.QueryCacheSizeMB)
carbonserver.SetFindCacheSizeMB(conf.Carbonserver.FindCacheSizeMB)
carbonserver.SetGlobCacheEnabled(conf.Carbonserver.GlobCacheEnabled)
carbonserver.SetGlobCacheSizeMB(conf.Carbonserver.GlobCacheSizeMB)
carbonserver.SetTrigramIndex(conf.Carbonserver.TrigramIndex)
carbonserver.SetTrieIndex(conf.Carbonserver.TrieIndex)
carbonserver.SetConcurrentIndex(conf.Carbonserver.ConcurrentIndex)
Expand Down
6 changes: 6 additions & 0 deletions carbon/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ type carbonserverConfig struct {
StreamingQueryCacheEnabled bool `toml:"streaming-query-cache-enabled"`
QueryCacheSizeMB int `toml:"query-cache-size-mb"`
FindCacheEnabled bool `toml:"find-cache-enabled"`
FindCacheSizeMB int `toml:"find-cache-size-mb"`
GlobCacheEnabled bool `toml:"glob-cache-enabled"`
GlobCacheSizeMB int `toml:"glob-cache-size-mb"`
Buckets int `toml:"buckets"`
MaxGlobs int `toml:"max-globs"`
FailOnMaxGlobs bool `toml:"fail-on-max-globs"`
Expand Down Expand Up @@ -273,6 +276,9 @@ func NewConfig() *Config {
QueryCacheEnabled: true,
QueryCacheSizeMB: 0,
FindCacheEnabled: true,
FindCacheSizeMB: 0,
GlobCacheEnabled: true,
GlobCacheSizeMB: 0,
TrigramIndex: true,
CacheScan: false,
MaxMetricsGlobbed: 10_000_000,
Expand Down
51 changes: 38 additions & 13 deletions carbonserver/carbonserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,11 @@ type CarbonserverListener struct {
queryCacheSizeMB int
queryCache expireCache
findCacheEnabled bool
findCacheSizeMB int
findCache expireCache
expandedGlobsCache expireCache
globCacheEnabled bool
globCacheSizeMB int
globCache expireCache
trigramIndex bool
trieIndex bool
concurrentIndex bool
Expand Down Expand Up @@ -475,15 +478,15 @@ type fileIndex struct {
func NewCarbonserverListener(cacheGetFunc func(key string) []points.Point) *CarbonserverListener {
return &CarbonserverListener{
// Config variables
metrics: &metricStruct{},
metricsAsCounters: false,
cacheGet: cacheGetFunc,
logger: zapwriter.Logger("carbonserver"),
accessLogger: zapwriter.Logger("access"),
findCache: expireCache{ec: expirecache.New(0)},
expandedGlobsCache: expireCache{ec: expirecache.New(0)},
trigramIndex: true,
percentiles: []int{100, 99, 98, 95, 75, 50},
metrics: &metricStruct{},
metricsAsCounters: false,
cacheGet: cacheGetFunc,
logger: zapwriter.Logger("carbonserver"),
accessLogger: zapwriter.Logger("access"),
findCache: expireCache{ec: expirecache.New(0)},
globCache: expireCache{ec: expirecache.New(0)},
trigramIndex: true,
percentiles: []int{100, 99, 98, 95, 75, 50},
prometheus: prometheus{
request: func(string, int) {},
duration: func(time.Duration) {},
Expand Down Expand Up @@ -565,15 +568,24 @@ func (listener *CarbonserverListener) SetMetricsAsCounters(metricsAsCounters boo
func (listener *CarbonserverListener) SetQueryCacheEnabled(enabled bool) {
listener.queryCacheEnabled = enabled
}
func (listener *CarbonserverListener) SetStreamingQueryCacheEnabled(enabled bool) {
listener.streamingQueryCacheEnabled = enabled
}
func (listener *CarbonserverListener) SetQueryCacheSizeMB(size int) {
listener.queryCacheSizeMB = size
}
func (listener *CarbonserverListener) SetStreamingQueryCacheEnabled(enabled bool) {
listener.streamingQueryCacheEnabled = enabled
}
func (listener *CarbonserverListener) SetFindCacheEnabled(enabled bool) {
listener.findCacheEnabled = enabled
}
func (listener *CarbonserverListener) SetFindCacheSizeMB(size int) {
listener.findCacheSizeMB = size
}
func (listener *CarbonserverListener) SetGlobCacheEnabled(enabled bool) {
listener.globCacheEnabled = enabled
}
func (listener *CarbonserverListener) SetGlobCacheSizeMB(size int) {
listener.globCacheSizeMB = size
}
func (listener *CarbonserverListener) SetTrigramIndex(enabled bool) {
listener.trigramIndex = enabled
}
Expand Down Expand Up @@ -1791,6 +1803,12 @@ func (listener *CarbonserverListener) Listen(listen string) error {
}

listener.queryCache = expireCache{ec: expirecache.New(uint64(listener.queryCacheSizeMB))}
if listener.findCacheEnabled {
listener.findCache = expireCache{ec: expirecache.New(uint64(listener.findCacheSizeMB))}
}
if listener.globCacheEnabled {
listener.globCache = expireCache{ec: expirecache.New(uint64(listener.globCacheSizeMB))}
}

// +1 to track every over the number of buckets we track
listener.timeBuckets = make([]uint64, listener.buckets+1)
Expand Down Expand Up @@ -1924,7 +1942,14 @@ func (listener *CarbonserverListener) Listen(listen string) error {
}
}

// cache cleaners
go listener.queryCache.ec.StoppableApproximateCleaner(10*time.Second, listener.exitChan)
if listener.findCacheEnabled {
go listener.findCache.ec.StoppableApproximateCleaner(60*time.Second, listener.exitChan)
}
if listener.globCacheEnabled {
go listener.globCache.ec.StoppableApproximateCleaner(60*time.Second, listener.exitChan)
}

srv := &http.Server{
Handler: gziphandler.GzipHandler(carbonserverMux),
Expand Down
15 changes: 11 additions & 4 deletions carbonserver/find.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,10 +375,17 @@ GATHER:
func (listener *CarbonserverListener) getExpandedGlobsWithCache(ctx context.Context, logger *zap.Logger, handler string, queries []string) ([]globs, bool, error) {
key := strings.Join(queries, "&")
size := uint64(100 * 1024 * 1024)
expandedGlobs, isCacheHit, err := getWithCache(logger, listener.expandedGlobsCache, key, size, 300,
func() (interface{}, error) {
return listener.getExpandedGlobs(ctx, logger, time.Now(), queries)
})
var expandedGlobs interface{}
var err error
isCacheHit := false
if listener.globCacheEnabled {
expandedGlobs, isCacheHit, err = getWithCache(logger, listener.globCache, key, size, 300,
func() (interface{}, error) {
return listener.getExpandedGlobs(ctx, logger, time.Now(), queries)
})
} else {
expandedGlobs, err = listener.getExpandedGlobs(ctx, logger, time.Now(), queries)
}

var expandedGlobsCasted []globs
if err == nil {
Expand Down
6 changes: 6 additions & 0 deletions go-carbon.conf.example
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,12 @@ streaming-query-cache-enabled = false
query-cache-size-mb = 0
# Enable /metrics/find cache, it will cache the result for 5 minutes
find-cache-enabled = true
# 0 for unlimited
find-cache-size-mb = 0
# Enable expanded globs cache, it will cache the glob expansion result for 5 minutes
glob-cache-enabled = true
# 0 for unlimited
glob-cache-size-mb = 0
# Control trigram index
# This index is used to speed-up /find requests
# However, it will lead to increased memory consumption
Expand Down

0 comments on commit a8c1c8b

Please sign in to comment.