Skip to content

Commit

Permalink
feat(rf1): Add query path for the metastore (#13636)
Browse files Browse the repository at this point in the history
  • Loading branch information
cyriltovena authored Jul 24, 2024
1 parent 4abb5a4 commit 8cb19a2
Show file tree
Hide file tree
Showing 4 changed files with 825 additions and 31 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package metastore

import (
"context"
"slices"
"strings"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/metastorepb"
)

func (m *Metastore) ListBlocksForQuery(
ctx context.Context,
request *metastorepb.ListBlocksForQueryRequest,
) (
*metastorepb.ListBlocksForQueryResponse, error,
) {
return m.state.listBlocksForQuery(ctx, request)
}

func (m *metastoreState) listBlocksForQuery(
_ context.Context,
request *metastorepb.ListBlocksForQueryRequest,
) (
*metastorepb.ListBlocksForQueryResponse, error,
) {
if len(request.TenantId) == 0 {
return nil, status.Error(codes.InvalidArgument, "tenant_id is required")
}

if request.StartTime > request.EndTime {
return nil, status.Error(codes.InvalidArgument, "start_time must be less than or equal to end_time")
}
var resp metastorepb.ListBlocksForQueryResponse
m.segmentsMutex.Lock()
defer m.segmentsMutex.Unlock()

for _, segment := range m.segments {
for _, tenants := range segment.TenantStreams {
if tenants.TenantId == request.TenantId && inRange(segment.MinTime, segment.MaxTime, request.StartTime, request.EndTime) {
resp.Blocks = append(resp.Blocks, cloneBlockForQuery(segment))
break
}
}
}
slices.SortFunc(resp.Blocks, func(a, b *metastorepb.BlockMeta) int {
return strings.Compare(a.Id, b.Id)
})
return &resp, nil
}

func inRange(blockStart, blockEnd, queryStart, queryEnd int64) bool {
return blockStart <= queryEnd && blockEnd >= queryStart
}

func cloneBlockForQuery(b *metastorepb.BlockMeta) *metastorepb.BlockMeta {
res := &metastorepb.BlockMeta{
Id: b.Id,
MinTime: b.MinTime,
MaxTime: b.MaxTime,
CompactionLevel: b.CompactionLevel,
FormatVersion: b.FormatVersion,
IndexRef: metastorepb.DataRef{
Offset: b.IndexRef.Offset,
Length: b.IndexRef.Length,
},
TenantStreams: make([]*metastorepb.TenantStreams, 0, len(b.TenantStreams)),
}
for _, svc := range b.TenantStreams {
res.TenantStreams = append(res.TenantStreams, &metastorepb.TenantStreams{
TenantId: svc.TenantId,
MinTime: svc.MinTime,
MaxTime: svc.MaxTime,
})
}
return res
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package metastore

import (
"context"
"testing"

"github.com/stretchr/testify/require"

metastorepb "github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/metastorepb"
)

func TestMetastore_ListBlocksForQuery(t *testing.T) {
block1, block2, block3 := &metastorepb.BlockMeta{
Id: "block1",
MinTime: 0,
MaxTime: 100,
TenantStreams: []*metastorepb.TenantStreams{
{
TenantId: "tenant1",
MinTime: 0,
MaxTime: 50,
},
},
}, &metastorepb.BlockMeta{
Id: "block2",
MinTime: 100,
MaxTime: 200,
TenantStreams: []*metastorepb.TenantStreams{
{
TenantId: "tenant1",
MinTime: 100,
MaxTime: 150,
},
},
}, &metastorepb.BlockMeta{
Id: "block3",
MinTime: 200,
MaxTime: 300,
TenantStreams: []*metastorepb.TenantStreams{
{
TenantId: "tenant2",
MinTime: 200,
MaxTime: 250,
},
{
TenantId: "tenant1",
MinTime: 200,
MaxTime: 250,
},
},
}
m := &Metastore{
state: &metastoreState{
segments: map[string]*metastorepb.BlockMeta{
"block1": block1,
"block2": block2,
"block3": block3,
},
},
}

tests := []struct {
name string
request *metastorepb.ListBlocksForQueryRequest
expectedResponse *metastorepb.ListBlocksForQueryResponse
}{
{
name: "Matching tenant and time range",
request: &metastorepb.ListBlocksForQueryRequest{
TenantId: "tenant1",
StartTime: 0,
EndTime: 100,
},
expectedResponse: &metastorepb.ListBlocksForQueryResponse{
Blocks: []*metastorepb.BlockMeta{
block1,
block2,
},
},
},
{
name: "Matching tenant but partial time range",
request: &metastorepb.ListBlocksForQueryRequest{
TenantId: "tenant1",
StartTime: 50,
EndTime: 150,
},
expectedResponse: &metastorepb.ListBlocksForQueryResponse{
Blocks: []*metastorepb.BlockMeta{
block1,
block2,
},
},
},
{
name: "Non-matching tenant",
request: &metastorepb.ListBlocksForQueryRequest{
TenantId: "tenant3",
StartTime: 0,
EndTime: 100,
},
expectedResponse: &metastorepb.ListBlocksForQueryResponse{},
},
{
name: "Matching one tenant but not the other",
request: &metastorepb.ListBlocksForQueryRequest{
TenantId: "tenant1",
StartTime: 100,
EndTime: 550,
},
expectedResponse: &metastorepb.ListBlocksForQueryResponse{
Blocks: []*metastorepb.BlockMeta{
block1,
block2,
block3,
},
},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
resp, err := m.ListBlocksForQuery(context.Background(), test.request)
require.NoError(t, err)
require.Equal(t, test.expectedResponse, resp)
})
}
}
Loading

0 comments on commit 8cb19a2

Please sign in to comment.