Skip to content

Commit

Permalink
optimisation(carbonserver): separate grpc expandedGlobsCache from fin…
Browse files Browse the repository at this point in the history
…dCache into a separate one, and restore response caching in findCache; and use expandedGlobsCache in http find/render

This is expected to speed up http renders.

A few points about the new cache:
* it's initialised without memory limit, same as find response and render response caches. Might be worth to review this in future for all 3 caches.
* there is no toggle to disable this cache, it can be added later if needed.
* 4 graphite metrics are introduced for visibility into cache performance:
        find_expanded_globs_cache_hit
        find_expanded_globs_cache_miss
        render_expanded_globs_cache_hit
        render_expanded_globs_cache_miss
  • Loading branch information
Anton Timofieiev committed Feb 6, 2023
1 parent 8192946 commit 676cb0e
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 82 deletions.
110 changes: 61 additions & 49 deletions carbonserver/carbonserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,47 +66,51 @@ import (
)

type metricStruct struct {
RenderRequests uint64
RenderErrors uint64
NotFound uint64
FindRequests uint64
FindErrors uint64
FindZero uint64
InfoRequests uint64
InfoErrors uint64
ListRequests uint64
ListErrors uint64
ListQueryRequests uint64
ListQueryErrors uint64
DetailsRequests uint64
DetailsErrors uint64
CacheHit uint64
CacheMiss uint64
CacheRequestsTotal uint64
CacheWorkTimeNS uint64
CacheWaitTimeFetchNS uint64
DiskWaitTimeNS uint64
DiskRequests uint64
PointsReturned uint64
MetricsReturned uint64
MetricsKnown uint64
FileScanTimeNS uint64
IndexBuildTimeNS uint64
MetricsFetched uint64
MetricsFound uint64
ThrottledCreates uint64
MaxCreatesPerSecond uint64
FetchSize uint64
QueryCacheHit uint64
QueryCacheMiss uint64
FindCacheHit uint64
FindCacheMiss uint64
TrieNodes uint64
TrieFiles uint64
TrieDirs uint64
TrieCountNodesTimeNs uint64
QuotaApplyTimeNs uint64
UsageRefreshTimeNs uint64
RenderRequests uint64
RenderErrors uint64
NotFound uint64
FindRequests uint64
FindErrors uint64
FindZero uint64
InfoRequests uint64
InfoErrors uint64
ListRequests uint64
ListErrors uint64
ListQueryRequests uint64
ListQueryErrors uint64
DetailsRequests uint64
DetailsErrors uint64
CacheHit uint64
CacheMiss uint64
CacheRequestsTotal uint64
CacheWorkTimeNS uint64
CacheWaitTimeFetchNS uint64
DiskWaitTimeNS uint64
DiskRequests uint64
PointsReturned uint64
MetricsReturned uint64
MetricsKnown uint64
FileScanTimeNS uint64
IndexBuildTimeNS uint64
MetricsFetched uint64
MetricsFound uint64
ThrottledCreates uint64
MaxCreatesPerSecond uint64
FetchSize uint64
QueryCacheHit uint64
QueryCacheMiss uint64
FindCacheHit uint64
FindCacheMiss uint64
findExpandedGlobsCachedHit uint64
findExpandedGlobsCacheMiss uint64
renderExpandedGlobsCacheHit uint64
renderExpandedGlobsCacheMiss uint64
TrieNodes uint64
TrieFiles uint64
TrieDirs uint64
TrieCountNodesTimeNs uint64
QuotaApplyTimeNs uint64
UsageRefreshTimeNs uint64

InflightRequests uint64
RejectedTooManyRequests uint64
Expand Down Expand Up @@ -255,6 +259,7 @@ type CarbonserverListener struct {
queryCache queryCache
findCacheEnabled bool
findCache queryCache
expandedGlobsCache queryCache // TODO: rename queryCache type to be more generic
trigramIndex bool
trieIndex bool
concurrentIndex bool
Expand Down Expand Up @@ -473,14 +478,15 @@ type fileIndex struct {
func NewCarbonserverListener(cacheGetFunc func(key string) []points.Point) *CarbonserverListener {
return &CarbonserverListener{
// Config variables
metrics: &metricStruct{},
metricsAsCounters: false,
cacheGet: cacheGetFunc,
logger: zapwriter.Logger("carbonserver"),
accessLogger: zapwriter.Logger("access"),
findCache: queryCache{ec: expirecache.New(0)},
trigramIndex: true,
percentiles: []int{100, 99, 98, 95, 75, 50},
metrics: &metricStruct{},
metricsAsCounters: false,
cacheGet: cacheGetFunc,
logger: zapwriter.Logger("carbonserver"),
accessLogger: zapwriter.Logger("access"),
findCache: queryCache{ec: expirecache.New(0)},
expandedGlobsCache: queryCache{ec: expirecache.New(0)},
trigramIndex: true,
percentiles: []int{100, 99, 98, 95, 75, 50},
prometheus: prometheus{
request: func(string, int) {},
duration: func(time.Duration) {},
Expand Down Expand Up @@ -1523,6 +1529,12 @@ func (listener *CarbonserverListener) Stat(send helper.StatCallback) {
sender("find_cache_hit", &listener.metrics.FindCacheHit, send)
sender("find_cache_miss", &listener.metrics.FindCacheMiss, send)

sender("find_expanded_globs_cache_hit", &listener.metrics.findExpandedGlobsCachedHit, send)
sender("find_expanded_globs_cache_miss", &listener.metrics.findExpandedGlobsCacheMiss, send)

sender("render_expanded_globs_cache_hit", &listener.metrics.renderExpandedGlobsCacheHit, send)
sender("render_expanded_globs_cache_miss", &listener.metrics.renderExpandedGlobsCacheMiss, send)

sender("inflight_requests_count", &listener.metrics.InflightRequests, send)
senderRaw("inflight_requests_limit", &listener.MaxInflightRequests, send)
sender("rejected_too_many_requests", &listener.metrics.RejectedTooManyRequests, send)
Expand Down
64 changes: 50 additions & 14 deletions carbonserver/find.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func getProtoV2FindResponse(expandedGlob globs, query string) *protov2.GlobRespo
func (listener *CarbonserverListener) findMetrics(ctx context.Context, logger *zap.Logger, t0 time.Time, format responseFormat, names []string) (*findResponse, error) {
var result findResponse
metricsCount := uint64(0)
expandedGlobs, err := listener.getExpandedGlobs(ctx, logger, t0, names)
expandedGlobs, _, err := listener.getExpandedGlobsWithCache(ctx, logger, "find", names)
if expandedGlobs == nil {
return nil, err
}
Expand Down Expand Up @@ -412,6 +412,37 @@ GATHER:
return expandedGlobs, nil
}

func (listener *CarbonserverListener) getExpandedGlobsWithCache(ctx context.Context, logger *zap.Logger, handler string, queries []string) ([]globs, bool, error) {
key := strings.Join(queries, "&")
size := uint64(100 * 1024 * 1024)
expandedGlobs, isCacheHit, err := getWithCache(logger, listener.expandedGlobsCache, key, size, 300,
func() (interface{}, error) {
return listener.getExpandedGlobs(ctx, logger, time.Now(), queries)
})

var expandedGlobsCasted []globs
if err == nil {
expandedGlobsCasted = expandedGlobs.([]globs)
}

switch handler {
case "find":
if isCacheHit {
atomic.AddUint64(&listener.metrics.findExpandedGlobsCachedHit, 1)
} else {
atomic.AddUint64(&listener.metrics.findExpandedGlobsCacheMiss, 1)
}
case "render":
if isCacheHit {
atomic.AddUint64(&listener.metrics.renderExpandedGlobsCacheHit, 1)
} else {
atomic.AddUint64(&listener.metrics.renderExpandedGlobsCacheMiss, 1)
}
}

return expandedGlobsCasted, isCacheHit, err
}

func (listener *CarbonserverListener) Find(ctx context.Context, req *protov2.GlobRequest) (*protov2.GlobResponse, error) {
t0 := time.Now()
span := trace.SpanFromContext(ctx)
Expand Down Expand Up @@ -456,13 +487,23 @@ func (listener *CarbonserverListener) Find(ctx context.Context, req *protov2.Glo
fromCache := false
var finalRes *protov2.GlobResponse
var lookups uint32
expandedGlobs, _, err := listener.getExpandedGlobsWithCache(ctx, logger, "find", []string{query})
if err != nil {
return nil, err
}

if listener.findCacheEnabled {
key := query
key := query + "&" + format + "grpc"
size := uint64(100 * 1024 * 1024)
var result interface{}
result, fromCache, err = getWithCache(logger, listener.findCache, key, size, 300,
func() (interface{}, error) {
return listener.getExpandedGlobs(ctx, logger, t0, []string{query})
finalRes = getProtoV2FindResponse(expandedGlobs[0], query)
lookups = expandedGlobs[0].Lookups
if len(finalRes.Matches) == 0 {
return nil, errorNotFound{}
}
return finalRes, nil
})
if err == nil {
listener.prometheus.cacheRequest("find", fromCache)
Expand All @@ -471,21 +512,16 @@ func (listener *CarbonserverListener) Find(ctx context.Context, req *protov2.Glo
} else {
atomic.AddUint64(&listener.metrics.FindCacheMiss, 1)
}
expandedGlobs := result.([]globs)
finalRes = getProtoV2FindResponse(expandedGlobs[0], query)
finalRes = result.(*protov2.GlobResponse)
if len(finalRes.Matches) == 0 {
err = errorNotFound{}
}
}
} else {
var expandedGlobs []globs
expandedGlobs, err = listener.getExpandedGlobs(ctx, logger, t0, []string{query})
if err == nil {
finalRes = getProtoV2FindResponse(expandedGlobs[0], query)
lookups = expandedGlobs[0].Lookups
if len(finalRes.Matches) == 0 {
finalRes, err = nil, errorNotFound{}
}
} else if err == nil {
finalRes = getProtoV2FindResponse(expandedGlobs[0], query)
lookups = expandedGlobs[0].Lookups
if len(finalRes.Matches) == 0 {
finalRes, err = nil, errorNotFound{}
}
}

Expand Down
23 changes: 4 additions & 19 deletions carbonserver/render.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,8 +478,9 @@ func (listener *CarbonserverListener) prepareDataProto(ctx context.Context, logg
metricNames := getUniqueMetricNames(targets)
// TODO: pipeline?
expansionT0 := time.Now()
expandedGlobs, err := listener.getExpandedGlobs(ctx, logger, time.Now(), metricNames)
expandedGlobs, isExpandCacheHit, err := listener.getExpandedGlobsWithCache(ctx, logger, "render", metricNames)
tle.GlobExpansionDuration = float64(time.Since(expansionT0)) / float64(time.Second)
tle.FindFromCache = isExpandCacheHit
if expandedGlobs == nil {
return fetchResponse{nil, contentType, 0, 0, 0, nil}, err
}
Expand Down Expand Up @@ -711,28 +712,12 @@ func (listener *CarbonserverListener) Render(req *protov2.MultiFetchRequest, str
responseChan := make(chan response, 1000)

fetchAndStreamMetricsFunc := func(getMetrics bool) ([]response, error) {
var err error
var findFromCache bool
metricNames := getUniqueMetricNames(targets)
// TODO: pipeline?
expansionT0 := time.Now()
var expandedGlobs []globs
if listener.findCacheEnabled {
key := strings.Join(metricNames, "&")
size := uint64(100 * 1024 * 1024)
var result interface{}
result, findFromCache, err = getWithCache(logger, listener.findCache, key, size, 300,
func() (interface{}, error) {
return listener.getExpandedGlobs(ctx, logger, time.Now(), metricNames)
})
if err == nil {
expandedGlobs = result.([]globs)
tle.FindFromCache = findFromCache
}
} else {
expandedGlobs, err = listener.getExpandedGlobs(ctx, logger, time.Now(), metricNames)
}
expandedGlobs, isExpandCacheHit, err := listener.getExpandedGlobsWithCache(ctx, logger, "render", metricNames)
tle.GlobExpansionDuration = float64(time.Since(expansionT0)) / float64(time.Second)
tle.FindFromCache = isExpandCacheHit
if expandedGlobs == nil {
if err != nil {
return nil, status.New(codes.InvalidArgument, err.Error()).Err()
Expand Down

0 comments on commit 676cb0e

Please sign in to comment.