Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(blooms): ignore individual bloom-gw failures #12863

Merged
merged 4 commits into from
May 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 20 additions & 17 deletions pkg/bloomgateway/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
ringclient "github.com/grafana/dskit/ring/client"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"
"golang.org/x/exp/slices"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"
Expand Down Expand Up @@ -238,20 +239,8 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, _ string, interval blo
}
}

if len(servers) > 0 {
// cache locality score (higher is better):
// `% keyspace / % instances`. Ideally converges to 1 (querying x% of keyspace requires x% of instances),
// but can be less if the keyspace is not evenly distributed across instances. Ideal operation will see the range of
// `1-2/num_instances` -> `1`, where the former represents slight
// overlap on instances to the left and right of the range.
pctKeyspace := float64(lastFp-firstFp) / float64(math.MaxUint64)
pctInstances := float64(len(servers)) / float64(max(1, len(c.pool.Addrs())))
cacheLocalityScore := pctKeyspace / pctInstances
c.metrics.cacheLocalityScore.Observe(cacheLocalityScore)
}

results := make([][]*logproto.GroupedChunkRefs, len(servers))
count := 0
count := atomic.NewInt64(0)
err := concurrency.ForEachJob(ctx, len(servers), len(servers), func(ctx context.Context, i int) error {
rs := servers[i]

Expand All @@ -269,10 +258,24 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, _ string, interval blo
}
resp, err := client.FilterChunkRefs(ctx, req)
if err != nil {
return err
// We don't want a single bloom-gw failure to fail the entire query,
// so instrument & move on
level.Error(c.logger).Log(
"msg", "filter failed for instance, skipping",
"addr", rs.addr,
"series", len(rs.groups),
"blocks", len(rs.blocks),
"err", err,
)
// filter none of the results on failed request
c.metrics.clientRequests.WithLabelValues(typeError).Inc()
results[i] = rs.groups
} else {
c.metrics.clientRequests.WithLabelValues(typeSuccess).Inc()
results[i] = resp.ChunkRefs
}
results[i] = resp.ChunkRefs
count += len(resp.ChunkRefs)

count.Add(int64(len(results[i])))
return nil
})
})
Expand All @@ -281,7 +284,7 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, _ string, interval blo
return nil, err
}

buf := make([]*logproto.GroupedChunkRefs, 0, count)
buf := make([]*logproto.GroupedChunkRefs, 0, int(count.Load()))
return mergeSeries(results, buf)
}

Expand Down
20 changes: 12 additions & 8 deletions pkg/bloomgateway/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,25 @@ type metrics struct {
*serverMetrics
}

const (
typeSuccess = "success"
typeError = "error"
)

type clientMetrics struct {
cacheLocalityScore prometheus.Histogram
requestLatency *prometheus.HistogramVec
clients prometheus.Gauge
clientRequests *prometheus.CounterVec
requestLatency *prometheus.HistogramVec
clients prometheus.Gauge
}

func newClientMetrics(registerer prometheus.Registerer) *clientMetrics {
return &clientMetrics{
cacheLocalityScore: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
clientRequests: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Namespace: constants.Loki,
Subsystem: "bloom_gateway_client",
Name: "cache_locality_score",
Help: "Cache locality score of the bloom filter, as measured by % of keyspace touched / % of bloom_gws required",
Buckets: prometheus.LinearBuckets(0.01, 0.2, 5),
}),
Name: "requests_total",
Help: "Total number of requests made to the bloom gateway",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

}, []string{"type"}),
requestLatency: promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{
Namespace: constants.Loki,
Subsystem: "bloom_gateway_client",
Expand Down
Loading