Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cache usage statistics #6317

Merged
merged 11 commits into from
Jun 7, 2022
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
## Main

* [6317](https://github.com/grafana/loki/pull/6317/files) **dannykopping**: General: add cache usage statistics
DylanGuedes marked this conversation as resolved.
Show resolved Hide resolved
* [6099](https://github.com/grafana/loki/pull/6099/files) **cstyan**: Drop lines with malformed JSON in Promtail JSON pipeline stage
* [6136](https://github.com/grafana/loki/pull/6136) **periklis**: Add support for alertmanager header authorization
* [6102](https://github.com/grafana/loki/pull/6102) **timchenko-a**: Add multi-tenancy support to lambda-promtail
Expand Down
7 changes: 7 additions & 0 deletions pkg/logql/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,13 @@ func RecordRangeAndInstantQueryMetrics(
"total_entries", stats.Summary.TotalEntriesReturned,
"queue_time", logql_stats.ConvertSecondsToNanoseconds(stats.Summary.QueueTime),
"subqueries", stats.Summary.Subqueries,
"cached_chunks_req", stats.Caches.Chunk.EntriesRequested,
"cached_chunks_hit", stats.Caches.Chunk.EntriesFound,
"cached_chunks_bytes", stats.Caches.Chunk.BytesTransferred,
"cached_indexes_req", stats.Caches.Index.EntriesRequested,
"cached_indexes_hit", stats.Caches.Index.EntriesFound,
"cached_results_req", stats.Caches.Result.EntriesRequested,
"cached_results_hit", stats.Caches.Result.EntriesFound,
}...)

logValues = append(logValues, tagsToKeyValues(queryTags)...)
Expand Down
2 changes: 1 addition & 1 deletion pkg/logql/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestLogSlowQuery(t *testing.T) {
}, logqlmodel.Streams{logproto.Stream{Entries: make([]logproto.Entry, 10)}})
require.Equal(t,
fmt.Sprintf(
"level=info org_id=foo traceID=%s latency=slow query=\"{foo=\\\"bar\\\"} |= \\\"buzz\\\"\" query_type=filter range_type=range length=1h0m0s step=1m0s duration=25.25s status=200 limit=1000 returned_lines=10 throughput=100kB total_bytes=100kB total_entries=10 queue_time=2ns subqueries=0 source=logvolhist feature=beta\n",
"level=info org_id=foo traceID=%s latency=slow query=\"{foo=\\\"bar\\\"} |= \\\"buzz\\\"\" query_type=filter range_type=range length=1h0m0s step=1m0s duration=25.25s status=200 limit=1000 returned_lines=10 throughput=100kB total_bytes=100kB total_entries=10 queue_time=2ns subqueries=0 cached_chunks_req=0 cached_chunks_hit=0 cached_chunks_bytes=0 cached_indexes_req=0 cached_indexes_hit=0 cached_results_req=0 cached_results_hit=0 source=logvolhist feature=beta\n",
sp.Context().(jaeger.SpanContext).SpanID().String(),
),
buf.String())
Expand Down
114 changes: 114 additions & 0 deletions pkg/logqlmodel/stats/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ const (
type Context struct {
querier Querier
ingester Ingester
caches Caches

// store is the store statistics collected across the query path
store Store
Expand All @@ -52,6 +53,15 @@ type Context struct {
mtx sync.Mutex
}

// CacheType identifies one of the cache subsystems for which usage
// statistics are recorded.
type CacheType string

const (
	// ChunkCache is the cache holding chunk data.
	ChunkCache CacheType = "chunk"
	// IndexCache is the cache holding index entries.
	IndexCache CacheType = "index"
	// ResultCache is the cache holding query results.
	ResultCache CacheType = "result"
	// WriteDedupeCache deduplicates chunk writes. It is included for
	// completeness but is not yet displayed along with the other caches.
	WriteDedupeCache CacheType = "write-dedupe"
)

// NewContext creates a new statistics context
func NewContext(ctx context.Context) (*Context, context.Context) {
contextData := &Context{}
Expand Down Expand Up @@ -79,6 +89,15 @@ func (c *Context) Ingester() Ingester {
}
}

// Caches returns a copy of the cache statistics accumulated so far.
// NOTE(review): the fields are read without atomic loads or the mutex;
// concurrent AddCache* calls may yield a slightly stale snapshot — confirm
// callers only invoke this once recording has finished.
func (c *Context) Caches() Caches {
	var snapshot Caches
	snapshot.Chunk = c.caches.Chunk
	snapshot.Index = c.caches.Index
	snapshot.Result = c.caches.Result
	return snapshot
}

// Reset clears the statistics.
func (c *Context) Reset() {
c.mtx.Lock()
Expand All @@ -88,6 +107,7 @@ func (c *Context) Reset() {
c.querier.Reset()
c.ingester.Reset()
c.result.Reset()
c.caches.Reset()
}

// Result calculates the summary based on store and ingester data.
Expand All @@ -99,6 +119,7 @@ func (c *Context) Result(execTime time.Duration, queueTime time.Duration, totalE
Store: c.store,
},
Ingester: c.ingester,
Caches: c.caches,
})

r.ComputeSummary(execTime, queueTime, totalEntriesReturned)
Expand Down Expand Up @@ -168,12 +189,27 @@ func (i *Ingester) Merge(m Ingester) {
i.TotalReached += m.TotalReached
}

// Merge folds the per-cache statistics of m into c, cache by cache.
func (c *Caches) Merge(m Caches) {
	c.Chunk.Merge(m.Chunk)
	c.Index.Merge(m.Index)
	c.Result.Merge(m.Result)
}

// Merge accumulates the counters of m into c. All fields are independent
// tallies, so the additions can happen in any order.
func (c *Cache) Merge(m Cache) {
	c.Requests += m.Requests
	c.EntriesRequested += m.EntriesRequested
	c.EntriesFound += m.EntriesFound
	c.EntriesStored += m.EntriesStored
	c.BytesTransferred += m.BytesTransferred
}

// Merge merges two results of statistics.
// This will increase the total number of Subqueries.
func (r *Result) Merge(m Result) {
	r.Summary.Subqueries++
	r.Querier.Merge(m.Querier)
	r.Ingester.Merge(m.Ingester)
	r.Caches.Merge(m.Caches)

	// Recompute the summary over the combined execution and queue times;
	// ComputeSummary expects nanoseconds, while the summary stores seconds.
	execTime := ConvertSecondsToNanoseconds(r.Summary.ExecTime + m.Summary.ExecTime)
	queueTime := ConvertSecondsToNanoseconds(r.Summary.QueueTime + m.Summary.QueueTime)
	r.ComputeSummary(execTime, queueTime, int(r.Summary.TotalEntriesReturned))
}
Expand Down Expand Up @@ -257,6 +293,66 @@ func (c *Context) AddChunksRef(i int64) {
atomic.AddInt64(&c.store.TotalChunksRef, i)
}

// AddCacheEntriesFound records i cache hits for the cache of type t.
// Unknown cache types are silently ignored.
func (c *Context) AddCacheEntriesFound(t CacheType, i int) {
	if stats := c.getCacheStatsByType(t); stats != nil {
		atomic.AddInt32(&stats.EntriesFound, int32(i))
	}
}

// AddCacheEntriesRequested records i requested entries for the cache of
// type t. Unknown cache types are silently ignored.
func (c *Context) AddCacheEntriesRequested(t CacheType, i int) {
	if stats := c.getCacheStatsByType(t); stats != nil {
		atomic.AddInt32(&stats.EntriesRequested, int32(i))
	}
}

// AddCacheEntriesStored records i stored entries for the cache of type t.
// Unknown cache types are silently ignored.
func (c *Context) AddCacheEntriesStored(t CacheType, i int) {
	if stats := c.getCacheStatsByType(t); stats != nil {
		atomic.AddInt32(&stats.EntriesStored, int32(i))
	}
}

// AddCacheBytesTransferred records i bytes moved to or from the cache of
// type t. Unknown cache types are silently ignored.
func (c *Context) AddCacheBytesTransferred(t CacheType, i int) {
	if stats := c.getCacheStatsByType(t); stats != nil {
		atomic.AddInt64(&stats.BytesTransferred, int64(i))
	}
}

// AddCacheRequest records i requests against the cache of type t.
// Unknown cache types are silently ignored.
func (c *Context) AddCacheRequest(t CacheType, i int) {
	if stats := c.getCacheStatsByType(t); stats != nil {
		atomic.AddInt32(&stats.Requests, int32(i))
	}
}

// getCacheStatsByType maps a CacheType to the matching per-cache counters
// inside the context, or nil for types that are not tracked (for example
// WriteDedupeCache).
func (c *Context) getCacheStatsByType(t CacheType) *Cache {
	switch t {
	case ChunkCache:
		return &c.caches.Chunk
	case IndexCache:
		return &c.caches.Index
	case ResultCache:
		return &c.caches.Result
	}
	return nil
}

// Log logs a query statistics result.
func (r Result) Log(log log.Logger) {
_ = log.Log(
Expand Down Expand Up @@ -284,6 +380,7 @@ func (r Result) Log(log log.Logger) {
"Querier.CompressedBytes", humanize.Bytes(uint64(r.Querier.Store.Chunk.CompressedBytes)),
"Querier.TotalDuplicates", r.Querier.Store.Chunk.TotalDuplicates,
)
r.Caches.Log(log)
r.Summary.Log(log)
}

Expand All @@ -297,3 +394,20 @@ func (s Summary) Log(log log.Logger) {
"Summary.QueueTime", ConvertSecondsToNanoseconds(s.QueueTime),
)
}

func (c Caches) Log(log log.Logger) {
_ = log.Log(
"Cache.Chunk.Requests", c.Chunk.Requests,
"Cache.Chunk.EntriesRequested", c.Chunk.EntriesRequested,
DylanGuedes marked this conversation as resolved.
Show resolved Hide resolved
"Cache.Chunk.EntriesFound", c.Chunk.EntriesFound,
"Cache.Chunk.BytesTransferred", humanize.Bytes(uint64(c.Chunk.BytesTransferred)),
"Cache.Index.Requests", c.Index.Requests,
"Cache.Index.EntriesRequested", c.Index.EntriesRequested,
"Cache.Index.EntriesFound", c.Index.EntriesFound,
"Cache.Index.BytesTransferred", humanize.Bytes(uint64(c.Index.BytesTransferred)),
"Cache.Result.Requests", c.Result.Requests,
"Cache.Result.EntriesRequested", c.Result.EntriesRequested,
"Cache.Result.EntriesFound", c.Result.EntriesFound,
"Cache.Result.BytesTransferred", humanize.Bytes(uint64(c.Result.BytesTransferred)),
)
}
64 changes: 64 additions & 0 deletions pkg/logqlmodel/stats/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ func TestResult(t *testing.T) {
stats.AddChunksRef(50)
stats.AddChunksDownloaded(60)
stats.AddChunksDownloadTime(time.Second)
stats.AddCacheRequest(ChunkCache, 3)
stats.AddCacheRequest(IndexCache, 4)
stats.AddCacheRequest(ResultCache, 1)

fakeIngesterQuery(ctx)
fakeIngesterQuery(ctx)
Expand Down Expand Up @@ -60,6 +63,17 @@ func TestResult(t *testing.T) {
},
},
},
Caches: Caches{
Chunk: Cache{
Requests: 3,
},
Index: Cache{
Requests: 4,
},
Result: Cache{
Requests: 1,
},
},
Summary: Summary{
ExecTime: 2 * time.Second.Seconds(),
QueueTime: 2 * time.Nanosecond.Seconds(),
Expand Down Expand Up @@ -182,6 +196,19 @@ func TestResult_Merge(t *testing.T) {
},
},
},
Caches: Caches{
Chunk: Cache{
Requests: 5,
BytesTransferred: 1024,
},
Index: Cache{
EntriesRequested: 22,
EntriesFound: 2,
},
Result: Cache{
EntriesStored: 3,
},
},
Summary: Summary{
ExecTime: 2 * time.Second.Seconds(),
QueueTime: 2 * time.Nanosecond.Seconds(),
Expand Down Expand Up @@ -230,6 +257,19 @@ func TestResult_Merge(t *testing.T) {
},
},
},
Caches: Caches{
Chunk: Cache{
Requests: 2 * 5,
BytesTransferred: 2 * 1024,
},
Index: Cache{
EntriesRequested: 2 * 22,
EntriesFound: 2 * 2,
},
Result: Cache{
EntriesStored: 2 * 3,
},
},
Summary: Summary{
ExecTime: 2 * 2 * time.Second.Seconds(),
QueueTime: 2 * 2 * time.Nanosecond.Seconds(),
Expand Down Expand Up @@ -273,3 +313,27 @@ func TestIngester(t *testing.T) {
},
}, statsCtx.Ingester())
}

// TestCaches verifies that each AddCache* helper routes its increment to
// the counters of the requested cache type.
func TestCaches(t *testing.T) {
	sc, _ := NewContext(context.Background())

	// Record against each cache type, grouped per cache for readability.
	sc.AddCacheRequest(ChunkCache, 5)
	sc.AddCacheBytesTransferred(ChunkCache, 1024)
	sc.AddCacheEntriesRequested(IndexCache, 22)
	sc.AddCacheEntriesFound(IndexCache, 2)
	sc.AddCacheEntriesStored(ResultCache, 3)

	expected := Caches{
		Chunk: Cache{
			Requests:         5,
			BytesTransferred: 1024,
		},
		Index: Cache{
			EntriesRequested: 22,
			EntriesFound:     2,
		},
		Result: Cache{
			EntriesStored: 3,
		},
	}
	require.Equal(t, expected, sc.Caches())
}
Loading