chore: [k200] fix(blooms): Fully deduplicate chunks from FilterChunkRef responses (#12835)

Backport a0f358f from #12807

Co-authored-by: Christian Haudum <[email protected]>
grafanabot and chaudum committed May 3, 2024
1 parent 22a8825 commit 8785e74
Showing 9 changed files with 263 additions and 82 deletions.
9 changes: 8 additions & 1 deletion pkg/bloomgateway/bloomgateway.go
@@ -44,6 +44,7 @@ package bloomgateway
import (
"context"
"fmt"
"slices"
"sort"
"time"

@@ -383,6 +384,10 @@ func (g *Gateway) consumeTask(ctx context.Context, task Task, tasksCh chan<- Tas
case <-ctx.Done():
// do nothing
default:
// chunks may not always be sorted
if !slices.IsSortedFunc(res.Removals, func(a, b v1.ChunkRef) int { return a.Cmp(b) }) {
slices.SortFunc(res.Removals, func(a, b v1.ChunkRef) int { return a.Cmp(b) })
}
task.responses = append(task.responses, res)
}
}
@@ -413,13 +418,14 @@ func orderedResponsesByFP(responses [][]v1.Output) v1.Iterator[v1.Output] {
itrs = append(itrs, v1.NewPeekingIter(v1.NewSliceIter(r)))
}
return v1.NewHeapIterator[v1.Output](
func(o1, o2 v1.Output) bool { return o1.Fp <= o2.Fp },
func(o1, o2 v1.Output) bool { return o1.Fp < o2.Fp },
itrs...,
)
}

// TODO(owen-d): improve perf. This can be faster with a more specialized impl
// NB(owen-d): `req` is mutated in place for performance, but `responses` is not
// Removals of the outputs must be sorted.
func filterChunkRefs(req *logproto.FilterChunkRefRequest, responses [][]v1.Output) []*logproto.GroupedChunkRefs {
res := make([]*logproto.GroupedChunkRefs, 0, len(req.Refs))

@@ -433,6 +439,7 @@ func filterChunkRefs(req *logproto.FilterChunkRefRequest, responses [][]v1.Outpu
// from
v1.Identity[v1.Output],
// merge two removal sets for the same series, deduping
// requires that the removals of the outputs are sorted
func(o1, o2 v1.Output) v1.Output {
res := v1.Output{Fp: o1.Fp}

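Note on the guard added to consumeTask above: it sorts a response's removals only when they are not already in order, so well-behaved responses pay just the O(n) sortedness check. Below is a minimal, self-contained sketch of the same check-then-sort pattern using Go's standard slices package; the ref type and its Cmp ordering (from, then through, then checksum) are hypothetical stand-ins for v1.ChunkRef, not the actual Loki types.

package main

import (
	"fmt"
	"slices"
)

// ref is an illustrative stand-in for v1.ChunkRef (not the Loki type).
type ref struct{ from, through, checksum int }

// Cmp orders by from, then through, then checksum — an assumed
// lexicographic ordering, for illustration only.
func (a ref) Cmp(b ref) int {
	if c := a.from - b.from; c != 0 {
		return c
	}
	if c := a.through - b.through; c != 0 {
		return c
	}
	return a.checksum - b.checksum
}

func main() {
	removals := []ref{{4, 5, 3}, {1, 3, 1}, {2, 3, 2}}
	// Sort only if needed, mirroring the guard in consumeTask.
	if !slices.IsSortedFunc(removals, func(a, b ref) int { return a.Cmp(b) }) {
		slices.SortFunc(removals, func(a, b ref) int { return a.Cmp(b) })
	}
	fmt.Println(removals) // [{1 3 1} {2 3 2} {4 5 3}]
}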
25 changes: 23 additions & 2 deletions pkg/bloomgateway/bloomgateway_test.go
@@ -640,12 +640,12 @@ func TestFilterChunkRefs(t *testing.T) {
{
{fp: 0, checksums: []uint32{0, 1}},
{fp: 0, checksums: []uint32{0, 1, 2}},
{fp: 1, checksums: []uint32{1}},
{fp: 1, checksums: []uint32{0, 2}},
{fp: 2, checksums: []uint32{1}},
},
},
expected: mkResult([]instruction{
{fp: 1, checksums: []uint32{0, 2}},
{fp: 1, checksums: []uint32{1}},
{fp: 2, checksums: []uint32{0, 2}},
{fp: 3, checksums: []uint32{0, 1, 2}},
}),
@@ -670,6 +670,27 @@
{fp: 3, checksums: []uint32{0, 1, 2}},
}),
},
{
desc: "unordered fingerprints",
input: mkInput(4, 3),
removals: [][]instruction{
{
{fp: 3, checksums: []uint32{2}},
{fp: 0, checksums: []uint32{1, 2}},
{fp: 2, checksums: []uint32{1, 2}},
},
{
{fp: 1, checksums: []uint32{1}},
{fp: 2, checksums: []uint32{0, 1}},
{fp: 3, checksums: []uint32{0}},
},
},
expected: mkResult([]instruction{
{fp: 0, checksums: []uint32{0}},
{fp: 1, checksums: []uint32{0, 2}},
{fp: 3, checksums: []uint32{1}},
}),
},
} {
t.Run(tc.desc, func(t *testing.T) {
res := filterChunkRefs(tc.input, mkRemovals(tc.removals))
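As a quick check of the new "unordered fingerprints" case: assuming mkInput(4, 3) builds fingerprints 0–3, each carrying checksums {0, 1, 2}, the per-fingerprint union of the two removal sets is fp 0 → {1, 2}, fp 1 → {1}, fp 2 → {0, 1, 2}, and fp 3 → {0, 2}. Fingerprint 2 loses all of its chunks and drops out entirely, leaving exactly the expected survivors {0}, {0, 2}, and {1}.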
5 changes: 4 additions & 1 deletion pkg/bloomgateway/cache_test.go
@@ -344,7 +344,10 @@ func TestMerge(t *testing.T) {
m := newMerger()
actual, err := m.MergeResponse(input...)
require.NoError(t, err)
require.Equal(t, tc.expected, actual)

resp, ok := actual.(*logproto.FilterChunkRefResponse)
require.True(t, ok)
require.Equal(t, tc.expected, resp)
})
}
}
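The rewritten assertion uses the comma-ok form of a Go type assertion, so a wrong concrete type fails the test cleanly rather than panicking. A generic sketch of the pattern (the interface and struct here are illustrative, not Loki's types):

package main

import "fmt"

// response and filterChunkRefResponse are illustrative stand-ins.
type response interface{ isResponse() }

type filterChunkRefResponse struct{ refs int }

func (*filterChunkRefResponse) isResponse() {}

func main() {
	var actual response = &filterChunkRefResponse{refs: 42}
	// Comma-ok assertion: ok reports whether the dynamic type matched,
	// letting a test fail with a message instead of panicking.
	resp, ok := actual.(*filterChunkRefResponse)
	fmt.Println(resp.refs, ok) // 42 true
}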
80 changes: 36 additions & 44 deletions pkg/bloomgateway/client.go
@@ -294,13 +294,12 @@ func mergeSeries(input [][]*logproto.GroupedChunkRefs, buf []*logproto.GroupedCh

iters := make([]v1.PeekingIterator[*logproto.GroupedChunkRefs], 0, len(input))
for _, inp := range input {
sort.Slice(inp, func(i, j int) bool { return inp[i].Fingerprint < inp[j].Fingerprint })
iters = append(iters, v1.NewPeekingIter(v1.NewSliceIter(inp)))
}

heapIter := v1.NewHeapIterator[*logproto.GroupedChunkRefs](
func(a, b *logproto.GroupedChunkRefs) bool {
return a.Fingerprint < b.Fingerprint
},
func(a, b *logproto.GroupedChunkRefs) bool { return a.Fingerprint < b.Fingerprint },
iters...,
)

Expand All @@ -311,10 +310,17 @@ func mergeSeries(input [][]*logproto.GroupedChunkRefs, buf []*logproto.GroupedCh
v1.Identity[*logproto.GroupedChunkRefs],
// merge
func(a, b *logproto.GroupedChunkRefs) *logproto.GroupedChunkRefs {
// TODO(chaudum): Check if we can assume sorted shortrefs here
if !slices.IsSortedFunc(a.Refs, func(a, b *logproto.ShortRef) int { return a.Cmp(b) }) {
slices.SortFunc(a.Refs, func(a, b *logproto.ShortRef) int { return a.Cmp(b) })
}
if !slices.IsSortedFunc(b.Refs, func(a, b *logproto.ShortRef) int { return a.Cmp(b) }) {
slices.SortFunc(b.Refs, func(a, b *logproto.ShortRef) int { return a.Cmp(b) })
}
return &logproto.GroupedChunkRefs{
Fingerprint: a.Fingerprint,
Tenant: a.Tenant,
Refs: mergeChunks(a.Refs, b.Refs),
Refs: mergeChunkSets(a.Refs, b.Refs),
}
},
// iterator
@@ -324,51 +330,37 @@ func mergeSeries(input [][]*logproto.GroupedChunkRefs, buf []*logproto.GroupedCh
return v1.CollectInto(dedupeIter, buf)
}

func mergeChunks(inputs ...[]*logproto.ShortRef) []*logproto.ShortRef {
if len(inputs) == 0 {
return nil
}
// mergeChunkSets merges and deduplicates two sorted slices of shortRefs
func mergeChunkSets(s1, s2 []*logproto.ShortRef) (result []*logproto.ShortRef) {
var i, j int
for i < len(s1) && j < len(s2) {
a, b := s1[i], s2[j]

if a.Equal(b) {
result = append(result, a)
i++
j++
continue
}

if a.Less(b) {
result = append(result, a)
i++
continue
}

if len(inputs) == 1 {
slices.SortFunc(
inputs[0],
func(a, b *logproto.ShortRef) int {
if a.Equal(b) {
return 0
}
if a.From.Before(b.From) || (a.From.Equal(b.From) && a.Through.Before(b.Through)) {
return -1
}
return 1
},
)
return inputs[0]
result = append(result, b)
j++
}

iters := make([]v1.PeekingIterator[*logproto.ShortRef], 0, len(inputs))
for _, inp := range inputs {
iters = append(iters, v1.NewPeekingIter(v1.NewSliceIter(inp)))
if i < len(s1) {
result = append(result, s1[i:]...)
}
if j < len(s2) {
result = append(result, s2[j:]...)
}

chunkDedupe := v1.NewDedupingIter[*logproto.ShortRef, *logproto.ShortRef](
// eq
func(a, b *logproto.ShortRef) bool { return a.Equal(b) },
// from
v1.Identity[*logproto.ShortRef],
// merge
func(a, b *logproto.ShortRef) *logproto.ShortRef { return a },
// iterator
v1.NewPeekingIter[*logproto.ShortRef](
v1.NewHeapIterator[*logproto.ShortRef](
func(a, b *logproto.ShortRef) bool {
return a.From.Before(b.From) || (a.From.Equal(b.From) && a.Through.Before(b.Through))
},
iters...,
),
),
)
merged, _ := v1.Collect(chunkDedupe)
return merged
return result
}

// doForAddrs sequentially calls the provided callback function fn for each
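mergeChunkSets replaces the previous heap-and-deduping-iterator pipeline with a single linear two-pointer merge, which works because both inputs are kept sorted by the caller shown above. A runnable sketch of the same shape on plain ints — duplicates across the two inputs are emitted once; duplicates within one input are assumed not to occur, as in the original:

package main

import "fmt"

// mergeSorted merges two sorted slices, emitting elements shared by
// both inputs once — the same two-pointer shape as mergeChunkSets.
func mergeSorted(s1, s2 []int) (result []int) {
	var i, j int
	for i < len(s1) && j < len(s2) {
		switch a, b := s1[i], s2[j]; {
		case a == b:
			result = append(result, a)
			i++
			j++
		case a < b:
			result = append(result, a)
			i++
		default:
			result = append(result, b)
			j++
		}
	}
	// At most one of these appends anything: the loop exits once
	// either input is exhausted.
	result = append(result, s1[i:]...)
	result = append(result, s2[j:]...)
	return result
}

func main() {
	fmt.Println(mergeSorted([]int{1, 2, 4}, []int{2, 3, 5})) // [1 2 3 4 5]
}

Relative to the iterator pipeline, this runs in O(len(s1)+len(s2)) with no heap bookkeeping or per-element iterator overhead, which is presumably the motivation for the rewrite.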
24 changes: 24 additions & 0 deletions pkg/bloomgateway/client_test.go
@@ -70,3 +70,27 @@ func TestGatewayClient_MergeSeries(t *testing.T) {
result, _ := mergeSeries(inputs, nil)
require.Equal(t, expected, result)
}

func TestGatewayClient_MergeChunkSets(t *testing.T) {
inp1 := []*logproto.ShortRef{
shortRef(1, 3, 1),
shortRef(2, 3, 2),
shortRef(4, 5, 3),
}
inp2 := []*logproto.ShortRef{
shortRef(2, 3, 2),
shortRef(3, 4, 4),
shortRef(5, 6, 5),
}

expected := []*logproto.ShortRef{
shortRef(1, 3, 1),
shortRef(2, 3, 2),
shortRef(3, 4, 4),
shortRef(4, 5, 3),
shortRef(5, 6, 5),
}

result := mergeChunkSets(inp1, inp2)
require.Equal(t, expected, result)
}
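Assuming shortRef(from, through, checksum) constructs a *logproto.ShortRef, this test exercises both guarantees of the merge: the ref (2, 3, 2) shared by the two inputs appears only once in the output, and the result interleaves the inputs in (From, Through) order — (1,3), (2,3), (3,4), (4,5), (5,6) — rather than concatenating them.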
64 changes: 31 additions & 33 deletions pkg/bloomgateway/querier.go
@@ -6,6 +6,7 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
@@ -99,9 +100,7 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from
preFilterChunks := len(chunkRefs)
preFilterSeries := len(grouped)

result := make([]*logproto.ChunkRef, 0, len(chunkRefs))
seriesSeen := make(map[uint64]struct{}, len(grouped))

responses := make([][]*logproto.GroupedChunkRefs, 0, 2)
// We can perform requests sequentially, because most of the time the request
// only covers a single day, and if not, it's at most two days.
for _, s := range partitionSeriesByDay(from, through, grouped) {
@@ -110,53 +109,52 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from
if err != nil {
return nil, err
}
var chunks int
for i := range s.series {
chunks += len(s.series[i].Refs)
}
sp.LogKV(
"day", s.day.Time.Time(),
"from", s.interval.Start.Time(),
"through", s.interval.End.Time(),
"series", len(s.series),
"chunks", chunks,
"blocks", len(blocks),
"skipped", len(skipped),
)

refs, err := bq.c.FilterChunks(ctx, tenant, s.interval, blocks, queryPlan)
if err != nil {
return nil, err
}

// add chunk refs from series that were not mapped to any blocks
refs = append(refs, skipped...)
responses = append(responses, refs, skipped)
bq.metrics.seriesSkipped.Add(float64(len(skipped)))
}

deduped, err := mergeSeries(responses, nil)
if err != nil {
return nil, errors.Wrap(err, "failed to dedupe results")
}

for i := range refs {
seriesSeen[refs[i].Fingerprint] = struct{}{}
for _, ref := range refs[i].Refs {
result = append(result, &logproto.ChunkRef{
Fingerprint: refs[i].Fingerprint,
UserID: tenant,
From: ref.From,
Through: ref.Through,
Checksum: ref.Checksum,
})
}
result := make([]*logproto.ChunkRef, 0, len(chunkRefs))
for i := range deduped {
for _, ref := range deduped[i].Refs {
result = append(result, &logproto.ChunkRef{
Fingerprint: deduped[i].Fingerprint,
UserID: tenant,
From: ref.From,
Through: ref.Through,
Checksum: ref.Checksum,
})
}
}

postFilterChunks := len(result)
postFilterSeries := len(deduped)

level.Debug(bq.logger).Log(
"operation", "bloomquerier.FilterChunkRefs",
"tenant", tenant,
"from", from.Time(),
"through", through.Time(),
"responses", len(responses),
"preFilterChunks", preFilterChunks,
"postFilterChunks", len(result),
"postFilterChunks", postFilterChunks,
"filteredChunks", preFilterChunks-postFilterChunks,
"preFilterSeries", preFilterSeries,
"postFilterSeries", len(seriesSeen),
"postFilterSeries", postFilterSeries,
"filteredSeries", preFilterSeries-postFilterSeries,
)

postFilterChunks := len(result)
postFilterSeries := len(seriesSeen)

bq.metrics.chunksTotal.Add(float64(preFilterChunks))
bq.metrics.chunksFiltered.Add(float64(preFilterChunks - postFilterChunks))
bq.metrics.seriesTotal.Add(float64(preFilterSeries))
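The net effect of the querier change: instead of appending each day's skipped series into that day's filtered refs and flattening immediately, FilterChunkRefs now collects every per-day response — filtered refs and skipped series alike — and defers to mergeSeries for one global dedupe before flattening into logproto.ChunkRef values. A toy sketch of that collect-then-merge shape, using sort-and-compact in place of the k-way merge that mergeSeries actually performs:

package main

import (
	"fmt"
	"slices"
)

func main() {
	// Per-partition results with overlap, standing in for per-day responses.
	perDay := [][]int{{1, 2, 3}, {3, 4}, {4, 5}}
	var all []int
	for _, day := range perDay {
		all = append(all, day...) // collect everything first, no dedupe yet
	}
	// One global dedupe at the end (mergeSeries instead does a
	// k-way merge over sorted inputs).
	slices.Sort(all)
	all = slices.Compact(all)
	fmt.Println(all) // [1 2 3 4 5]
}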
(3 more changed files not shown)
