From 62c5c5c64182736f65ec9c903e0789986b264425 Mon Sep 17 00:00:00 2001 From: George Robinson Date: Fri, 26 Jul 2024 15:22:38 +0100 Subject: [PATCH] fix: add missing flush op timeout (#13679) --- docs/sources/shared/configuration.md | 2 +- pkg/ingester-rf1/flush.go | 4 +++- pkg/ingester-rf1/ingester.go | 2 +- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index 0e1687d06cc78..a7774c34c3ce6 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -358,7 +358,7 @@ ingester_rf1: # The timeout for an individual flush. Will be retried up to # `flush-op-backoff-retries` times. # CLI flag: -ingester-rf1.flush-op-timeout - [flush_op_timeout: <duration> | default = 10m] + [flush_op_timeout: <duration> | default = 10s] # Forget about ingesters having heartbeat timestamps older than # `ring.kvstore.heartbeat_timeout`. This is equivalent to clicking on the diff --git a/pkg/ingester-rf1/flush.go b/pkg/ingester-rf1/flush.go index aa22166d4fd3e..55601337d350f 100644 --- a/pkg/ingester-rf1/flush.go +++ b/pkg/ingester-rf1/flush.go @@ -96,8 +96,10 @@ func (i *Ingester) flush(l log.Logger, j int, it *wal.PendingSegment) error { } func (i *Ingester) flushSegment(ctx context.Context, j int, w *wal.SegmentWriter) error { - start := time.Now() + ctx, cancelFunc := context.WithTimeout(ctx, i.cfg.FlushOpTimeout) + defer cancelFunc() + start := time.Now() i.metrics.flushesTotal.Add(1) defer func() { i.metrics.flushDuration.Observe(time.Since(start).Seconds()) }() diff --git a/pkg/ingester-rf1/ingester.go b/pkg/ingester-rf1/ingester.go index 0b5a6c5fd724a..8ee0d0e8928b3 100644 --- a/pkg/ingester-rf1/ingester.go +++ b/pkg/ingester-rf1/ingester.go @@ -110,7 +110,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.FlushOpBackoff.MinBackoff, "ingester-rf1.flush-op-backoff-min-period", 100*time.Millisecond, "Minimum backoff period when a flush fails. 
Each concurrent flush has its own backoff, see `ingester.concurrent-flushes`.") f.DurationVar(&cfg.FlushOpBackoff.MaxBackoff, "ingester-rf1.flush-op-backoff-max-period", time.Minute, "Maximum backoff period when a flush fails. Each concurrent flush has its own backoff, see `ingester.concurrent-flushes`.") f.IntVar(&cfg.FlushOpBackoff.MaxRetries, "ingester-rf1.flush-op-backoff-retries", 10, "Maximum retries for failed flushes.") - f.DurationVar(&cfg.FlushOpTimeout, "ingester-rf1.flush-op-timeout", 10*time.Minute, "The timeout for an individual flush. Will be retried up to `flush-op-backoff-retries` times.") + f.DurationVar(&cfg.FlushOpTimeout, "ingester-rf1.flush-op-timeout", 10*time.Second, "The timeout for an individual flush. Will be retried up to `flush-op-backoff-retries` times.") f.DurationVar(&cfg.MaxSegmentAge, "ingester-rf1.max-segment-age", 500*time.Millisecond, "The maximum age of a segment before it should be flushed. Increasing this value allows more time for a segment to grow to max-segment-size, but may increase latency if the write volume is too small.") f.IntVar(&cfg.MaxSegmentSize, "ingester-rf1.max-segment-size", 8*1024*1024, "The maximum size of a segment before it should be flushed. It is not a strict limit, and segments can exceed the maximum size when individual appends are larger than the remaining capacity.") f.IntVar(&cfg.MaxSegments, "ingester-rf1.max-segments", 10, "The maximum number of segments to buffer in-memory. Increasing this value allows for large bursts of writes to be buffered in memory, but may increase latency if the write volume exceeds the rate at which segments can be flushed.")