Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(blooms): ignore individual bloom-gw failures #12863

Merged
merged 4 commits into from
May 2, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions pkg/bloomgateway/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,18 +238,6 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, _ string, interval blo
}
}

if len(servers) > 0 {
// cache locality score (higher is better):
// `% keyspace / % instances`. Ideally converges to 1 (querying x% of keyspace requires x% of instances),
// but can be less if the keyspace is not evenly distributed across instances. Ideal operation will see the range of
// `1-2/num_instances` -> `1`, where the former represents slight
// overlap on instances to the left and right of the range.
pctKeyspace := float64(lastFp-firstFp) / float64(math.MaxUint64)
pctInstances := float64(len(servers)) / float64(max(1, len(c.pool.Addrs())))
cacheLocalityScore := pctKeyspace / pctInstances
c.metrics.cacheLocalityScore.Observe(cacheLocalityScore)
}

results := make([][]*logproto.GroupedChunkRefs, len(servers))
count := 0
err := concurrency.ForEachJob(ctx, len(servers), len(servers), func(ctx context.Context, i int) error {
Expand All @@ -269,8 +257,19 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, _ string, interval blo
}
resp, err := client.FilterChunkRefs(ctx, req)
if err != nil {
return err
// We don't want a single bloom-gw failure to fail the entire query,
// so instrument & move on
c.metrics.clientRequests.WithLabelValues(typeError).Inc()
level.Error(c.logger).Log(
"msg", "filter failed for instance, skipping",
"addr", rs.addr,
"series", len(rs.groups),
"blocks", len(rs.blocks),
"err", err,
)
return nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case a request fails, the results entries need to be equal to the request entries, otherwise we assume that all requested chunks have been filtered out.

Suggested change
return nil
results[i] = rs.groups
count += len(rs.groups)
return nil

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Increasing count is not strictly needed, but the value is used for the allocation of the result buffer.

}
c.metrics.clientRequests.WithLabelValues(typeSuccess).Inc()
results[i] = resp.ChunkRefs
count += len(resp.ChunkRefs)
return nil
Expand All @@ -288,6 +287,7 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, _ string, interval blo
// mergeSeries combines responses from multiple FilterChunkRefs calls and deduplicates
// chunks from series that appear in multiple responses.
// To avoid allocations, an optional slice can be passed as second argument.
// NB(owen-d): input entries may be nil when a request fails.
func mergeSeries(input [][]*logproto.GroupedChunkRefs, buf []*logproto.GroupedChunkRefs) ([]*logproto.GroupedChunkRefs, error) {
// clear provided buffer
buf = buf[:0]
Expand Down
4 changes: 3 additions & 1 deletion pkg/bloomgateway/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ func TestGatewayClient_MergeSeries(t *testing.T) {
{Fingerprint: 0x01, Refs: []*logproto.ShortRef{shortRef(0, 1, 3), shortRef(1, 2, 4)}}, // fully overlapping chunks
{Fingerprint: 0x02, Refs: []*logproto.ShortRef{shortRef(0, 1, 5), shortRef(1, 2, 6)}}, // partially overlapping chunks
},
// response 2
// response 2 is nil (failure)
nil,
// response 3
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit

{
{Fingerprint: 0x01, Refs: []*logproto.ShortRef{shortRef(0, 1, 3), shortRef(1, 2, 4)}}, // fully overlapping chunks
{Fingerprint: 0x02, Refs: []*logproto.ShortRef{shortRef(1, 2, 6), shortRef(2, 3, 7)}}, // partially overlapping chunks
Expand Down
20 changes: 12 additions & 8 deletions pkg/bloomgateway/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,25 @@ type metrics struct {
*serverMetrics
}

const (
typeSuccess = "success"
typeError = "error"
)

type clientMetrics struct {
cacheLocalityScore prometheus.Histogram
requestLatency *prometheus.HistogramVec
clients prometheus.Gauge
clientRequests *prometheus.CounterVec
requestLatency *prometheus.HistogramVec
clients prometheus.Gauge
}

func newClientMetrics(registerer prometheus.Registerer) *clientMetrics {
return &clientMetrics{
cacheLocalityScore: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
clientRequests: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Namespace: constants.Loki,
Subsystem: "bloom_gateway_client",
Name: "cache_locality_score",
Help: "Cache locality score of the bloom filter, as measured by % of keyspace touched / % of bloom_gws required",
Buckets: prometheus.LinearBuckets(0.01, 0.2, 5),
}),
Name: "requests_total",
Help: "Total number of requests made to the bloom gateway",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

}, []string{"type"}),
requestLatency: promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{
Namespace: constants.Loki,
Subsystem: "bloom_gateway_client",
Expand Down
Loading