Skip to content

Commit

Permalink
fix(blooms): Deduplicate filtered series and chunks (#12791)
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum authored Apr 25, 2024
1 parent 807d692 commit 3bf2d1f
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 71 deletions.
55 changes: 8 additions & 47 deletions pkg/bloomgateway/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@ package bloomgateway
import (
"context"
"flag"
"sort"
"time"

"github.com/go-kit/log"
"github.com/prometheus/common/model"
"golang.org/x/exp/slices"
"google.golang.org/grpc"

"github.com/grafana/loki/v3/pkg/logproto"
Expand Down Expand Up @@ -95,58 +93,21 @@ func newMerger() merger {
// We merge all chunks grouped by their fingerprint.
func (m merger) MergeResponse(responses ...resultscache.Response) (resultscache.Response, error) {
var size int
for _, r := range responses {
res := r.(*logproto.FilterChunkRefResponse)
size += len(res.ChunkRefs)
}

chunkRefs := make([]*logproto.GroupedChunkRefs, 0, size)
unmerged := make([][]*logproto.GroupedChunkRefs, 0, len(responses))
for _, r := range responses {
res := r.(*logproto.FilterChunkRefResponse)
chunkRefs = append(chunkRefs, res.ChunkRefs...)
}

return &logproto.FilterChunkRefResponse{
ChunkRefs: mergeGroupedChunkRefs(chunkRefs),
}, nil
}

// Merge duplicated fingerprints by:
// 1. Sort the chunkRefs by their stream fingerprint
// 2. Remove duplicated FPs appending all chunks into the first fingerprint's chunk list.
func mergeGroupedChunkRefs(chunkRefs []*logproto.GroupedChunkRefs) []*logproto.GroupedChunkRefs {
if len(chunkRefs) <= 1 {
return chunkRefs
}

sort.Slice(chunkRefs, func(i, j int) bool {
return chunkRefs[i].Fingerprint < chunkRefs[j].Fingerprint
})

var lastDiffFP int
for i := 1; i < len(chunkRefs); i++ {
if chunkRefs[lastDiffFP].Fingerprint == chunkRefs[i].Fingerprint {
chunkRefs[lastDiffFP].Refs = mergeShortRefs(append(chunkRefs[lastDiffFP].Refs, chunkRefs[i].Refs...))
} else {
lastDiffFP++
chunkRefs[lastDiffFP] = chunkRefs[i]
}
unmerged = append(unmerged, res.ChunkRefs)
size += len(res.ChunkRefs)
}
return chunkRefs[:lastDiffFP+1]
}

// mergeShortRefs merges short-refs by removing duplicated checksums.
func mergeShortRefs(refs []*logproto.ShortRef) []*logproto.ShortRef {
if len(refs) <= 1 {
return refs
buf := make([]*logproto.GroupedChunkRefs, 0, size)
deduped, err := mergeSeries(unmerged, buf)
if err != nil {
return nil, err
}

sort.Slice(refs, func(i, j int) bool {
return refs[i].Checksum < refs[j].Checksum
})
return slices.CompactFunc(refs, func(a, b *logproto.ShortRef) bool {
return a.Checksum == b.Checksum
})
return &logproto.FilterChunkRefResponse{ChunkRefs: deduped}, nil
}

type ClientCache struct {
Expand Down
10 changes: 5 additions & 5 deletions pkg/bloomgateway/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,11 @@ func TestMerge(t *testing.T) {
Fingerprint: 2,
Tenant: "fake",
Refs: []*logproto.ShortRef{
{
From: 700,
Through: 1000,
Checksum: 40,
},
{
From: 1000,
Through: 1500,
Expand All @@ -303,11 +308,6 @@ func TestMerge(t *testing.T) {
Through: 2500,
Checksum: 30,
},
{
From: 700,
Through: 1000,
Checksum: 40,
},
{
From: 2000,
Through: 2700,
Expand Down
106 changes: 88 additions & 18 deletions pkg/bloomgateway/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package bloomgateway
import (
"context"
"flag"
"fmt"
"io"
"math"
"sort"
Expand All @@ -15,13 +14,15 @@ import (
ringclient "github.com/grafana/dskit/ring/client"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/exp/slices"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
"github.com/grafana/loki/v3/pkg/querier/plan"
"github.com/grafana/loki/v3/pkg/queue"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/chunk/cache"
"github.com/grafana/loki/v3/pkg/storage/chunk/cache/resultscache"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
Expand Down Expand Up @@ -258,17 +259,6 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, tenant string, interva
return rs.groups[i].Fingerprint < rs.groups[j].Fingerprint
})

level.Info(c.logger).Log(
"msg", "do FilterChunkRefs for addresses",
"part", fmt.Sprintf("%d/%d", i+1, len(servers)),
"addr", rs.addr,
"from", interval.Start.Time(),
"through", interval.End.Time(),
"series", len(rs.groups),
"blocks", len(rs.blocks),
"tenant", tenant,
)

return c.doForAddrs([]string{rs.addr}, func(client logproto.BloomGatewayClient) error {
req := &logproto.FilterChunkRefRequest{
From: interval.Start,
Expand All @@ -290,15 +280,95 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, tenant string, interva
if err != nil {
return nil, err
}
return flatten(results, count), nil

buf := make([]*logproto.GroupedChunkRefs, 0, count)
return mergeSeries(results, buf)
}

func flatten(input [][]*logproto.GroupedChunkRefs, n int) []*logproto.GroupedChunkRefs {
result := make([]*logproto.GroupedChunkRefs, 0, n)
for _, res := range input {
result = append(result, res...)
// mergeSeries combines respones from multiple FilterChunkRefs calls and deduplicates
// chunks from series that appear in multiple responses.
// To avoid allocations, an optional slice can be passed as second argument.
func mergeSeries(input [][]*logproto.GroupedChunkRefs, buf []*logproto.GroupedChunkRefs) ([]*logproto.GroupedChunkRefs, error) {
// clear provided buffer
buf = buf[:0]

iters := make([]v1.PeekingIterator[*logproto.GroupedChunkRefs], 0, len(input))
for _, inp := range input {
iters = append(iters, v1.NewPeekingIter(v1.NewSliceIter(inp)))
}
return result

heapIter := v1.NewHeapIterator[*logproto.GroupedChunkRefs](
func(a, b *logproto.GroupedChunkRefs) bool {
return a.Fingerprint < b.Fingerprint
},
iters...,
)

dedupeIter := v1.NewDedupingIter[*logproto.GroupedChunkRefs, *logproto.GroupedChunkRefs](
// eq
func(a, b *logproto.GroupedChunkRefs) bool { return a.Fingerprint == b.Fingerprint },
// from
v1.Identity[*logproto.GroupedChunkRefs],
// merge
func(a, b *logproto.GroupedChunkRefs) *logproto.GroupedChunkRefs {
return &logproto.GroupedChunkRefs{
Fingerprint: a.Fingerprint,
Tenant: a.Tenant,
Refs: mergeChunks(a.Refs, b.Refs),
}
},
// iterator
v1.NewPeekingIter(heapIter),
)

return v1.CollectInto(dedupeIter, buf)
}

func mergeChunks(inputs ...[]*logproto.ShortRef) []*logproto.ShortRef {
if len(inputs) == 0 {
return nil
}

if len(inputs) == 1 {
slices.SortFunc(
inputs[0],
func(a, b *logproto.ShortRef) int {
if a.Equal(b) {
return 0
}
if a.From.Before(b.From) || (a.From.Equal(b.From) && a.Through.Before(b.Through)) {
return -1
}
return 1
},
)
return inputs[0]
}

iters := make([]v1.PeekingIterator[*logproto.ShortRef], 0, len(inputs))
for _, inp := range inputs {
iters = append(iters, v1.NewPeekingIter(v1.NewSliceIter(inp)))
}

chunkDedupe := v1.NewDedupingIter[*logproto.ShortRef, *logproto.ShortRef](
// eq
func(a, b *logproto.ShortRef) bool { return a.Equal(b) },
// from
v1.Identity[*logproto.ShortRef],
// merge
func(a, b *logproto.ShortRef) *logproto.ShortRef { return a },
// iterator
v1.NewPeekingIter[*logproto.ShortRef](
v1.NewHeapIterator[*logproto.ShortRef](
func(a, b *logproto.ShortRef) bool {
return a.From.Before(b.From) || (a.From.Equal(b.From) && a.Through.Before(b.Through))
},
iters...,
),
),
)
merged, _ := v1.Collect(chunkDedupe)
return merged
}

// doForAddrs sequetially calls the provided callback function fn for each
Expand Down
37 changes: 37 additions & 0 deletions pkg/bloomgateway/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ import (
"github.com/go-kit/log"
"github.com/grafana/dskit/flagext"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/querier/plan"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
Expand All @@ -33,3 +35,38 @@ func TestBloomGatewayClient(t *testing.T) {
require.Equal(t, 0, len(res))
})
}

func shortRef(f, t model.Time, c uint32) *logproto.ShortRef {
return &logproto.ShortRef{
From: f,
Through: t,
Checksum: c,
}
}

func TestGatewayClient_MergeSeries(t *testing.T) {
inputs := [][]*logproto.GroupedChunkRefs{
// response 1
{
{Fingerprint: 0x00, Refs: []*logproto.ShortRef{shortRef(0, 1, 1), shortRef(1, 2, 2)}}, // not overlapping
{Fingerprint: 0x01, Refs: []*logproto.ShortRef{shortRef(0, 1, 3), shortRef(1, 2, 4)}}, // fully overlapping chunks
{Fingerprint: 0x02, Refs: []*logproto.ShortRef{shortRef(0, 1, 5), shortRef(1, 2, 6)}}, // partially overlapping chunks
},
// response 2
{
{Fingerprint: 0x01, Refs: []*logproto.ShortRef{shortRef(0, 1, 3), shortRef(1, 2, 4)}}, // fully overlapping chunks
{Fingerprint: 0x02, Refs: []*logproto.ShortRef{shortRef(1, 2, 6), shortRef(2, 3, 7)}}, // partially overlapping chunks
{Fingerprint: 0x03, Refs: []*logproto.ShortRef{shortRef(0, 1, 8), shortRef(1, 2, 9)}}, // not overlapping
},
}

expected := []*logproto.GroupedChunkRefs{
{Fingerprint: 0x00, Refs: []*logproto.ShortRef{shortRef(0, 1, 1), shortRef(1, 2, 2)}}, // not overlapping
{Fingerprint: 0x01, Refs: []*logproto.ShortRef{shortRef(0, 1, 3), shortRef(1, 2, 4)}}, // fully overlapping chunks
{Fingerprint: 0x02, Refs: []*logproto.ShortRef{shortRef(0, 1, 5), shortRef(1, 2, 6), shortRef(2, 3, 7)}}, // partially overlapping chunks
{Fingerprint: 0x03, Refs: []*logproto.ShortRef{shortRef(0, 1, 8), shortRef(1, 2, 9)}}, // not overlapping
}

result, _ := mergeSeries(inputs, nil)
require.Equal(t, expected, result)
}
2 changes: 1 addition & 1 deletion pkg/bloomgateway/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
)

type BlockResolver interface {
Resolve(context.Context, string, bloomshipper.Interval, []*logproto.GroupedChunkRefs) ([]blockWithSeries, []*logproto.GroupedChunkRefs, error)
Resolve(ctx context.Context, tenant string, interval bloomshipper.Interval, series []*logproto.GroupedChunkRefs) (blocks []blockWithSeries, skipped []*logproto.GroupedChunkRefs, err error)
}

type blockWithSeries struct {
Expand Down

0 comments on commit 3bf2d1f

Please sign in to comment.