Skip to content

Commit

Permalink
docs: Documentation for Query Acceleration with Blooms (#12486)
Browse files Browse the repository at this point in the history
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: J Stickler <[email protected]>
  • Loading branch information
3 people authored Apr 5, 2024
1 parent 3634ff4 commit 23b5d03
Show file tree
Hide file tree
Showing 3 changed files with 271 additions and 10 deletions.
48 changes: 38 additions & 10 deletions docs/sources/get-started/components.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,18 @@ Loki is a modular system that contains many components that can either be run to
in logical groups (in "simple scalable deployment" mode with targets `read`, `write`, `backend`), or individually (in "microservice" mode).
For more information see [Deployment modes]({{< relref "./deployment-modes" >}}).

| Component | _individual_ | `all` | `read` | `write` | `backend` |
| ----------------------------------- | - | - | - | - | - |
| [Distributor](#distributor) | x | x | | x | |
| [Ingester](#ingester) | x | x | | x | |
| [Query Frontend](#query-frontend) | x | x | x | | |
| [Query Scheduler](#query-scheduler) | x | x | | | x |
| [Querier](#querier) | x | x | x | | |
| [Index Gateway](#index-gateway) | x | | | | x |
| [Compactor](#compactor) | x | x | | | x |
| [Ruler](#ruler) | x | x | | | x |
| Component | _individual_ | `all` | `read` | `write` | `backend` |
|----------------------------------------------------|--------------| - | - | - | - |
| [Distributor](#distributor) | x | x | | x | |
| [Ingester](#ingester) | x | x | | x | |
| [Query Frontend](#query-frontend) | x | x | x | | |
| [Query Scheduler](#query-scheduler) | x | x | | | x |
| [Querier](#querier) | x | x | x | | |
| [Index Gateway](#index-gateway) | x | | | | x |
| [Compactor](#compactor) | x | x | | | x |
| [Ruler](#ruler) | x | x | | | x |
| [Bloom Compactor (Experimental)](#bloom-compactor) | x | | | | x |
| [Bloom Gateway (Experimental)](#bloom-gateway) | x | | | | x |

This page describes the responsibilities of each of these components.

Expand Down Expand Up @@ -334,3 +336,29 @@ This mode is called remote rule evaluation and is used to gain the advantages of
from the query frontend.

When running multiple rulers, they use a consistent hash ring to distribute rule groups amongst available ruler instances.

## Bloom Compactor
{{% admonition type="warning" %}}
This feature is an [experimental feature](/docs/release-life-cycle/). Engineering and on-call support is not available. No SLA is provided.
{{% /admonition %}}

The Bloom Compactor service is responsible for building blooms for chunks in the object store.
The resulting blooms are grouped in bloom blocks spanning multiple series and chunks from a given day.
This component also builds metadata files to track which blocks are available for each series and TSDB index file.

The service is horizontally scalable. When running multiple Bloom Compactors, they use a ring to shard tenants and
distribute series fingerprints among the available Bloom Compactor instances.
The ring is also used to decide which compactor should apply blooms retention.

## Bloom Gateway
{{% admonition type="warning" %}}
This feature is an [experimental feature](/docs/release-life-cycle/). Engineering and on-call support is not available. No SLA is provided.
{{% /admonition %}}

The Bloom Gateway service is responsible for handling and serving chunks filtering requests.
The index gateway queries the Bloom Gateway when computing chunk references, or when computing shards for a given query.
The gateway service takes a list of chunks and a filtering expression and matches them against the blooms,
filtering out any chunks that do not match the given filter expression.

The service is horizontally scalable. When running multiple instances, they use a ring to shard tenants and
distribute series fingerprints across instances.
12 changes: 12 additions & 0 deletions docs/sources/get-started/hash-rings.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@ These components need to be connected into a hash ring:
- query schedulers
- compactors
- rulers
- bloom compactors (Experimental)

These components can optionally be connected into a hash ring:
- index gateway


In an architecture that has three distributors and three ingesters defined,
the hash rings for these components connect the instances of same-type components.

Expand Down Expand Up @@ -102,3 +104,13 @@ The ruler ring is used to determine which rulers evaluate which rule groups.
## About the index gateway ring

The index gateway ring is used to determine which gateway is responsible for which tenant's indexes when queried by rulers or queriers.

## About the Bloom Compactor ring
{{% admonition type="warning" %}}
This feature is an [experimental feature](/docs/release-life-cycle/). Engineering and on-call support is not available. No SLA is provided.
{{% /admonition %}}

The Bloom Compactor ring is used to determine which subset of compactors own a given tenant,
and which series fingerprint ranges each compactor owns.
The ring is also used to determine which compactor owns retention.
Retention will be applied by the compactor owning the smallest token in the ring.
221 changes: 221 additions & 0 deletions docs/sources/operations/query-acceleration-blooms.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
---
title: Query Acceleration with Blooms (Experimental)
menuTitle: Query Acceleration with Blooms
description: Describes how to enable and configure query acceleration with blooms.
weight:
keywords:
- blooms
- query acceleration
---

# Query Acceleration with Blooms (Experimental)
{{% admonition type="warning" %}}
This feature is an [experimental feature](/docs/release-life-cycle/). Engineering and on-call support is not available. No SLA is provided.
{{% /admonition %}}

Loki 3.0 leverages [bloom filters](https://en.wikipedia.org/wiki/Bloom_filter) to speed up queries by reducing the
amount of data Loki needs to load from the store and iterate through. Loki is often used to run “needle in a haystack”
queries; these are queries where a large number of log lines are searched, but only a few log lines match the [filtering
expressions]({{< relref "../query/log_queries#line-filter-expression" >}}) of the query.
Some common use cases are needing to find a specific text pattern in a message, or all logs tied to a specific customer ID.

An example of such queries would be looking for a trace ID on a whole cluster for the past 24 hours:

```logql
{cluster="prod"} |= "traceID=3c0e3dcd33e7"
```

Loki would download all the chunks for all the streams matching `{cluster=”prod”}` for the last 24 hours and iterate
through each log line in the chunks checking if the string `traceID=3c0e3dcd33e7` is present.

With accelerated filtering, Loki is able to skip most of the chunks and only process the ones where we have a
statistical confidence that the string might be present.
The underlying blooms are built by the new [Bloom Compactor](#bloom-compactor) component
and served by the new [Bloom Gateway](#bloom-gateway) component.

## Enable Query Acceleration with Blooms
To start building and using blooms you need to:
- Deploy the [Bloom Compactor](#bloom-compactor) component (as a [microservice][microservices] or via the [SSD][ssd] Backend target) and enable the component in the [Bloom Compactor config][compactor-cfg].
- Deploy the [Bloom Gateway](#bloom-gateway) component (as a [microservice][microservices] or via the [SSD][ssd] Backend target) and enable the component in the [Bloom Gateway config][gateway-cfg].
- Enable blooms filtering and compaction for each tenant individually, or for all of them by default.

```yaml
bloom_compactor:
enabled: true

bloom_gateway:
enabled: true
client:
addresses: dnssrvnoa+_bloom-gateway-grpc._tcp.bloom-gateway-headless.<namespace>.svc.cluster.local

# Enable blooms filtering and compaction for all tenants by default
limits_config:
bloom_gateway_enable_filtering: true
bloom_compactor_enable_compaction: true
```
For more configuration options refer to the [Bloom Gateways][gateway-cfg], [Bloom Compactor][compactor-cfg] and
[per tenant-limits][tenant-limits] configuration docs.
We strongly recommend reading the whole documentation for this experimental feature before using it.
## Bloom Compactor
The Bloom Compactor component builds blooms from the chunks in the object store.
The resulting blooms are grouped in bloom blocks spanning multiple streams (also known as series) and chunks from a given day.
To learn more about how blocks and metadata files are organized, refer to the
[Building and querying blooms](#building-and-querying-blooms) section below.
Bloom Compactors are horizontally scalable and use a [ring] for sharding tenants and stream fingerprints,
as well as determining which compactor should apply [blooms retention](#retention).
Each compactor owns a configurable number of contiguous streams fingerprint ranges for a tenant.
The compactor builds blooms for all the chunks from the tenant streams whose fingerprint
falls within its owned key-space ranges.
You can find all the configuration options for this component in the [Configure section for the Bloom Compactor][compactor-cfg].
Refer to the [Enable Query Acceleration with Blooms](#enable-query-acceleration-with-blooms) section below for
a configuration snippet enabling this feature.
### Retention
One Bloom Compactor from all those running will apply retention. Retention is disabled by default.
The instance owning the smallest token in the ring owns retention.
Retention is applied to all tenants. The retention for each tenant is the longest of its [configured][tenant-limits]
general retention (`retention_period`) and the streams retention (`retention_stream`).

For example, in the following example, tenant A has a bloom retention of 30 days,
and tenant B a bloom retention of 40 days.

```yaml
overrides:
"A":
retention: 30d
"B":
retention: 30d
retention_stream:
- selector: '{namespace="prod"}'
priority: 1
period: 40d
```

### Sizing
Compactors build blocks concurrently. Concurrency is [configured][compactor-cfg] via `-bloom-compactor.worker-parallelism`.
Each worker will build bloom blocks from streams and chunks.
The maximum block size is configured per tenant via `-bloom-compactor.max-block-size`.
Note that the actual block size might exceed this limit given that we append streams blooms to the block until the
block is larger than the configured maximum size. Blocks are created in memory and as soon as they are written to the
object store they are freed. Chunks and TSDB files are downloaded from the object store to the file system.
We estimate that compactors are able to process 4 MB worth of data per second per core.

## Bloom Gateway
Bloom Gateways handle chunks filtering requests from the [index gateway]({{< relref "../get-started/components#index-gateway" >}}).
The service takes a list of chunks and a filtering expression and matches them against the blooms,
filtering out those chunks not matching the given filter expression.

This component is horizontally scalable and every instance only owns a subset of the stream
fingerprint range for which it performs the filtering.
The sharding of the data is performed on the client side using DNS discovery of the server instances
and the [jumphash](https://arxiv.org/abs/1406.2294) algorithm for consistent hashing
and even distribution of the stream fingerprints across Bloom Gateway instances.

You can find all the configuration options for this component in the Configure section for the [Bloom Gateways][gateway-cfg].
Refer to the [Enable Query Acceleration with Blooms](#enable-query-acceleration-with-blooms) section below for a configuration snippet enabling this feature.

### Sizing
Bloom Gateways use their local filesystem as a Least Recently Used (LRU) cache for blooms that are
downloaded from object storage. The size of the blooms depend on the ingest volume and the log content cardinality,
as well as on compaction settings of the blooms, namely n-gram length, skip-factor, and false-positive-rate.
With default settings, bloom filters make up roughly 3% of the chunk data.

Example calculation for storage requirements of blooms for a single tenant.
```
100 MB/s ingest rate ~> 8.6 TB/day chunks ~> 260 GB/day blooms
```
Since reading blooms depends heavily on disk IOPS, Bloom Gateways should make use of multiple,
locally attached SSD disks (NVMe) to increase i/o throughput.
Multiple directories on different disk mounts can be specified using the `-bloom.shipper.working-directory` [setting][gateway-cfg]
when using a comma separated list of mount points, for example:
```
-bloom.shipper.working-directory="/mnt/data0,/mnt/data1,/mnt/data2,/mnt/data3"
```
Bloom Gateways need to deal with relatively large files: the bloom filter blocks.
Even though the binary format of the bloom blocks allows for reading them into memory in smaller pages,
the memory consumption depends on the amount of pages that are concurrently loaded into memory for processing.
The product of three settings control the maximum amount of bloom data in memory at any given
time: `-bloom-gateway.worker-concurrency`, `-bloom-gateway.block-query-concurrency`, and `-bloom.max-query-page-size`.
Example, assuming 4 CPU cores:
```
-bloom-gateway.worker-concurrency=4 // 1x NUM_CORES
-bloom-gateway.block-query-concurrency=8 // 2x NUM_CORES
-bloom.max-query-page-size=64MiB

4 x 8 x 64MiB = 2048MiB
```
Here, the memory requirement for block processing is 2GiB.
To get the minimum requirements for the Bloom Gateways, you need to double the value.
## Building and querying blooms
Bloom filters are built per stream and aggregated together into block files.
Streams are assigned to blocks by their fingerprint, following the same ordering scheme as Loki’s TSDB and sharding calculation.
This gives a data locality benefit when querying as streams in the same shard are likely to be in the same block.
In addition to blocks, compactors maintain a list of metadata files containing references to bloom blocks and the
TSDB index files they were built from. They also contain tombstones for old blocks which are outdated and
can be deleted in future iterations. Gateways and compactors use these metadata files to discover existing blocks.
Every `-bloom-compactor.compaction-interval`, compactors will load the latest TSDB files for all tenants for
which bloom compaction is enabled, and compare the TSDB files with the latest bloom metadata files.
If there are new TSDB files or any of them have changed, the compactor will process all the streams and chunks pointed
by the TSDB file. In case of changes for a previously processed TSDB file,
compactors will try to reuse blooms from existing blocks instead of building new ones from scratch.
For a given stream, the compactor owning that stream will iterate through all the log lines inside its new
chunks and build a bloom for the stream. For each log line, we compute its [n-grams](https://en.wikipedia.org/wiki/N-gram#:~:text=An%20n%2Dgram%20is%20a,pairs%20extracted%20from%20a%20genome.)
and append to the bloom both the hash for each n-gram and the hash for each n-gram plus the chunk identifier.
The former allows gateways to skip whole streams while the latter is for skipping individual chunks.
For example, given a log line `abcdef` in the chunk `c6dj8g`, we compute its n-grams: `abc`, `bcd`, `cde`, `def`.
And append to the stream bloom the following hashes: `hash("abc")`, `hash("abc" + "c6dj8g")` ... `hash("def")`, `hash("def" + "c6dj8g")`.
By adding n-grams to blooms instead of whole log lines, we can perform partial matches.
For the example above, a filter expression `|= "bcd"` would match against the bloom.
The filter `|= "bcde` would also match the bloom since we decompose the filter into n-grams:
`bcd`, `cde` which both are present in the bloom.
N-grams sizes are configurable. The longer the n-gram is, the fewer tokens we need to append to the blooms,
but the longer filtering expressions need to be able to check them against blooms.
For the example above, where the n-gram length is 3, we need filtering expressions that have at least 3 characters.
### Queries for which blooms are used
Loki will check blooms for any log filtering expression within a query that satisfies the following criteria:
- The filtering expression contains at least as many characters as the n-gram length used to build the blooms.
- For example, if the n-grams length is 5, the filter `|= "foo"` will not take advantage of blooms but `|= "foobar"` would.
- If the filter is a regex, we use blooms only if we can simplify the regex to a set of simple matchers.
- For example, `|~ "(error|warn)"` would be simplified into `|= "error" or "warn"` thus would make use of blooms,
whereas `|~ "f.*oo"` would not be simplifiable.
- The filtering expression is a match (`|=`) or regex match (`|~`) filter. We don’t use blooms for not equal (`!=`) or not regex (`!~`) expressions.
- For example, `|= "level=error"` would use blooms but `!= "level=error"` would not.
- The filtering expression is placed before a [line format expression](https://grafana.com/docs/loki/latest/query/log_queries/#line-format-expression).
- For example, with `|= "level=error" | logfmt | line_format "ERROR {{.err}}" |= "traceID=3ksn8d4jj3"`,
the first filter (`|= "level=error"`) will benefit from blooms but the second one (`|= "traceID=3ksn8d4jj3"`) will not.
## Query sharding
Query acceleration does not just happen while processing chunks,
but also happens from the query planning phase where the query frontend applies [query sharding](https://lokidex.com/posts/tsdb/#sharding).
Loki 3.0 introduces a new {per-tenant configuration][tenant-limits] flag `tsdb_sharding_strategy` which defaults to computing
shards as in previous versions of Loki by using the index stats to come up with the closest power of two that would
optimistically divide the data to process in shards of roughly the same size. Unfortunately,
the amount of data each stream has is often unbalanced with the rest,
therefore, some shards end up processing more data than others.
Query acceleration introduces a new sharding strategy: `bounded`, which uses blooms to reduce the chunks to be
processed right away during the planning phase in the query frontend,
as well as evenly distributes the amount of chunks each sharded query will need to process.
[ring]: https://grafana.com/docs/loki/latest/get-started/hash-rings/
[tenant-limits]: https://grafana.com/docs/loki/latest/configure/#limits_config
[gateway-cfg]: https://grafana.com/docs/loki/latest/configure/#bloom_gateway
[compactor-cfg]: https://grafana.com/docs/loki/latest/configure/#bloom_compactor
[microservices]: https://grafana.com/docs/loki/latest/get-started/deployment-modes/#microservices-mode
[ssd]: https://grafana.com/docs/loki/latest/get-started/deployment-modes/#simple-scalable

0 comments on commit 23b5d03

Please sign in to comment.