Skip to content

Commit

Permalink
chore(blooms): Various fixes in blooms read path (#12036)
Browse files Browse the repository at this point in the history
* Wire up bloom querier metrics to fix nil pointer panic
* Use `bloomutils.TokenRangesForInstance` instead of `subRing.GetTokenRangesForInstance` to avoid `zone not set` error

Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum authored and onelapahead committed Feb 22, 2024
1 parent 1f61def commit 8d6054a
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 9 deletions.
5 changes: 3 additions & 2 deletions pkg/bloomgateway/bloomgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,9 @@ import (
var errGatewayUnhealthy = errors.New("bloom-gateway is unhealthy in the ring")

const (
pendingTasksInitialCap = 1024
metricsSubsystem = "bloom_gateway"
pendingTasksInitialCap = 1024
metricsSubsystem = "bloom_gateway"
querierMetricsSubsystem = "bloom_gateway_querier"
)

var (
Expand Down
2 changes: 1 addition & 1 deletion pkg/bloomgateway/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func replicationSetsWithBounds(subRing ring.ReadRing, instances []ring.InstanceD

servers := make([]rsWithRanges, 0, len(instances))
for _, inst := range instances {
tr, err := subRing.GetTokenRangesForInstance(inst.Id)
tr, err := bloomutils.TokenRangesForInstance(inst.Id, instances)
if err != nil {
return nil, errors.Wrap(err, "bloom gateway get ring")
}
Expand Down
9 changes: 7 additions & 2 deletions pkg/bloomgateway/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/util/constants"
)

type querierMetrics struct {
Expand Down Expand Up @@ -57,8 +58,12 @@ type BloomQuerier struct {
metrics *querierMetrics
}

func NewQuerier(c Client, logger log.Logger) *BloomQuerier {
return &BloomQuerier{c: c, logger: logger}
func NewQuerier(c Client, r prometheus.Registerer, logger log.Logger) *BloomQuerier {
return &BloomQuerier{
c: c,
metrics: newQuerierMetrics(r, constants.Loki, querierMetricsSubsystem),
logger: logger,
}
}

func convertToShortRef(ref *logproto.ChunkRef) *logproto.ShortRef {
Expand Down
6 changes: 3 additions & 3 deletions pkg/bloomgateway/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestBloomQuerier(t *testing.T) {

t.Run("client not called when filters are empty", func(t *testing.T) {
c := &noopClient{}
bq := NewQuerier(c, logger)
bq := NewQuerier(c, nil, logger)

ctx := context.Background()
through := model.Now()
Expand All @@ -51,7 +51,7 @@ func TestBloomQuerier(t *testing.T) {

t.Run("client not called when chunkRefs are empty", func(t *testing.T) {
c := &noopClient{}
bq := NewQuerier(c, logger)
bq := NewQuerier(c, nil, logger)

ctx := context.Background()
through := model.Now()
Expand All @@ -68,7 +68,7 @@ func TestBloomQuerier(t *testing.T) {

t.Run("querier propagates error from client", func(t *testing.T) {
c := &noopClient{err: errors.New("something went wrong")}
bq := NewQuerier(c, logger)
bq := NewQuerier(c, nil, logger)

ctx := context.Background()
through := model.Now()
Expand Down
2 changes: 1 addition & 1 deletion pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -1354,7 +1354,7 @@ func (t *Loki) initIndexGateway() (services.Service, error) {
if err != nil {
return nil, err
}
bloomQuerier = bloomgateway.NewQuerier(bloomGatewayClient, logger)
bloomQuerier = bloomgateway.NewQuerier(bloomGatewayClient, prometheus.DefaultRegisterer, logger)
}

gateway, err := indexgateway.NewIndexGateway(t.Cfg.IndexGateway, logger, prometheus.DefaultRegisterer, t.Store, indexClients, bloomQuerier)
Expand Down

0 comments on commit 8d6054a

Please sign in to comment.