From fbfc7fe192f90027078e27765397c643c9a1a469 Mon Sep 17 00:00:00 2001 From: JordanRushing Date: Tue, 25 Jun 2024 13:40:14 -0500 Subject: [PATCH 1/4] Support multi-zone ingesters when converting global to local limits for streams in limiter.go Signed-off-by: JordanRushing --- pkg/ingester/ingester_test.go | 4 ++++ pkg/ingester/limiter.go | 39 ++++++++++++++++++++++------------- pkg/ingester/limiter_test.go | 8 +++++++ 3 files changed, 37 insertions(+), 14 deletions(-) diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 444e1317e697..570452af44eb 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -1568,6 +1568,10 @@ func (r *readRingMock) ZonesCount() int { return 1 } +func (r *readRingMock) HealthyInstancesInZoneCount() int { + return len(r.replicationSet.Instances) +} + func (r *readRingMock) Subring(_ uint32, _ int) ring.ReadRing { return r } diff --git a/pkg/ingester/limiter.go b/pkg/ingester/limiter.go index daa1fe7aec8d..a5830314220d 100644 --- a/pkg/ingester/limiter.go +++ b/pkg/ingester/limiter.go @@ -20,6 +20,8 @@ const ( // to count members type RingCount interface { HealthyInstancesCount() int + HealthyInstancesInZoneCount() int + ZonesCount() int } type Limits interface { @@ -84,7 +86,7 @@ func (l *Limiter) GetStreamCountLimit(tenantID string) (calculatedLimit, localLi // We can assume that streams are evenly distributed across ingesters // so we do convert the global limit into a local limit globalLimit = l.limits.MaxGlobalStreamsPerUser(tenantID) - adjustedGlobalLimit = l.convertGlobalToLocalLimit(globalLimit) + adjustedGlobalLimit = l.convertGlobalToLocalLimit(globalLimit, localLimit) // Set the calculated limit to the lesser of the local limit or the new calculated global limit calculatedLimit = l.minNonZero(localLimit, adjustedGlobalLimit) @@ -105,24 +107,33 @@ func (l *Limiter) minNonZero(first, second int) int { return first } -func (l *Limiter) convertGlobalToLocalLimit(globalLimit int) int { +func (l *Limiter) convertGlobalToLocalLimit(globalLimit, maxStreamsPerUser int) int { if globalLimit == 0 { return 0 } - // todo: change to healthyInstancesInZoneCount() once - // Given we don't need a super accurate count (ie. when the ingesters - // topology changes) and we prefer to always be in favor of the tenant, - // we can use a per-ingester limit equal to: - // (global limit / number of ingesters) * replication factor - numIngesters := l.ring.HealthyInstancesCount() - - // May happen because the number of ingesters is asynchronously updated. - // If happens, we just temporarily ignore the global limit. - if numIngesters > 0 { - return int((float64(globalLimit) / float64(numIngesters)) * float64(l.replicationFactor)) + + zonesCount := l.ring.ZonesCount() + + if zonesCount <= 1 { + numIngesters := l.ring.HealthyInstancesCount() + if numIngesters > 0 { + return int((float64(globalLimit) / float64(numIngesters)) * float64(l.replicationFactor)) + } + return 0 } - return 0 + ingestersInZone := l.ring.HealthyInstancesInZoneCount() + if ingestersInZone == 0 { + return 0 // Avoid division by zero + } + + newLimit := int(float64(globalLimit) * float64(l.replicationFactor) * float64(zonesCount) / float64(ingestersInZone)) + + if maxStreamsPerUser > 0 && (newLimit == 0 || maxStreamsPerUser < newLimit) { + return maxStreamsPerUser + } + + return newLimit } type supplier[T any] func() T diff --git a/pkg/ingester/limiter_test.go b/pkg/ingester/limiter_test.go index b00bede10417..6201ebf8e273 100644 --- a/pkg/ingester/limiter_test.go +++ b/pkg/ingester/limiter_test.go @@ -214,6 +214,14 @@ func (m *ringCountMock) HealthyInstancesCount() int { return m.count } +func (m *ringCountMock) ZonesCount() int { + return 1 +} + +func (m *ringCountMock) HealthyInstancesInZoneCount() int { + return m.count +} + // Assert some of the weirder (bug?) behavior of golang.org/x/time/rate func TestGoLimiter(t *testing.T) { for _, tc := range []struct { From e7b82926c4fe2ee71f2e7f42168547de0b1d3e93 Mon Sep 17 00:00:00 2001 From: JordanRushing Date: Wed, 26 Jun 2024 13:09:02 -0500 Subject: [PATCH 2/4] Simplify convertGlobalToLocalLimit; add test Signed-off-by: JordanRushing --- pkg/ingester/limiter.go | 36 +++++++++++------------ pkg/ingester/limiter_test.go | 56 ++++++++++++++++++++++++++++++++++++ 2 files changed, 74 insertions(+), 18 deletions(-) diff --git a/pkg/ingester/limiter.go b/pkg/ingester/limiter.go index a5830314220d..48df5d7d2edf 100644 --- a/pkg/ingester/limiter.go +++ b/pkg/ingester/limiter.go @@ -86,7 +86,7 @@ func (l *Limiter) GetStreamCountLimit(tenantID string) (calculatedLimit, localLi // We can assume that streams are evenly distributed across ingesters // so we do convert the global limit into a local limit globalLimit = l.limits.MaxGlobalStreamsPerUser(tenantID) - adjustedGlobalLimit = l.convertGlobalToLocalLimit(globalLimit, localLimit) + adjustedGlobalLimit = l.convertGlobalToLocalLimit(globalLimit) // Set the calculated limit to the lesser of the local limit or the new calculated global limit calculatedLimit = l.minNonZero(localLimit, adjustedGlobalLimit) @@ -107,33 +107,33 @@ func (l *Limiter) minNonZero(first, second int) int { return first } -func (l *Limiter) convertGlobalToLocalLimit(globalLimit, maxStreamsPerUser int) int { - if globalLimit == 0 { +func (l *Limiter) convertGlobalToLocalLimit(globalLimit int) int { + if globalLimit == 0 || l.replicationFactor == 0 { return 0 } zonesCount := l.ring.ZonesCount() - if zonesCount <= 1 { - numIngesters := l.ring.HealthyInstancesCount() - if numIngesters > 0 { - return int((float64(globalLimit) / float64(numIngesters)) * float64(l.replicationFactor)) - } - return 0 - } - - ingestersInZone := l.ring.HealthyInstancesInZoneCount() - if ingestersInZone == 0 { - return 0 // Avoid division by zero + return calculateLimitForSingleZone(globalLimit, l) } - newLimit := int(float64(globalLimit) * float64(l.replicationFactor) * float64(zonesCount) / float64(ingestersInZone)) + return calculateLimitForMultipleZones(globalLimit, zonesCount, l) +} - if maxStreamsPerUser > 0 && (newLimit == 0 || maxStreamsPerUser < newLimit) { - return maxStreamsPerUser +func calculateLimitForSingleZone(globalLimit int, l *Limiter) int { + numIngesters := l.ring.HealthyInstancesCount() + if numIngesters > 0 { + return int((float64(globalLimit) / float64(numIngesters)) * float64(l.replicationFactor)) } + return 0 +} - return newLimit +func calculateLimitForMultipleZones(globalLimit, zonesCount int, l *Limiter) int { + ingestersInZone := l.ring.HealthyInstancesInZoneCount() + if ingestersInZone > 0 { + return int(float64(globalLimit) * float64(l.replicationFactor) * float64(zonesCount) / float64(ingestersInZone)) + } + return 0 } type supplier[T any] func() T diff --git a/pkg/ingester/limiter_test.go b/pkg/ingester/limiter_test.go index 6201ebf8e273..d637159128eb 100644 --- a/pkg/ingester/limiter_test.go +++ b/pkg/ingester/limiter_test.go @@ -262,3 +262,59 @@ func TestGoLimiter(t *testing.T) { }) } } + +type MockRing struct { + zonesCount int + healthyInstancesCount int + healthyInstancesInZoneCount int +} + +func (m *MockRing) ZonesCount() int { + return m.zonesCount +} + +func (m *MockRing) HealthyInstancesCount() int { + return m.healthyInstancesCount +} + +func (m *MockRing) HealthyInstancesInZoneCount() int { + return m.healthyInstancesInZoneCount +} + +func TestConvertGlobalToLocalLimit(t *testing.T) { + tests := []struct { + name string + globalLimit int + zonesCount int + healthyInstancesCount int + healthyInstancesInZoneCount int + replicationFactor int + expectedLocalLimit int + }{ + {"GlobalLimitZero", 0, 1, 1, 1, 3, 0}, + {"SingleZoneMultipleIngesters", 100, 1, 10, 10, 3, 10}, + {"MultipleZones", 200, 2, 0, 10, 3, 40}, + {"MultipleZonesNoIngesters", 200, 2, 0, 0, 3, 0}, + {"MultipleZonesNoIngestersInZone", 200, 3, 10, 0, 3, 0}, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + mockRing := &MockRing{ + zonesCount: tc.zonesCount, + healthyInstancesCount: tc.healthyInstancesCount, + healthyInstancesInZoneCount: tc.healthyInstancesInZoneCount, + } + + limiter := &Limiter{ + ring: mockRing, + replicationFactor: tc.replicationFactor, + } + + localLimit := limiter.convertGlobalToLocalLimit(tc.globalLimit) + if localLimit != tc.expectedLocalLimit { + t.Errorf("expected %d, got %d", tc.expectedLocalLimit, localLimit) + } + }) + } +} From 8a9c36f23b627e4bcf171a41c9d48858af0801c0 Mon Sep 17 00:00:00 2001 From: JordanRushing Date: Wed, 26 Jun 2024 14:07:59 -0500 Subject: [PATCH 3/4] Update dskit version to support `healthyInstancesInZone`; fix stream limit test; go mod vendor & go mod tidy Signed-off-by: JordanRushing --- go.mod | 2 +- go.sum | 4 +- pkg/ingester/limiter_test.go | 4 +- .../grafana/dskit/grpcutil/dns_resolver.go | 2 +- .../grafana/dskit/grpcutil/health_check.go | 4 +- .../grafana/dskit/httpgrpc/httpgrpc.go | 10 ++++- .../grafana/dskit/httpgrpc/server/server.go | 37 ++++++++++++++++++- .../grafana/dskit/kv/consul/client.go | 12 +++--- .../github.com/grafana/dskit/kv/etcd/mock.go | 12 +++--- .../dskit/kv/memberlist/memberlist_client.go | 2 +- vendor/github.com/grafana/dskit/kv/multi.go | 2 +- .../grafana/dskit/middleware/logging.go | 4 +- .../grafana/dskit/ring/lifecycler.go | 23 +++++++++--- .../grafana/dskit/services/failure_watcher.go | 2 +- .../grafana/dskit/spanprofiler/tracer.go | 3 ++ vendor/modules.txt | 2 +- 16 files changed, 93 insertions(+), 32 deletions(-) diff --git a/go.mod b/go.mod index 7d561370ad7d..4a49b95b102b 100644 --- a/go.mod +++ b/go.mod @@ -50,7 +50,7 @@ require ( github.com/gorilla/mux v1.8.0 github.com/gorilla/websocket v1.5.0 github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2 - github.com/grafana/dskit v0.0.0-20240528015923-27d7d41066d3 + github.com/grafana/dskit v0.0.0-20240626184720-35810fdf1c6d github.com/grafana/go-gelf/v2 v2.0.1 github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56 github.com/grafana/regexp v0.0.0-20221122212121-6b5c0a4cb7fd diff --git a/go.sum b/go.sum index 39cc45a12030..170ca1df4672 100644 --- a/go.sum +++ b/go.sum @@ -1017,8 +1017,8 @@ github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWm github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2 h1:qhugDMdQ4Vp68H0tp/0iN17DM2ehRo1rLEdOFe/gB8I= github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2/go.mod h1:w/aiO1POVIeXUQyl0VQSZjl5OAGDTL5aX+4v0RA1tcw= -github.com/grafana/dskit v0.0.0-20240528015923-27d7d41066d3 h1:k8vINlI4w+RYc37NRwQlRe/IHYoEbu6KAe2XdGDeV1U= -github.com/grafana/dskit v0.0.0-20240528015923-27d7d41066d3/go.mod h1:HvSf3uf8Ps2vPpzHeAFyZTdUcbVr+Rxpq1xcx7J/muc= +github.com/grafana/dskit v0.0.0-20240626184720-35810fdf1c6d h1:CD8PWWX+9lYdgeMquSofmLErvCtk7jb+3/W/SH6oo/k= +github.com/grafana/dskit v0.0.0-20240626184720-35810fdf1c6d/go.mod h1:HvSf3uf8Ps2vPpzHeAFyZTdUcbVr+Rxpq1xcx7J/muc= github.com/grafana/go-gelf/v2 v2.0.1 h1:BOChP0h/jLeD+7F9mL7tq10xVkDG15he3T1zHuQaWak= github.com/grafana/go-gelf/v2 v2.0.1/go.mod h1:lexHie0xzYGwCgiRGcvZ723bSNyNI8ZRD4s0CLobh90= github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85 h1:xLuzPoOzdfNb/RF/IENCw+oLVdZB4G21VPhkHBgwSHY= diff --git a/pkg/ingester/limiter_test.go b/pkg/ingester/limiter_test.go index d637159128eb..11e693af0532 100644 --- a/pkg/ingester/limiter_test.go +++ b/pkg/ingester/limiter_test.go @@ -292,8 +292,8 @@ func TestConvertGlobalToLocalLimit(t *testing.T) { expectedLocalLimit int }{ {"GlobalLimitZero", 0, 1, 1, 1, 3, 0}, - {"SingleZoneMultipleIngesters", 100, 1, 10, 10, 3, 10}, - {"MultipleZones", 200, 2, 0, 10, 3, 40}, + {"SingleZoneMultipleIngesters", 100, 1, 10, 10, 3, 30}, + {"MultipleZones", 200, 3, 0, 10, 3, 180}, {"MultipleZonesNoIngesters", 200, 2, 0, 0, 3, 0}, {"MultipleZonesNoIngestersInZone", 200, 3, 10, 0, 3, 0}, } diff --git a/vendor/github.com/grafana/dskit/grpcutil/dns_resolver.go b/vendor/github.com/grafana/dskit/grpcutil/dns_resolver.go index ef9c6398944e..507028aa602c 100644 --- a/vendor/github.com/grafana/dskit/grpcutil/dns_resolver.go +++ b/vendor/github.com/grafana/dskit/grpcutil/dns_resolver.go @@ -208,7 +208,7 @@ func (w *dnsWatcher) lookupSRV() map[string]*Update { for _, a := range addrs { a, ok := formatIP(a) if !ok { - level.Error(w.logger).Log("failed IP parsing", "err", err) + level.Error(w.logger).Log("msg", "failed IP parsing", "err", err) continue } addr := a + ":" + strconv.Itoa(int(s.Port)) diff --git a/vendor/github.com/grafana/dskit/grpcutil/health_check.go b/vendor/github.com/grafana/dskit/grpcutil/health_check.go index 44b5e15e7657..e66ccd6fae49 100644 --- a/vendor/github.com/grafana/dskit/grpcutil/health_check.go +++ b/vendor/github.com/grafana/dskit/grpcutil/health_check.go @@ -16,7 +16,7 @@ type Check func(ctx context.Context) bool // WithManager returns a new Check that tests if the managed services are healthy. func WithManager(manager *services.Manager) Check { - return func(ctx context.Context) bool { + return func(context.Context) bool { states := manager.ServicesByState() // Given this is a health check endpoint for the whole instance, we should consider @@ -33,7 +33,7 @@ func WithManager(manager *services.Manager) Check { // WithShutdownRequested returns a new Check that returns false when shutting down. func WithShutdownRequested(requested *atomic.Bool) Check { - return func(ctx context.Context) bool { + return func(context.Context) bool { return !requested.Load() } } diff --git a/vendor/github.com/grafana/dskit/httpgrpc/httpgrpc.go b/vendor/github.com/grafana/dskit/httpgrpc/httpgrpc.go index b755e2adceae..02e6e493736b 100644 --- a/vendor/github.com/grafana/dskit/httpgrpc/httpgrpc.go +++ b/vendor/github.com/grafana/dskit/httpgrpc/httpgrpc.go @@ -116,8 +116,14 @@ func Errorf(code int, tmpl string, args ...interface{}) error { }) } -// ErrorFromHTTPResponse converts an HTTP response into a grpc error +// ErrorFromHTTPResponse converts an HTTP response into a grpc error, and uses HTTP response body as an error message. +// Note that if HTTP response body contains non-utf8 string, then returned error cannot be marshalled by protobuf. func ErrorFromHTTPResponse(resp *HTTPResponse) error { + return ErrorFromHTTPResponseWithMessage(resp, string(resp.Body)) +} + +// ErrorFromHTTPResponseWithMessage converts an HTTP response into a grpc error, and uses supplied message for Error message. +func ErrorFromHTTPResponseWithMessage(resp *HTTPResponse, msg string) error { a, err := types.MarshalAny(resp) if err != nil { return err @@ -125,7 +131,7 @@ func ErrorFromHTTPResponse(resp *HTTPResponse) error { return status.ErrorProto(&spb.Status{ Code: resp.Code, - Message: string(resp.Body), + Message: msg, Details: []*types.Any{a}, }) } diff --git a/vendor/github.com/grafana/dskit/httpgrpc/server/server.go b/vendor/github.com/grafana/dskit/httpgrpc/server/server.go index b73c5a0f7750..6a831dac0f8f 100644 --- a/vendor/github.com/grafana/dskit/httpgrpc/server/server.go +++ b/vendor/github.com/grafana/dskit/httpgrpc/server/server.go @@ -26,12 +26,22 @@ import ( ) var ( - // DoNotLogErrorHeaderKey is a header key used for marking non-loggable errors. More precisely, if an HTTP response + // DoNotLogErrorHeaderKey is a header name used for marking non-loggable errors. More precisely, if an HTTP response // has a status code 5xx, and contains a header with key DoNotLogErrorHeaderKey and any values, the generated error // will be marked as non-loggable. DoNotLogErrorHeaderKey = http.CanonicalHeaderKey("X-DoNotLogError") + + // ErrorMessageHeaderKey is a header name for header that contains error message that should be used when Server.Handle + // (httpgrpc.HTTP/Handle implementation) decides to return the response as an error, using status.ErrorProto. + // Normally Server.Handle would use entire response body as a error message, but Message field of rcp.Status object + // is a string, and if body contains non-utf8 bytes, marshalling of this object will fail. + ErrorMessageHeaderKey = http.CanonicalHeaderKey("X-ErrorMessage") ) +type contextType int + +const handledByHttpgrpcServer contextType = 0 + type Option func(*Server) func WithReturn4XXErrors(s *Server) { @@ -59,6 +69,8 @@ func NewServer(handler http.Handler, opts ...Option) *Server { // Handle implements HTTPServer. func (s Server) Handle(ctx context.Context, r *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) { + ctx = context.WithValue(ctx, handledByHttpgrpcServer, true) + req, err := httpgrpc.ToHTTPRequest(ctx, r) if err != nil { return nil, err @@ -74,13 +86,24 @@ func (s Server) Handle(ctx context.Context, r *httpgrpc.HTTPRequest) (*httpgrpc. header.Del(DoNotLogErrorHeaderKey) // remove before converting to httpgrpc resp } + errorMessageFromHeader := "" + if msg, ok := header[ErrorMessageHeaderKey]; ok { + errorMessageFromHeader = msg[0] + header.Del(ErrorMessageHeaderKey) // remove before converting to httpgrpc resp + } + resp := &httpgrpc.HTTPResponse{ Code: int32(recorder.Code), Headers: httpgrpc.FromHeader(header), Body: recorder.Body.Bytes(), } if s.shouldReturnError(resp) { - err := httpgrpc.ErrorFromHTTPResponse(resp) + var err error + if errorMessageFromHeader != "" { + err = httpgrpc.ErrorFromHTTPResponseWithMessage(resp, errorMessageFromHeader) + } else { + err = httpgrpc.ErrorFromHTTPResponse(resp) + } if doNotLogError { err = middleware.DoNotLogError{Err: err} } @@ -206,3 +229,13 @@ func (c *Client) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } } + +// IsHandledByHttpgrpcServer returns true if context is associated with HTTP request that was initiated by +// Server.Handle, which is an implementation of httpgrpc.HTTP/Handle gRPC method. +func IsHandledByHttpgrpcServer(ctx context.Context) bool { + val := ctx.Value(handledByHttpgrpcServer) + if v, ok := val.(bool); ok { + return v + } + return false +} diff --git a/vendor/github.com/grafana/dskit/kv/consul/client.go b/vendor/github.com/grafana/dskit/kv/consul/client.go index 5501a67d894b..a750ec826337 100644 --- a/vendor/github.com/grafana/dskit/kv/consul/client.go +++ b/vendor/github.com/grafana/dskit/kv/consul/client.go @@ -116,7 +116,7 @@ func (c *Client) Put(ctx context.Context, key string, value interface{}) error { return err } - return instrument.CollectedRequest(ctx, "Put", c.consulMetrics.consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + return instrument.CollectedRequest(ctx, "Put", c.consulMetrics.consulRequestDuration, instrument.ErrorCode, func(context.Context) error { _, err := c.kv.Put(&consul.KVPair{ Key: key, Value: bytes, @@ -376,16 +376,18 @@ func checkLastIndex(index, metaLastIndex uint64) (newIndex uint64, skip bool) { // Don't just keep using index=0. // After blocking request, returned index must be at least 1. return 1, false - } else if metaLastIndex < index { + } + if metaLastIndex < index { // Index reset. return 0, false - } else if index == metaLastIndex { + } + if index == metaLastIndex { // Skip if the index is the same as last time, because the key value is // guaranteed to be the same as last time return metaLastIndex, true - } else { - return metaLastIndex, false } + + return metaLastIndex, false } func (c *Client) createRateLimiter() *rate.Limiter { diff --git a/vendor/github.com/grafana/dskit/kv/etcd/mock.go b/vendor/github.com/grafana/dskit/kv/etcd/mock.go index 6349cee1c281..c8eeb3183aa0 100644 --- a/vendor/github.com/grafana/dskit/kv/etcd/mock.go +++ b/vendor/github.com/grafana/dskit/kv/etcd/mock.go @@ -234,15 +234,17 @@ func (m *mockKV) Do(_ context.Context, op clientv3.Op) (clientv3.OpResponse, err func (m *mockKV) doInternal(op clientv3.Op) (clientv3.OpResponse, error) { if op.IsGet() { return m.doGet(op) - } else if op.IsPut() { + } + if op.IsPut() { return m.doPut(op) - } else if op.IsDelete() { + } + if op.IsDelete() { return m.doDelete(op) - } else if op.IsTxn() { + } + if op.IsTxn() { return m.doTxn(op) - } else { - panic(fmt.Sprintf("unsupported operation: %+v", op)) } + panic(fmt.Sprintf("unsupported operation: %+v", op)) } func (m *mockKV) doGet(op clientv3.Op) (clientv3.OpResponse, error) { diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go b/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go index e8a94debe181..a1b659d4097e 100644 --- a/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go +++ b/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go @@ -1171,7 +1171,7 @@ func (m *KV) queueBroadcast(key string, content []string, version uint, message content: content, version: version, msg: message, - finished: func(b ringBroadcast) { + finished: func(ringBroadcast) { m.totalSizeOfBroadcastMessagesInQueue.Sub(float64(l)) }, logger: m.logger, diff --git a/vendor/github.com/grafana/dskit/kv/multi.go b/vendor/github.com/grafana/dskit/kv/multi.go index 3ac69c9fe475..e1e461ea1f28 100644 --- a/vendor/github.com/grafana/dskit/kv/multi.go +++ b/vendor/github.com/grafana/dskit/kv/multi.go @@ -350,7 +350,7 @@ func (m *MultiClient) writeToSecondary(ctx context.Context, primary kvclient, ke } m.mirrorWritesCounter.Inc() - err := kvc.client.CAS(ctx, key, func(in interface{}) (out interface{}, retry bool, err error) { + err := kvc.client.CAS(ctx, key, func(interface{}) (out interface{}, retry bool, err error) { // try once return newValue, false, nil }) diff --git a/vendor/github.com/grafana/dskit/middleware/logging.go b/vendor/github.com/grafana/dskit/middleware/logging.go index fe00d3a82846..c2306292b3f4 100644 --- a/vendor/github.com/grafana/dskit/middleware/logging.go +++ b/vendor/github.com/grafana/dskit/middleware/logging.go @@ -56,9 +56,11 @@ func NewLogMiddleware(log log.Logger, logRequestHeaders bool, logRequestAtInfoLe // logWithRequest information from the request and context as fields. func (l Log) logWithRequest(r *http.Request) log.Logger { localLog := l.Log - traceID, ok := tracing.ExtractTraceID(r.Context()) + traceID, ok := tracing.ExtractSampledTraceID(r.Context()) if ok { localLog = log.With(localLog, "trace_id", traceID) + } else if traceID != "" { + localLog = log.With(localLog, "trace_id_unsampled", traceID) } if l.SourceIPs != nil { diff --git a/vendor/github.com/grafana/dskit/ring/lifecycler.go b/vendor/github.com/grafana/dskit/ring/lifecycler.go index 4f51b46a503c..7c54eabdd873 100644 --- a/vendor/github.com/grafana/dskit/ring/lifecycler.go +++ b/vendor/github.com/grafana/dskit/ring/lifecycler.go @@ -158,11 +158,12 @@ type Lifecycler struct { readySince time.Time // Keeps stats updated at every heartbeat period - countersLock sync.RWMutex - healthyInstancesCount int - instancesCount int - instancesInZoneCount int - zonesCount int + countersLock sync.RWMutex + healthyInstancesCount int + instancesCount int + healthyInstancesInZoneCount int + instancesInZoneCount int + zonesCount int tokenGenerator TokenGenerator // The maximum time allowed to wait on the CanJoin() condition. @@ -441,6 +442,15 @@ func (i *Lifecycler) InstancesCount() int { return i.instancesCount } +// HealthyInstancesInZoneCount returns the number of healthy instances in the ring that are registered in +// this lifecycler's zone, updated during the last heartbeat period. +func (i *Lifecycler) HealthyInstancesInZoneCount() int { + i.countersLock.RLock() + defer i.countersLock.RUnlock() + + return i.healthyInstancesInZoneCount +} + // InstancesInZoneCount returns the number of instances in the ring that are registered in // this lifecycler's zone, updated during the last heartbeat period. func (i *Lifecycler) InstancesInZoneCount() int { @@ -913,6 +923,7 @@ func (i *Lifecycler) updateCounters(ringDesc *Desc) { healthyInstancesCount := 0 instancesCount := 0 zones := map[string]int{} + healthyInstancesInZone := map[string]int{} if ringDesc != nil { now := time.Now() @@ -924,6 +935,7 @@ func (i *Lifecycler) updateCounters(ringDesc *Desc) { // Count the number of healthy instances for Write operation. if ingester.IsHealthy(Write, i.cfg.RingConfig.HeartbeatTimeout, now) { healthyInstancesCount++ + healthyInstancesInZone[ingester.Zone]++ } } } @@ -932,6 +944,7 @@ func (i *Lifecycler) updateCounters(ringDesc *Desc) { i.countersLock.Lock() i.healthyInstancesCount = healthyInstancesCount i.instancesCount = instancesCount + i.healthyInstancesInZoneCount = healthyInstancesInZone[i.cfg.Zone] i.instancesInZoneCount = zones[i.cfg.Zone] i.zonesCount = len(zones) i.countersLock.Unlock() diff --git a/vendor/github.com/grafana/dskit/services/failure_watcher.go b/vendor/github.com/grafana/dskit/services/failure_watcher.go index 9cb7e3a8fa7f..657656f50d47 100644 --- a/vendor/github.com/grafana/dskit/services/failure_watcher.go +++ b/vendor/github.com/grafana/dskit/services/failure_watcher.go @@ -35,7 +35,7 @@ func (w *FailureWatcher) WatchService(service Service) { panic(errFailureWatcherNotInitialized) } - service.AddListener(NewListener(nil, nil, nil, nil, func(from State, failure error) { + service.AddListener(NewListener(nil, nil, nil, nil, func(_ State, failure error) { w.ch <- errors.Wrapf(failure, "service %s failed", DescribeService(service)) })) } diff --git a/vendor/github.com/grafana/dskit/spanprofiler/tracer.go b/vendor/github.com/grafana/dskit/spanprofiler/tracer.go index c28b52b11d44..e4ed2974a4a2 100644 --- a/vendor/github.com/grafana/dskit/spanprofiler/tracer.go +++ b/vendor/github.com/grafana/dskit/spanprofiler/tracer.go @@ -41,6 +41,9 @@ func (t *tracer) StartSpan(operationName string, opts ...opentracing.StartSpanOp if !ok { return span } + if !spanCtx.IsSampled() { + return span + } // pprof labels are attached only once, at the span root level. if !isRootSpan(opts...) { return span diff --git a/vendor/modules.txt b/vendor/modules.txt index 083b3735cf75..4f1fcd57b8d2 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -927,7 +927,7 @@ github.com/gorilla/websocket # github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2 ## explicit; go 1.17 github.com/grafana/cloudflare-go -# github.com/grafana/dskit v0.0.0-20240528015923-27d7d41066d3 +# github.com/grafana/dskit v0.0.0-20240626184720-35810fdf1c6d ## explicit; go 1.20 github.com/grafana/dskit/aws github.com/grafana/dskit/backoff From 0c7313b9dcbdfd263360c731effaa9b2785d1cc5 Mon Sep 17 00:00:00 2001 From: JordanRushing Date: Thu, 27 Jun 2024 12:38:28 -0500 Subject: [PATCH 4/4] Update multi-zone stream limit calculation; fix test Signed-off-by: JordanRushing --- pkg/ingester/limiter.go | 2 +- pkg/ingester/limiter_test.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/ingester/limiter.go b/pkg/ingester/limiter.go index 48df5d7d2edf..1ed3a3ea2716 100644 --- a/pkg/ingester/limiter.go +++ b/pkg/ingester/limiter.go @@ -131,7 +131,7 @@ func calculateLimitForSingleZone(globalLimit int, l *Limiter) int { func calculateLimitForMultipleZones(globalLimit, zonesCount int, l *Limiter) int { ingestersInZone := l.ring.HealthyInstancesInZoneCount() if ingestersInZone > 0 { - return int(float64(globalLimit) * float64(l.replicationFactor) * float64(zonesCount) / float64(ingestersInZone)) + return int((float64(globalLimit) * float64(l.replicationFactor)) / float64(zonesCount) / float64(ingestersInZone)) } return 0 } diff --git a/pkg/ingester/limiter_test.go b/pkg/ingester/limiter_test.go index 11e693af0532..0d0055d0a0af 100644 --- a/pkg/ingester/limiter_test.go +++ b/pkg/ingester/limiter_test.go @@ -293,9 +293,9 @@ func TestConvertGlobalToLocalLimit(t *testing.T) { }{ {"GlobalLimitZero", 0, 1, 1, 1, 3, 0}, {"SingleZoneMultipleIngesters", 100, 1, 10, 10, 3, 30}, - {"MultipleZones", 200, 3, 0, 10, 3, 180}, - {"MultipleZonesNoIngesters", 200, 2, 0, 0, 3, 0}, - {"MultipleZonesNoIngestersInZone", 200, 3, 10, 0, 3, 0}, + {"MultipleZones", 200, 3, 30, 10, 3, 20}, + {"MultipleZonesNoHealthyIngesters", 200, 2, 0, 0, 3, 0}, + {"MultipleZonesNoHealthyIngestersInZone", 200, 3, 10, 0, 3, 0}, } for _, tc := range tests {