diff --git a/pkg/pattern/ingester.go b/pkg/pattern/ingester.go
index 79627f5abdda9..cf842811fc197 100644
--- a/pkg/pattern/ingester.go
+++ b/pkg/pattern/ingester.go
@@ -42,7 +42,7 @@ type Config struct {
 	MaxEvictionRatio  float64            `yaml:"max_eviction_ratio,omitempty" doc:"description=The maximum eviction ratio of patterns per stream. Once that ratio is reached, the stream will throttled pattern detection."`
 	MetricAggregation aggregation.Config `yaml:"metric_aggregation,omitempty" doc:"description=Configures the metric aggregation and storage behavior of the pattern ingester."`
 	TeeParallelism    int                `yaml:"tee_parallelism,omitempty" doc:"description=The number of parallel goroutines to use for forwarding requests to the pattern ingester."`
-	TeeBufferSize     int                `yaml:"tee_buffer_size,omitempty" doc:"Maxiumum number of pending teed request to pattern ingesters. If the buffer is full the request is dropped."`
+	TeeQueueSize      int                `yaml:"tee_queue_size,omitempty" doc:"description=Maximum number of pending teed requests to pattern ingesters. If the queue is full the request is dropped."`
 
 	// For testing.
 	factory ring_client.PoolFactory `yaml:"-"`
@@ -90,10 +90,10 @@ func (cfg *Config) RegisterFlags(fs *flag.FlagSet) {
 		"The number of parallel goroutines to use for forwarding requests to the pattern ingester.",
 	)
 	fs.IntVar(
-		&cfg.TeeBufferSize,
-		"pattern-ingester.tee-buffer-size",
+		&cfg.TeeQueueSize,
+		"pattern-ingester.tee-queue-size",
 		100,
-		"Maxiumum number of pending teed request to pattern ingesters. If the buffer is full the request is dropped.",
+		"Maximum number of pending teed requests to pattern ingesters. If the queue is full the request is dropped.",
 	)
 }
diff --git a/pkg/pattern/tee.go b/pkg/pattern/tee.go
index b240dc889d27b..4e731198157f7 100644
--- a/pkg/pattern/tee.go
+++ b/pkg/pattern/tee.go
@@ -24,6 +24,7 @@ type Tee struct {
 	ingesterMetricAppends *prometheus.CounterVec
 	teedRequests          *prometheus.CounterVec
+	teeQueueSize          *prometheus.GaugeVec
 
 	requestCh chan request
@@ -54,14 +55,18 @@ func NewTee(
 		ingesterMetricAppends: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
 			Name: "pattern_ingester_metric_appends_total",
 			Help: "The total number of metric only batch appends sent to pattern ingesters. These requests will not be processed for patterns.",
-		}, []string{"ingester", "status"}),
+		}, []string{"status"}),
 		teedRequests: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
 			Name: "pattern_ingester_teed_requests_total",
 			Help: "The total number of batch appends sent to fallback pattern ingesters, for not owned streams.",
 		}, []string{"tenant", "status"}),
+		teeQueueSize: promauto.With(registerer).NewGaugeVec(prometheus.GaugeOpts{
+			Name: "pattern_ingester_tee_queue_size",
+			Help: "Current number of requests in the pattern ingester tee queue.",
+		}, []string{"tenant"}),
 		cfg:        cfg,
 		ringClient: ringClient,
-		requestCh:  make(chan request, cfg.TeeBufferSize),
+		requestCh:  make(chan request, cfg.TeeQueueSize),
 		quit:       make(chan struct{}),
 	}
@@ -78,6 +83,7 @@ func (t *Tee) run() {
 		case <-t.quit:
 			return
 		case req := <-t.requestCh:
+			t.teeQueueSize.WithLabelValues(req.tenant).Dec()
 			ctx, cancel := context.WithTimeout(
 				user.InjectOrgID(context.Background(), req.tenant),
 				t.cfg.ClientConfig.RemoteTimeout,
@@ -94,6 +100,7 @@ func (t *Tee) run() {
 func (t *Tee) sendStream(ctx context.Context, stream distributor.KeyedStream) error {
 	err := t.sendOwnedStream(ctx, stream)
 	if err == nil {
+		t.ingesterMetricAppends.WithLabelValues("success").Inc()
 		// Success, return early
 		return nil
 	}
@@ -104,7 +111,7 @@ func (t *Tee) sendStream(ctx context.Context, stream distributor.KeyedStream) error {
 	// try to forward request to any pattern ingester so we at least capture the metrics.
 	replicationSet, err := t.ringClient.Ring().GetReplicationSetForOperation(ring.WriteNoExtend)
 	if replicationSet.Instances == nil {
-		t.ingesterMetricAppends.WithLabelValues("none", "fail").Inc()
+		t.ingesterMetricAppends.WithLabelValues("fail").Inc()
 		return errors.New("no instances found for fallback")
 	}
@@ -120,15 +127,16 @@ func (t *Tee) sendStream(ctx context.Context, stream distributor.KeyedStream) error {
 			_, err = client.(logproto.PatternClient).Push(ctx, req)
 			if err != nil {
-				t.ingesterMetricAppends.WithLabelValues(addr, "fail").Inc()
 				continue
 			}
-			t.ingesterMetricAppends.WithLabelValues(addr, "success").Inc()
+
+			t.ingesterMetricAppends.WithLabelValues("success").Inc()
 			// bail after any success to prevent sending more than one
 			return nil
 		}
 	}
+	t.ingesterMetricAppends.WithLabelValues("fail").Inc()
 	return err
 }
@@ -139,6 +147,7 @@ func (t *Tee) sendOwnedStream(ctx context.Context, stream distributor.KeyedStream) error {
 		return err
 	}
 	if replicationSet.Instances == nil {
+		t.ingesterAppends.WithLabelValues("none", "fail").Inc()
 		return errors.New("no instances found")
 	}
 	addr := replicationSet.Instances[0].Addr
@@ -159,7 +168,6 @@ func (t *Tee) sendOwnedStream(ctx context.Context, stream distributor.KeyedStream) error {
 	}
 	// Success here means the stream will be processed for both metrics and patterns
 	t.ingesterAppends.WithLabelValues(addr, "success").Inc()
-	t.ingesterMetricAppends.WithLabelValues(addr, "success").Inc()
 	return nil
 }
@@ -177,6 +185,7 @@ func (t *Tee) Duplicate(tenant string, streams []distributor.KeyedStream) {
 		select {
 		case t.requestCh <- req:
 			t.teedRequests.WithLabelValues(tenant, "queued").Inc()
+			t.teeQueueSize.WithLabelValues(req.tenant).Inc()
 			return
 		default:
 			t.teedRequests.WithLabelValues(tenant, "dropped").Inc()
@@ -189,5 +198,5 @@ func (t *Tee) Duplicate(tenant string, streams []distributor.KeyedStream) {
 // Stop will cancel any ongoing requests and stop the goroutine listening for requests
 func (t *Tee) Stop() {
 	close(t.quit)
-	t.requestCh = make(chan request, t.cfg.TeeBufferSize)
+	t.requestCh = make(chan request, t.cfg.TeeQueueSize)
 }
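For reviewers who want the queueing behavior in isolation, here is a minimal, self-contained sketch of the pattern these hunks implement: a fixed-capacity channel that drops work instead of blocking the caller, with a per-tenant depth counter incremented on enqueue and decremented on dequeue. The sketch is illustrative only; `boundedTee`, `duplicate`, and the mutex-guarded map are stand-ins, not Loki's API. The real code uses a `prometheus.GaugeVec` for `pattern_ingester_tee_queue_size` and records queued/dropped outcomes in `pattern_ingester_teed_requests_total`.

```go
package main

import (
	"fmt"
	"sync"
)

// boundedTee sketches the tee queue above: a fixed-capacity channel plus a
// per-tenant depth counter (a plain map here, a GaugeVec in the real code).
type boundedTee struct {
	requestCh chan string // tenant IDs stand in for full push requests
	quit      chan struct{}

	mu    sync.Mutex
	depth map[string]int
}

func newBoundedTee(queueSize int) *boundedTee {
	t := &boundedTee{
		requestCh: make(chan string, queueSize), // cfg.TeeQueueSize in the diff
		quit:      make(chan struct{}),
		depth:     map[string]int{},
	}
	go t.run()
	return t
}

// duplicate mirrors Tee.Duplicate: a non-blocking send that drops on overflow.
func (t *boundedTee) duplicate(tenant string) {
	select {
	case t.requestCh <- tenant:
		t.add(tenant, 1) // teeQueueSize.WithLabelValues(tenant).Inc()
	default:
		fmt.Println("dropped request for", tenant) // teedRequests{status="dropped"}
	}
}

// run mirrors Tee.run: the depth is decremented as soon as a request is
// picked up, so the counter reflects waiting work, not in-flight work.
func (t *boundedTee) run() {
	for {
		select {
		case <-t.quit:
			return
		case tenant := <-t.requestCh:
			t.add(tenant, -1) // teeQueueSize.WithLabelValues(tenant).Dec()
			// ...forward the request to a pattern ingester here...
		}
	}
}

func (t *boundedTee) add(tenant string, delta int) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.depth[tenant] += delta
}

func main() {
	t := newBoundedTee(2)
	for i := 0; i < 4; i++ {
		// If the worker stalls, sends past the capacity of 2 are dropped.
		t.duplicate("tenant-a")
	}
	close(t.quit)
}
```

One consequence worth noting, which the diff shares: the counter is incremented only after the channel send succeeds, so a fast consumer can dequeue and decrement first, leaving the gauge transiently negative. For an approximate queue-depth metric that is usually an acceptable trade-off.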