From 55747182800590b7ad9b4079ccba83c960dccfc8 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Thu, 2 May 2024 13:57:58 -0400 Subject: [PATCH 1/4] feat(blooms): ignore individual bloom-gw failures Signed-off-by: Owen Diehl --- pkg/bloomgateway/client.go | 26 +++++++++++++------------- pkg/bloomgateway/client_test.go | 4 +++- pkg/bloomgateway/metrics.go | 20 ++++++++++++-------- 3 files changed, 28 insertions(+), 22 deletions(-) diff --git a/pkg/bloomgateway/client.go b/pkg/bloomgateway/client.go index af4b45388241..ba308457ce50 100644 --- a/pkg/bloomgateway/client.go +++ b/pkg/bloomgateway/client.go @@ -238,18 +238,6 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, _ string, interval blo } } - if len(servers) > 0 { - // cache locality score (higher is better): - // `% keyspace / % instances`. Ideally converges to 1 (querying x% of keyspace requires x% of instances), - // but can be less if the keyspace is not evenly distributed across instances. Ideal operation will see the range of - // `1-2/num_instances` -> `1`, where the former represents slight - // overlap on instances to the left and right of the range. - pctKeyspace := float64(lastFp-firstFp) / float64(math.MaxUint64) - pctInstances := float64(len(servers)) / float64(max(1, len(c.pool.Addrs()))) - cacheLocalityScore := pctKeyspace / pctInstances - c.metrics.cacheLocalityScore.Observe(cacheLocalityScore) - } - results := make([][]*logproto.GroupedChunkRefs, len(servers)) count := 0 err := concurrency.ForEachJob(ctx, len(servers), len(servers), func(ctx context.Context, i int) error { @@ -269,8 +257,19 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, _ string, interval blo } resp, err := client.FilterChunkRefs(ctx, req) if err != nil { - return err + // We don't want a single bloom-gw failure to fail the entire query, + // so instrument & move on + c.metrics.clientRequests.WithLabelValues(typeError).Inc() + level.Error(c.logger).Log( + "msg", "filter failed for instance, skipping", + "addr", rs.addr, + "series", len(rs.groups), + "blocks", len(rs.blocks), + "err", err, + ) + return nil } + c.metrics.clientRequests.WithLabelValues(typeSuccess).Inc() results[i] = resp.ChunkRefs count += len(resp.ChunkRefs) return nil @@ -288,6 +287,7 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, _ string, interval blo // mergeSeries combines responses from multiple FilterChunkRefs calls and deduplicates // chunks from series that appear in multiple responses. // To avoid allocations, an optional slice can be passed as second argument. +// NB(owen-d): input entries may be nil when a request fails. func mergeSeries(input [][]*logproto.GroupedChunkRefs, buf []*logproto.GroupedChunkRefs) ([]*logproto.GroupedChunkRefs, error) { // clear provided buffer buf = buf[:0] diff --git a/pkg/bloomgateway/client_test.go b/pkg/bloomgateway/client_test.go index d46d881078da..3a0b3aed16d6 100644 --- a/pkg/bloomgateway/client_test.go +++ b/pkg/bloomgateway/client_test.go @@ -52,7 +52,9 @@ func TestGatewayClient_MergeSeries(t *testing.T) { {Fingerprint: 0x01, Refs: []*logproto.ShortRef{shortRef(0, 1, 3), shortRef(1, 2, 4)}}, // fully overlapping chunks {Fingerprint: 0x02, Refs: []*logproto.ShortRef{shortRef(0, 1, 5), shortRef(1, 2, 6)}}, // partially overlapping chunks }, - // response 2 + // response 2 is nil (failure) + nil, + // response 3 { {Fingerprint: 0x01, Refs: []*logproto.ShortRef{shortRef(0, 1, 3), shortRef(1, 2, 4)}}, // fully overlapping chunks {Fingerprint: 0x02, Refs: []*logproto.ShortRef{shortRef(1, 2, 6), shortRef(2, 3, 7)}}, // partially overlapping chunks diff --git a/pkg/bloomgateway/metrics.go b/pkg/bloomgateway/metrics.go index 0d408991b40c..0885bc2ae7cb 100644 --- a/pkg/bloomgateway/metrics.go +++ b/pkg/bloomgateway/metrics.go @@ -15,21 +15,25 @@ type metrics struct { *serverMetrics } +const ( + typeSuccess = "success" + typeError = "error" +) + type clientMetrics struct { - cacheLocalityScore prometheus.Histogram - requestLatency *prometheus.HistogramVec - clients prometheus.Gauge + clientRequests *prometheus.CounterVec + requestLatency *prometheus.HistogramVec + clients prometheus.Gauge } func newClientMetrics(registerer prometheus.Registerer) *clientMetrics { return &clientMetrics{ - cacheLocalityScore: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{ + clientRequests: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ Namespace: constants.Loki, Subsystem: "bloom_gateway_client", - Name: "cache_locality_score", - Help: "Cache locality score of the bloom filter, as measured by % of keyspace touched / % of bloom_gws required", - Buckets: prometheus.LinearBuckets(0.01, 0.2, 5), - }), + Name: "requests_total", + Help: "Total number of requests made to the bloom gateway", + }, []string{"type"}), requestLatency: promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{ Namespace: constants.Loki, Subsystem: "bloom_gateway_client", From 906b5c3d0ce0d43bd7931178c48798f3ab0215b3 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Thu, 2 May 2024 14:47:43 -0400 Subject: [PATCH 2/4] filter none of results on failed req Signed-off-by: Owen Diehl --- pkg/bloomgateway/client.go | 3 ++- pkg/bloomgateway/client_test.go | 2 -- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/pkg/bloomgateway/client.go b/pkg/bloomgateway/client.go index ba308457ce50..ed8a5fea0947 100644 --- a/pkg/bloomgateway/client.go +++ b/pkg/bloomgateway/client.go @@ -267,6 +267,8 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, _ string, interval blo "blocks", len(rs.blocks), "err", err, ) + // filter none of the results on failed request + results[i] = rs.groups return nil } c.metrics.clientRequests.WithLabelValues(typeSuccess).Inc() @@ -287,7 +289,6 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, _ string, interval blo // mergeSeries combines responses from multiple FilterChunkRefs calls and deduplicates // chunks from series that appear in multiple responses. // To avoid allocations, an optional slice can be passed as second argument. -// NB(owen-d): input entries may be nil when a request fails. func mergeSeries(input [][]*logproto.GroupedChunkRefs, buf []*logproto.GroupedChunkRefs) ([]*logproto.GroupedChunkRefs, error) { // clear provided buffer buf = buf[:0] diff --git a/pkg/bloomgateway/client_test.go b/pkg/bloomgateway/client_test.go index 3a0b3aed16d6..f96f87a073ff 100644 --- a/pkg/bloomgateway/client_test.go +++ b/pkg/bloomgateway/client_test.go @@ -52,8 +52,6 @@ func TestGatewayClient_MergeSeries(t *testing.T) { {Fingerprint: 0x01, Refs: []*logproto.ShortRef{shortRef(0, 1, 3), shortRef(1, 2, 4)}}, // fully overlapping chunks {Fingerprint: 0x02, Refs: []*logproto.ShortRef{shortRef(0, 1, 5), shortRef(1, 2, 6)}}, // partially overlapping chunks }, - // response 2 is nil (failure) - nil, // response 3 { {Fingerprint: 0x01, Refs: []*logproto.ShortRef{shortRef(0, 1, 3), shortRef(1, 2, 4)}}, // fully overlapping chunks From 745528e5a2f6bcf188dcf331ec96bac64c8ad6a1 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Thu, 2 May 2024 15:09:18 -0400 Subject: [PATCH 3/4] use atomic for concurrent count access + increment expected capacity in all cases Signed-off-by: Owen Diehl --- pkg/bloomgateway/client.go | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/pkg/bloomgateway/client.go b/pkg/bloomgateway/client.go index ed8a5fea0947..8a01514cdf2f 100644 --- a/pkg/bloomgateway/client.go +++ b/pkg/bloomgateway/client.go @@ -14,6 +14,7 @@ import ( ringclient "github.com/grafana/dskit/ring/client" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "go.uber.org/atomic" "golang.org/x/exp/slices" "google.golang.org/grpc" "google.golang.org/grpc/health/grpc_health_v1" @@ -239,7 +240,7 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, _ string, interval blo } results := make([][]*logproto.GroupedChunkRefs, len(servers)) - count := 0 + count := atomic.NewInt64(0) err := concurrency.ForEachJob(ctx, len(servers), len(servers), func(ctx context.Context, i int) error { rs := servers[i] @@ -259,7 +260,6 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, _ string, interval blo if err != nil { // We don't want a single bloom-gw failure to fail the entire query, // so instrument & move on - c.metrics.clientRequests.WithLabelValues(typeError).Inc() level.Error(c.logger).Log( "msg", "filter failed for instance, skipping", "addr", rs.addr, @@ -268,12 +268,14 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, _ string, interval blo "err", err, ) // filter none of the results on failed request + c.metrics.clientRequests.WithLabelValues(typeError).Inc() results[i] = rs.groups - return nil + } else { + c.metrics.clientRequests.WithLabelValues(typeSuccess).Inc() + results[i] = resp.ChunkRefs } - c.metrics.clientRequests.WithLabelValues(typeSuccess).Inc() - results[i] = resp.ChunkRefs - count += len(resp.ChunkRefs) + + count.Add(int64(len(results[i]))) return nil }) }) @@ -282,7 +284,7 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, _ string, interval blo return nil, err } - buf := make([]*logproto.GroupedChunkRefs, 0, count) + buf := make([]*logproto.GroupedChunkRefs, 0, int(count.Load())) return mergeSeries(results, buf) } From 705dceea3b7327230fd8433baa548294de5c0c15 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Thu, 2 May 2024 15:10:52 -0400 Subject: [PATCH 4/4] fix comment Signed-off-by: Owen Diehl --- pkg/bloomgateway/client_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/bloomgateway/client_test.go b/pkg/bloomgateway/client_test.go index f96f87a073ff..d46d881078da 100644 --- a/pkg/bloomgateway/client_test.go +++ b/pkg/bloomgateway/client_test.go @@ -52,7 +52,7 @@ func TestGatewayClient_MergeSeries(t *testing.T) { {Fingerprint: 0x01, Refs: []*logproto.ShortRef{shortRef(0, 1, 3), shortRef(1, 2, 4)}}, // fully overlapping chunks {Fingerprint: 0x02, Refs: []*logproto.ShortRef{shortRef(0, 1, 5), shortRef(1, 2, 6)}}, // partially overlapping chunks }, - // response 3 + // response 2 { {Fingerprint: 0x01, Refs: []*logproto.ShortRef{shortRef(0, 1, 3), shortRef(1, 2, 4)}}, // fully overlapping chunks {Fingerprint: 0x02, Refs: []*logproto.ShortRef{shortRef(1, 2, 6), shortRef(2, 3, 7)}}, // partially overlapping chunks