feat(blooms): ingester aware bounded impl (#12840)
owen-d authored May 1, 2024
1 parent a03846b commit 7bbd8b5
Showing 6 changed files with 202 additions and 23 deletions.
3 changes: 3 additions & 0 deletions pkg/loghttp/query.go
@@ -537,6 +537,9 @@ func ParseIndexShardsQuery(r *http.Request) (*RangeQuery, datasize.ByteSize, err
return nil, 0, err
}
targetBytes, err := parseBytes(r, "targetBytesPerShard", true)
if targetBytes <= 0 {
return nil, 0, errors.New("targetBytesPerShard must be a positive value")
}
return parsed, targetBytes, err
}

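A quick sketch of what the new guard rejects (a hypothetical test, not part of the commit; it assumes the query parameters shown are enough for the preceding range-query parsing inside ParseIndexShardsQuery to succeed):

```go
package loghttp

import (
	"net/http/httptest"
	"testing"

	"github.com/stretchr/testify/require"
)

// Hypothetical sketch: a zero targetBytesPerShard should now be rejected.
func TestParseIndexShardsQueryRejectsZeroTarget(t *testing.T) {
	req := httptest.NewRequest("GET",
		`/index/shards?query={app="foo"}&start=0&end=3600000000000&targetBytesPerShard=0`, nil)
	require.NoError(t, req.ParseForm())

	_, _, err := ParseIndexShardsQuery(req)
	// expected: "targetBytesPerShard must be a positive value"
	require.Error(t, err)
}
```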
105 changes: 105 additions & 0 deletions pkg/storage/async_store.go
@@ -5,12 +5,15 @@ import (
"fmt"
"time"

"github.com/c2h5oh/datasize"
"github.com/opentracing/opentracing-go"

"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/storage/stores"
"github.com/grafana/loki/v3/pkg/storage/stores/index/seriesvolume"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/sharding"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/concurrency"
"github.com/prometheus/common/model"
@@ -281,3 +284,105 @@ func filterDuplicateChunks(scfg config.SchemaConfig, storeChunks [][]chunk.Chunk

return filteredChunkIDs
}

func (a *AsyncStore) GetShards(
ctx context.Context,
userID string,
from, through model.Time,
targetBytesPerShard uint64,
predicate chunk.Predicate,
) (*logproto.ShardsResponse, error) {
logger := log.With(
util_log.WithContext(ctx, util_log.Logger),
"component", "asyncStore",
)

if !a.shouldQueryIngesters(through, model.Now()) {
return a.Store.GetShards(ctx, userID, from, through, targetBytesPerShard, predicate)
}

var (
shardResp *logproto.ShardsResponse
statsResp *stats.Stats
)

jobs := []func() error{
func() error {
var err error
shardResp, err = a.Store.GetShards(ctx, userID, from, through, targetBytesPerShard, predicate)
return err
},
// We can't dedupe shards by their contents, so we complement the
// store's response with the ingester's stats and merge the two below.
func() error {
var err error
statsResp, err = a.ingesterQuerier.Stats(ctx, userID, from, through, predicate.Matchers...)
return err
},
}

if err := concurrency.ForEachJob(
ctx,
len(jobs),
len(jobs),
func(ctx context.Context, i int) error {
return jobs[i]()
},
); err != nil {
return nil, err
}

return mergeShardsFromIngestersAndStore(logger, shardResp, statsResp, targetBytesPerShard), nil
}

func mergeShardsFromIngestersAndStore(
logger log.Logger,
storeResp *logproto.ShardsResponse,
statsResp *logproto.IndexStatsResponse,
targetBytesPerShard uint64,
) *logproto.ShardsResponse {
var storeBytes uint64
for _, shard := range storeResp.Shards {
storeBytes += shard.Stats.Bytes
}
totalBytes := storeBytes + statsResp.Bytes

defer func() {
level.Debug(logger).Log(
"msg", "resolved shards ",
"ingester_bytes", datasize.ByteSize(statsResp.Bytes).HumanReadable(),
"store_bytes", datasize.ByteSize(storeBytes).HumanReadable(),
"total_bytes", datasize.ByteSize(totalBytes).HumanReadable(),
"target_bytes", datasize.ByteSize(targetBytesPerShard).HumanReadable(),
"store_shards", len(storeResp.Shards),
)
}()

// edge case to avoid divide by zero later
if totalBytes == 0 {
return &logproto.ShardsResponse{
Shards: sharding.LinearShards(0, 0),
}
}

// If the ingesters don't have enough data to meaningfully
// change the number of shards, use the store response.
if pct := float64(statsResp.Bytes) / float64(totalBytes); pct < 0.25 {
return storeResp
}

shards := sharding.LinearShards(int(totalBytes/targetBytesPerShard), totalBytes)

// increment the total chunks by the number seen from ingesters
// NB(owen-d): this isn't perfect as it mixes signals a bit by joining
// store chunks which _could_ possibly be filtered with ingester chunks which can't,
// but it's still directionally helpful
updatedStats := storeResp.Statistics
updatedStats.Index.TotalChunks += int64(statsResp.Chunks)
return &logproto.ShardsResponse{
Shards: shards,
Statistics: updatedStats,
// explicitly nil chunkgroups when we've changed the shards+included chunkrefs from ingesters
ChunkGroups: nil,
}
}
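To make the 25% threshold concrete, here is a standalone sketch of the arithmetic above, using the same numbers as the "above threshold" test case further down:

```go
package main

import "fmt"

// Sketch of the resharding math in mergeShardsFromIngestersAndStore:
// store = 10 shards x 10 bytes, ingesters = 4 shards' worth.
func main() {
	var (
		storeBytes          uint64 = 100
		ingesterBytes       uint64 = 40
		targetBytesPerShard uint64 = 10
	)
	totalBytes := storeBytes + ingesterBytes

	pct := float64(ingesterBytes) / float64(totalBytes)
	fmt.Printf("ingester share: %.2f\n", pct) // 0.29, at or above the 0.25 threshold, so reshard
	fmt.Printf("new shard count: %d\n", totalBytes/targetBytesPerShard) // 14
}
```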
85 changes: 83 additions & 2 deletions pkg/storage/async_store_test.go
@@ -5,7 +5,10 @@ import (
"testing"
"time"

"github.com/go-kit/log"

"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logqlmodel/stats"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
@@ -15,6 +18,7 @@ import (
"github.com/grafana/loki/v3/pkg/storage/chunk"
"github.com/grafana/loki/v3/pkg/storage/chunk/fetcher"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/sharding"
"github.com/grafana/loki/v3/pkg/util"
)

Expand All @@ -29,8 +33,8 @@ func newStoreMock() *storeMock {
return &storeMock{}
}

-func (s *storeMock) GetChunks(ctx context.Context, userID string, from, through model.Time, predicate chunk.Predicate, storeChunksOverride *logproto.ChunkRefGroup) ([][]chunk.Chunk, []*fetcher.Fetcher, error) {
-	args := s.Called(ctx, userID, from, through, predicate, storeChunksOverride)
+func (s *storeMock) GetChunks(ctx context.Context, userID string, from, through model.Time, predicate chunk.Predicate, overrides *logproto.ChunkRefGroup) ([][]chunk.Chunk, []*fetcher.Fetcher, error) {
+	args := s.Called(ctx, userID, from, through, predicate, overrides)
return args.Get(0).([][]chunk.Chunk), args.Get(1).([]*fetcher.Fetcher), args.Error(2)
}

@@ -360,3 +364,80 @@ func convertChunksToChunkIDs(s config.SchemaConfig, chunks []chunk.Chunk) []stri

return chunkIDs
}

func TestMergeShardsFromIngestersAndStore(t *testing.T) {
mkStats := func(bytes, chks uint64) logproto.IndexStatsResponse {
return logproto.IndexStatsResponse{
Bytes: bytes,
Chunks: chks,
}
}

// creates n shards with bytesPerShard * n bytes and chks chunks
mkShards := func(n int, bytesPerShard uint64, chks int64) logproto.ShardsResponse {
return logproto.ShardsResponse{
Shards: sharding.LinearShards(n, bytesPerShard*uint64(n)),
Statistics: stats.Result{
Index: stats.Index{
TotalChunks: chks,
},
},
}
}

targetBytesPerShard := 10

for _, tc := range []struct {
desc string
ingester logproto.IndexStatsResponse
store logproto.ShardsResponse
exp logproto.ShardsResponse
}{
{
desc: "zero bytes returns one full shard",
ingester: mkStats(0, 0),
store: mkShards(0, 0, 0),
exp: mkShards(1, 0, 0),
},
{
desc: "zero ingester bytes honors store",
ingester: mkStats(0, 0),
store: mkShards(10, uint64(targetBytesPerShard), 10),
exp: mkShards(10, uint64(targetBytesPerShard), 10),
},
{
desc: "zero store bytes honors ingester",
ingester: mkStats(uint64(targetBytesPerShard*10), 10),
store: mkShards(0, 0, 0),
exp: mkShards(10, uint64(targetBytesPerShard), 10),
},
{
desc: "ingester bytes below threshold ignored",
ingester: mkStats(uint64(targetBytesPerShard*2), 10), // 2 shards worth from ingesters
store: mkShards(10, uint64(targetBytesPerShard), 10), // 10 shards worth from store
exp: mkShards(10, uint64(targetBytesPerShard), 10), // use the store's resp
},
{
desc: "ingester bytes above threshold recreate shards",
ingester: mkStats(uint64(targetBytesPerShard*4), 10), // 4 shards worth from ingesters
store: mkShards(10, uint64(targetBytesPerShard), 10), // 10 shards worth from store
exp: mkShards(14, uint64(targetBytesPerShard), 20), // regenerate 14 shards
},
} {

t.Run(tc.desc, func(t *testing.T) {
got := mergeShardsFromIngestersAndStore(
log.NewNopLogger(),
&tc.store,
&tc.ingester,
uint64(targetBytesPerShard),
)
require.Equal(t, tc.exp.Statistics, got.Statistics)
require.Equal(t, tc.exp.ChunkGroups, got.ChunkGroups)
require.Equal(t, tc.exp.Statistics.Index.TotalChunks, got.Statistics.Index.TotalChunks)
for i, shard := range tc.exp.Shards {
require.Equal(t, shard, got.Shards[i], "shard %d", i)
}
})
}
}
27 changes: 6 additions & 21 deletions pkg/storage/stores/shipper/indexshipper/tsdb/sharding/power.go
@@ -22,21 +22,7 @@ type PowerOfTwoSharding struct {

func (p PowerOfTwoSharding) ShardsFor(bytes uint64, maxBytesPerShard uint64) []logproto.Shard {
	factor := GuessShardFactor(bytes, maxBytesPerShard, p.MaxShards)

-	if factor < 2 {
-		return []logproto.Shard{{
-			Bounds: logproto.FPBounds{
-				Min: 0,
-				Max: math.MaxUint64,
-			},
-			Stats: &stats.Stats{
-				Bytes: bytes,
-			},
-		}}
-	}
-
	return LinearShards(factor, bytes)
}

// LinearShards is a sharding implementation that splits the data into
@@ -71,14 +57,13 @@ func LinearShards(n int, bytes uint64) []logproto.Shard {
				Bytes: bytesPerShard,
			},
		}

-		// The last shard should have the remainder of the bytes
-		// and the max bound should be math.MaxUint64
-		// NB(owen-d): this can only happen when maxShards is used
-		// and the maxShards isn't a factor of 2
-		shards[len(shards)-1].Stats.Bytes += bytes % uint64(n)
-		shards[len(shards)-1].Bounds.Max = math.MaxUint64
-	}
+	// The last shard should have the remainder of the bytes
+	// and the max bound should be math.MaxUint64
+	// NB(owen-d): this can only happen when maxShards is used
+	// and the maxShards isn't a factor of 2
+	shards[len(shards)-1].Stats.Bytes += bytes % uint64(n)
+	shards[len(shards)-1].Bounds.Max = math.MaxUint64

	return shards

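The remainder handling now runs unconditionally instead of inside a guard. A standalone sketch of the arithmetic (it assumes the elided top of LinearShards computes bytesPerShard as bytes / n):

```go
package main

import (
	"fmt"
	"math"
)

func main() {
	n, bytes := uint64(4), uint64(10)
	bytesPerShard := bytes / n           // assumed: 10/4 = 2 bytes per shard
	lastShard := bytesPerShard + bytes%n // 2 + (10 % 4) = 4 bytes on the last shard
	fmt.Println(bytesPerShard, lastShard)
	// The last shard's upper fingerprint bound is stretched to cover the
	// whole keyspace:
	fmt.Println(uint64(math.MaxUint64))
}
```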
4 changes: 4 additions & 0 deletions pkg/validation/limits.go
@@ -480,6 +480,10 @@ func (l *Limits) Validate() error {
return err
}

if l.TSDBMaxBytesPerShard <= 0 {
return errors.New("querier.tsdb-max-bytes-per-shard must be greater than 0")
}

return nil
}

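A hedged usage sketch of the new check (field and flag names are taken from the diff; other Limits fields may need valid values for Validate to reach this check):

```go
package main

import (
	"fmt"

	"github.com/grafana/loki/v3/pkg/validation"
)

// Hypothetical sketch: a zero TSDBMaxBytesPerShard now fails validation.
func main() {
	var l validation.Limits
	l.TSDBMaxBytesPerShard = 0
	fmt.Println(l.Validate()) // "querier.tsdb-max-bytes-per-shard must be greater than 0"
}
```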
1 change: 1 addition & 0 deletions pkg/validation/limits_test.go
@@ -345,6 +345,7 @@ func TestLimitsValidation(t *testing.T) {
desc := fmt.Sprintf("%s/%s", tc.limits.DeletionMode, tc.limits.BloomBlockEncoding)
t.Run(desc, func(t *testing.T) {
tc.limits.TSDBShardingStrategy = logql.PowerOfTwoVersion.String() // hacky but needed for test
tc.limits.TSDBMaxBytesPerShard = DefaultTSDBMaxBytesPerShard
if tc.expected == nil {
require.NoError(t, tc.limits.Validate())
} else {
