Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(blooms): Fully deduplicate chunks from FilterChunkRef responses #12807

Merged
merged 9 commits into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions pkg/bloomgateway/bloomgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ package bloomgateway
import (
"context"
"fmt"
"slices"
"sort"
"time"

Expand Down Expand Up @@ -383,8 +384,10 @@ func (g *Gateway) consumeTask(ctx context.Context, task Task, tasksCh chan<- Tas
case <-ctx.Done():
// do nothing
default:
// chunks may not be sorted
sort.Slice(res.Removals, func(i, j int) bool { return res.Removals[i].Less(res.Removals[j]) })
// chunks may not always be sorted
if !slices.IsSortedFunc(res.Removals, func(a, b v1.ChunkRef) int { return a.Cmp(b) }) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we end up using the same cmp funtion all over the places. Would be nice to have another function next to func (r *ChunkRef) Cmp(other ChunkRef) int that does this:

func CmpChunkRefs(a, b ChunkRef) int { return a.cmp(b) }

So we can use IsSortedFunc and SortFunc as:

                        if !slices.IsSortedFunc(a.Refs, CmpChunkRefs) {
				slices.SortFunc(a.Refs, CmpChunkRefs)
			}
			if !slices.IsSortedFunc(b.Refs, CmpChunkRefs) {
				slices.SortFunc(b.Refs, CmpChunkRefs)
			}

slices.SortFunc(res.Removals, func(a, b v1.ChunkRef) int { return a.Cmp(b) })
}
task.responses = append(task.responses, res)
}
}
Expand Down
17 changes: 11 additions & 6 deletions pkg/bloomgateway/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,13 @@ func mergeSeries(input [][]*logproto.GroupedChunkRefs, buf []*logproto.GroupedCh
v1.Identity[*logproto.GroupedChunkRefs],
// merge
func(a, b *logproto.GroupedChunkRefs) *logproto.GroupedChunkRefs {
slices.SortFunc(a.Refs, func(a, b *logproto.ShortRef) int { return a.Cmp(b) })
slices.SortFunc(b.Refs, func(a, b *logproto.ShortRef) int { return a.Cmp(b) })
// TODO(chaudum): Check if we can assume sorted shortrefs here
if !slices.IsSortedFunc(a.Refs, func(a, b *logproto.ShortRef) int { return a.Cmp(b) }) {
slices.SortFunc(a.Refs, func(a, b *logproto.ShortRef) int { return a.Cmp(b) })
}
if !slices.IsSortedFunc(b.Refs, func(a, b *logproto.ShortRef) int { return a.Cmp(b) }) {
slices.SortFunc(b.Refs, func(a, b *logproto.ShortRef) int { return a.Cmp(b) })
}
return &logproto.GroupedChunkRefs{
Fingerprint: a.Fingerprint,
Tenant: a.Tenant,
Expand All @@ -329,7 +334,6 @@ func mergeSeries(input [][]*logproto.GroupedChunkRefs, buf []*logproto.GroupedCh
func mergeChunkSets(s1, s2 []*logproto.ShortRef) (result []*logproto.ShortRef) {
var i, j int
for i < len(s1) && j < len(s2) {

a, b := s1[i], s2[j]

if a.Equal(b) {
Expand All @@ -342,10 +346,11 @@ func mergeChunkSets(s1, s2 []*logproto.ShortRef) (result []*logproto.ShortRef) {
if a.Less(b) {
result = append(result, a)
i++
} else {
result = append(result, b)
j++
continue
}

result = append(result, b)
j++
}

if i < len(s1) {
Expand Down
12 changes: 12 additions & 0 deletions pkg/storage/bloom/v1/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,18 @@ func (r *ChunkRef) Less(other ChunkRef) bool {
return r.Checksum < other.Checksum
}

func (r *ChunkRef) Cmp(other ChunkRef) int {
if r.From != other.From {
return int(other.From) - int(r.From)
}

if r.Through != other.Through {
return int(other.Through) - int(r.Through)
}

return int(other.Checksum) - int(r.Checksum)
}

func (r *ChunkRef) Encode(enc *encoding.Encbuf, previousEnd model.Time) model.Time {
// delta encode start time
enc.PutVarint64(int64(r.From - previousEnd))
Expand Down
67 changes: 66 additions & 1 deletion pkg/storage/bloom/v1/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,72 @@ func TestSeriesEncoding(t *testing.T) {
require.Equal(t, src, dst)
}

func TestChunkRefCompare(t *testing.T) {
func TestChunkRefCmpLess(t *testing.T) {
t.Parallel()
for _, tc := range []struct {
desc string
left, right ChunkRef
expCmp int
expLess bool
}{
{
desc: "From/Through/Checksum are equal",
left: ChunkRef{0, 0, 0},
right: ChunkRef{0, 0, 0},
expCmp: 0,
expLess: false,
},
{
desc: "From is before",
left: ChunkRef{0, 1, 0},
right: ChunkRef{1, 1, 0},
expCmp: 1,
expLess: true,
},
{
desc: "From is after",
left: ChunkRef{1, 1, 0},
right: ChunkRef{0, 1, 0},
expCmp: -1,
expLess: false,
},
{
desc: "Through is before",
left: ChunkRef{0, 1, 0},
right: ChunkRef{0, 2, 0},
expCmp: 1,
expLess: true,
},
{
desc: "Through is after",
left: ChunkRef{0, 2, 0},
right: ChunkRef{0, 1, 0},
expCmp: -1,
expLess: false,
},
{
desc: "Checksum is smaller",
left: ChunkRef{0, 1, 0},
right: ChunkRef{0, 1, 1},
expCmp: 1,
expLess: true,
},
{
desc: "Checksum is bigger",
left: ChunkRef{0, 0, 1},
right: ChunkRef{0, 0, 0},
expCmp: -1,
expLess: false,
},
} {
t.Run(tc.desc, func(t *testing.T) {
require.Equal(t, tc.expCmp, tc.left.Cmp(tc.right))
require.Equal(t, tc.expLess, tc.left.Less(tc.right))
})
}
}

func TestChunkRefsCompare(t *testing.T) {
t.Parallel()
for _, tc := range []struct {
desc string
Expand Down
Loading