Skip to content

Commit

Permalink
feat: Introduce shardable probabilistic topk for instant queries. (#1…
Browse files Browse the repository at this point in the history
…4243)

Signed-off-by: Callum Styan <[email protected]>
Co-authored-by: Callum Styan <[email protected]>
  • Loading branch information
jeschkies and cstyan authored Nov 4, 2024
1 parent 1c993f9 commit 7b53f20
Show file tree
Hide file tree
Showing 46 changed files with 3,153 additions and 961 deletions.
13 changes: 13 additions & 0 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ querier_rf1:
# CLI flag: -querier-rf1.engine.max-lookback-period
[max_look_back_period: <duration> | default = 30s]

# The maximum number of labels the heap of a topk query using a count min
# sketch can track.
# CLI flag: -querier-rf1.engine.max-count-min-sketch-heap-size
[max_count_min_sketch_heap_size: <int> | default = 10000]

# The maximum number of queries that can be simultaneously processed by the
# querier.
# CLI flag: -querier-rf1.max-concurrent
Expand Down Expand Up @@ -3841,6 +3846,9 @@ otlp_config:
# CLI flag: -limits.ingestion-partition-tenant-shard-size
[ingestion_partitions_tenant_shard_size: <int> | default = 0]

# List of LogQL vector and range aggregations that should be sharded.
[shard_aggregations: <list of strings>]

# Enable metric aggregation. When enabled, pushed streams will be sampled for
# bytes and count, and these metric will be written back into Loki as a special
# __aggregated_metric__ stream, which can be queried for faster histogram
Expand Down Expand Up @@ -4245,6 +4253,11 @@ engine:
# CLI flag: -querier.engine.max-lookback-period
[max_look_back_period: <duration> | default = 30s]
# The maximum number of labels the heap of a topk query using a count min
# sketch can track.
# CLI flag: -querier.engine.max-count-min-sketch-heap-size
[max_count_min_sketch_heap_size: <int> | default = 10000]
# The maximum number of queries that can be simultaneously processed by the
# querier.
# CLI flag: -querier.max-concurrent
Expand Down
111 changes: 110 additions & 1 deletion integration/loki_micro_services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func TestMicroServicesIngestQuery(t *testing.T) {
"-common.compactor-address="+tCompactor.HTTPURL(),
"-querier.per-request-limits-enabled=true",
"-frontend.encoding=protobuf",
"-querier.shard-aggregations=quantile_over_time",
"-querier.shard-aggregations=quantile_over_time,approx_topk",
"-frontend.tail-proxy-url="+tQuerier.HTTPURL(),
)
)
Expand Down Expand Up @@ -784,6 +784,115 @@ func TestOTLPLogsIngestQuery(t *testing.T) {
})
}

func TestProbabilisticQuery(t *testing.T) {
clu := cluster.New(nil, cluster.SchemaWithTSDBAndTSDB, func(c *cluster.Cluster) {
c.SetSchemaVer("v13")
})
defer func() {
assert.NoError(t, clu.Cleanup())
}()

// run initially the compactor, indexgateway, and distributor.
var (
tCompactor = clu.AddComponent(
"compactor",
"-target=compactor",
"-compactor.compaction-interval=1s",
"-compactor.retention-delete-delay=1s",
// By default, a minute is added to the delete request start time. This compensates for that.
"-compactor.delete-request-cancel-period=-60s",
"-compactor.deletion-mode=filter-and-delete",
)
tIndexGateway = clu.AddComponent(
"index-gateway",
"-target=index-gateway",
)
tDistributor = clu.AddComponent(
"distributor",
"-target=distributor",
)
)
require.NoError(t, clu.Run())

// then, run only the ingester and query scheduler.
var (
tIngester = clu.AddComponent(
"ingester",
"-target=ingester",
"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
)
tQueryScheduler = clu.AddComponent(
"query-scheduler",
"-target=query-scheduler",
"-query-scheduler.use-scheduler-ring=false",
"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
)
)
require.NoError(t, clu.Run())

// the run querier.
var (
tQuerier = clu.AddComponent(
"querier",
"-target=querier",
"-querier.scheduler-address="+tQueryScheduler.GRPCURL(),
"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
"-common.compactor-address="+tCompactor.HTTPURL(),
)
)
require.NoError(t, clu.Run())

// finally, run the query-frontend.
var (
tQueryFrontend = clu.AddComponent(
"query-frontend",
"-target=query-frontend",
"-frontend.scheduler-address="+tQueryScheduler.GRPCURL(),
"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
"-common.compactor-address="+tCompactor.HTTPURL(),
"-querier.per-request-limits-enabled=true",
"-frontend.encoding=protobuf",
"-querier.shard-aggregations=quantile_over_time,approx_topk",
"-frontend.tail-proxy-url="+tQuerier.HTTPURL(),
)
)
require.NoError(t, clu.Run())

tenantID := randStringRunes()

now := time.Now()
cliDistributor := client.New(tenantID, "", tDistributor.HTTPURL())
cliDistributor.Now = now
cliIngester := client.New(tenantID, "", tIngester.HTTPURL())
cliIngester.Now = now
cliQueryFrontend := client.New(tenantID, "", tQueryFrontend.HTTPURL())
cliQueryFrontend.Now = now

t.Run("ingest-logs", func(t *testing.T) {
// ingest some log lines
require.NoError(t, cliDistributor.PushLogLine("lineA", now.Add(-45*time.Minute), nil, map[string]string{"job": "one"}))
require.NoError(t, cliDistributor.PushLogLine("lineB", now.Add(-45*time.Minute), nil, map[string]string{"job": "one"}))

require.NoError(t, cliDistributor.PushLogLine("lineC", now, nil, map[string]string{"job": "one"}))
require.NoError(t, cliDistributor.PushLogLine("lineD", now, nil, map[string]string{"job": "two"}))
})

t.Run("query", func(t *testing.T) {
resp, err := cliQueryFrontend.RunQuery(context.Background(), `approx_topk(1, count_over_time({job=~".+"}[1h]))`)
require.NoError(t, err)
assert.Equal(t, "vector", resp.Data.ResultType)

var values []string
var labels []string
for _, value := range resp.Data.Vector {
values = append(values, value.Value)
labels = append(labels, value.Metric["job"])
}
assert.ElementsMatch(t, []string{"3"}, values)
assert.ElementsMatch(t, []string{"one"}, labels)
})
}

func TestCategorizedLabels(t *testing.T) {
clu := cluster.New(nil, cluster.SchemaWithTSDB, func(c *cluster.Cluster) {
c.SetSchemaVer("v13")
Expand Down
Loading

0 comments on commit 7b53f20

Please sign in to comment.