Skip to content

Commit

Permalink
fix(blooms): Correctly return unfiltered chunks for series that are n…
Browse files Browse the repository at this point in the history
…ot mapped to any block (#12774)

This PR fixes a conceptual mistake in the code of resolving blocks on the index gateways. Currently, a series does not resolve to any block is discarded instead of being kept for the response.
This change adds the chunks of the skipped series to the bloom querier response.


Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum authored Apr 24, 2024
1 parent 282e385 commit c36b114
Show file tree
Hide file tree
Showing 4 changed files with 184 additions and 16 deletions.
5 changes: 4 additions & 1 deletion pkg/bloomgateway/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from
// only covers a single day, and if not, it's at most two days.
for _, s := range partitionSeriesByDay(from, through, grouped) {
day := bloomshipper.NewInterval(s.day.Time, s.day.Time.Add(Day))
blocks, err := bq.blockResolver.Resolve(ctx, tenant, day, s.series)
blocks, skipped, err := bq.blockResolver.Resolve(ctx, tenant, day, s.series)
if err != nil {
return nil, err
}
Expand All @@ -121,6 +121,9 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from
return nil, err
}

// add chunk refs from series that were not mapped to any blocks
refs = append(refs, skipped...)

for i := range refs {
seriesSeen[refs[i].Fingerprint] = struct{}{}
for _, ref := range refs[i].Refs {
Expand Down
4 changes: 2 additions & 2 deletions pkg/bloomgateway/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (c *noopClient) FilterChunks(_ context.Context, _ string, _ bloomshipper.In
type mockBlockResolver struct{}

// Resolve implements BlockResolver.
func (*mockBlockResolver) Resolve(_ context.Context, tenant string, interval bloomshipper.Interval, series []*logproto.GroupedChunkRefs) ([]blockWithSeries, error) {
func (*mockBlockResolver) Resolve(_ context.Context, tenant string, interval bloomshipper.Interval, series []*logproto.GroupedChunkRefs) ([]blockWithSeries, []*logproto.GroupedChunkRefs, error) {
day := truncateDay(interval.Start)
first, last := getFirstLast(series)
block := bloomshipper.BlockRef{
Expand All @@ -53,7 +53,7 @@ func (*mockBlockResolver) Resolve(_ context.Context, tenant string, interval blo
Checksum: 0,
},
}
return []blockWithSeries{{block: block, series: series}}, nil
return []blockWithSeries{{block: block, series: series}}, nil, nil
}

var _ BlockResolver = &mockBlockResolver{}
Expand Down
35 changes: 31 additions & 4 deletions pkg/bloomgateway/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
)

type BlockResolver interface {
Resolve(context.Context, string, bloomshipper.Interval, []*logproto.GroupedChunkRefs) ([]blockWithSeries, error)
Resolve(context.Context, string, bloomshipper.Interval, []*logproto.GroupedChunkRefs) ([]blockWithSeries, []*logproto.GroupedChunkRefs, error)
}

type blockWithSeries struct {
Expand All @@ -28,7 +28,7 @@ type defaultBlockResolver struct {
logger log.Logger
}

func (r *defaultBlockResolver) Resolve(ctx context.Context, tenant string, interval bloomshipper.Interval, series []*logproto.GroupedChunkRefs) ([]blockWithSeries, error) {
func (r *defaultBlockResolver) Resolve(ctx context.Context, tenant string, interval bloomshipper.Interval, series []*logproto.GroupedChunkRefs) ([]blockWithSeries, []*logproto.GroupedChunkRefs, error) {
minFp, maxFp := getFirstLast(series)
metaSearch := bloomshipper.MetaSearchParams{
TenantID: tenant,
Expand All @@ -52,10 +52,12 @@ func (r *defaultBlockResolver) Resolve(ctx context.Context, tenant string, inter
)

if err != nil {
return nil, err
return nil, series, err
}

return blocksMatchingSeries(metas, interval, series), nil
mapped := blocksMatchingSeries(metas, interval, series)
skipped := unassignedSeries(mapped, series)
return mapped, skipped, nil
}

func blocksMatchingSeries(metas []bloomshipper.Meta, interval bloomshipper.Interval, series []*logproto.GroupedChunkRefs) []blockWithSeries {
Expand Down Expand Up @@ -96,6 +98,31 @@ func blocksMatchingSeries(metas []bloomshipper.Meta, interval bloomshipper.Inter
return result
}

func unassignedSeries(mapped []blockWithSeries, series []*logproto.GroupedChunkRefs) []*logproto.GroupedChunkRefs {
skipped := make([]*logproto.GroupedChunkRefs, len(series))
_ = copy(skipped, series)

for _, block := range mapped {
minFp, maxFp := getFirstLast(block.series)

minIdx := sort.Search(len(skipped), func(i int) bool {
return skipped[i].Fingerprint >= minFp.Fingerprint
})

maxIdx := sort.Search(len(skipped), func(i int) bool {
return skipped[i].Fingerprint >= maxFp.Fingerprint
})

if minIdx == len(skipped) || maxIdx == 0 || minIdx == maxIdx {
continue
}

skipped = append(skipped[0:minIdx], skipped[maxIdx+1:]...)
}

return skipped
}

func NewBlockResolver(store bloomshipper.Store, logger log.Logger) BlockResolver {
return &defaultBlockResolver{
store: store,
Expand Down
156 changes: 147 additions & 9 deletions pkg/bloomgateway/resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,22 @@ import (
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
)

func makeBlockRef(minFp, maxFp model.Fingerprint, from, through model.Time) bloomshipper.BlockRef {
return bloomshipper.BlockRef{
Ref: bloomshipper.Ref{
TenantID: "tenant",
TableName: "table",
Bounds: v1.NewBounds(minFp, maxFp),
StartTimestamp: from,
EndTimestamp: through,
},
}
}

func makeMeta(minFp, maxFp model.Fingerprint, from, through model.Time) bloomshipper.Meta {
return bloomshipper.Meta{
Blocks: []bloomshipper.BlockRef{
{
Ref: bloomshipper.Ref{
TenantID: "tenant",
TableName: "table",
Bounds: v1.NewBounds(minFp, maxFp),
StartTimestamp: from,
EndTimestamp: through,
},
},
makeBlockRef(minFp, maxFp, from, through),
},
}
}
Expand Down Expand Up @@ -113,3 +117,137 @@ func TestBlockResolver_BlocksMatchingSeries(t *testing.T) {
require.Equal(t, expected, res)
})
}

func TestBlockResolver_UnassignedSeries(t *testing.T) {
series := []*logproto.GroupedChunkRefs{
{Fingerprint: 0x00},
{Fingerprint: 0x20},
{Fingerprint: 0x40},
{Fingerprint: 0x60},
{Fingerprint: 0x80},
{Fingerprint: 0xa0},
{Fingerprint: 0xc0},
{Fingerprint: 0xe0},
}

testCases := []struct {
desc string
mapped []blockWithSeries
expected []*logproto.GroupedChunkRefs
}{
{
desc: "no blocks - all unassigned",
mapped: []blockWithSeries{},
expected: series,
},
{
desc: "block has no overlapping series - all unassigned",
mapped: []blockWithSeries{
{
series: []*logproto.GroupedChunkRefs{
{Fingerprint: 0xf0},
{Fingerprint: 0xff},
},
},
},
expected: series,
},
{
desc: "single block covering all series - no unassigned",
mapped: []blockWithSeries{
{
series: []*logproto.GroupedChunkRefs{
{Fingerprint: 0x00},
{Fingerprint: 0x20},
{Fingerprint: 0x40},
{Fingerprint: 0x60},
{Fingerprint: 0x80},
{Fingerprint: 0xa0},
{Fingerprint: 0xc0},
{Fingerprint: 0xe0},
},
},
},
expected: []*logproto.GroupedChunkRefs{},
},
{
desc: "multiple blocks covering all series - no unassigned",
mapped: []blockWithSeries{
{
series: []*logproto.GroupedChunkRefs{
{Fingerprint: 0x00},
{Fingerprint: 0x20},
{Fingerprint: 0x40},
{Fingerprint: 0x60},
},
},
{
series: []*logproto.GroupedChunkRefs{
{Fingerprint: 0x40},
{Fingerprint: 0x60},
{Fingerprint: 0x80},
{Fingerprint: 0xa0},
},
},
{
series: []*logproto.GroupedChunkRefs{
{Fingerprint: 0x80},
{Fingerprint: 0xa0},
{Fingerprint: 0xc0},
{Fingerprint: 0xe0},
},
},
},
expected: []*logproto.GroupedChunkRefs{},
},
{
desc: "single block overlapping some series",
mapped: []blockWithSeries{
{
series: []*logproto.GroupedChunkRefs{
{Fingerprint: 0x00},
{Fingerprint: 0x20},
{Fingerprint: 0x40},
{Fingerprint: 0x60},
},
},
},
expected: []*logproto.GroupedChunkRefs{
{Fingerprint: 0x80},
{Fingerprint: 0xa0},
{Fingerprint: 0xc0},
{Fingerprint: 0xe0},
},
},
{
desc: "multiple blocks overlapping some series",
mapped: []blockWithSeries{
{
series: []*logproto.GroupedChunkRefs{
{Fingerprint: 0x20},
{Fingerprint: 0x40},
{Fingerprint: 0x60},
},
},
{
series: []*logproto.GroupedChunkRefs{
{Fingerprint: 0x80},
{Fingerprint: 0xa0},
{Fingerprint: 0xc0},
},
},
},
expected: []*logproto.GroupedChunkRefs{
{Fingerprint: 0x00},
{Fingerprint: 0xe0},
},
},
}

for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
result := unassignedSeries(tc.mapped, series)
require.Equal(t, result, tc.expected)
})
}
}

0 comments on commit c36b114

Please sign in to comment.