Skip to content

Commit

Permalink
feat: Pattern ingesters add a limiter for high eviction rate (#13464)
Browse files Browse the repository at this point in the history
  • Loading branch information
cyriltovena authored Jul 9, 2024
1 parent 845359d commit e08b4a7
Show file tree
Hide file tree
Showing 13 changed files with 210 additions and 49 deletions.
12 changes: 11 additions & 1 deletion docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,17 @@ pattern_ingester:
# first flush check is delayed by a random time up to 0.8x the flush check
# period. Additionally, there is +/- 1% jitter added to the interval.
# CLI flag: -pattern-ingester.flush-check-period
[flush_check_period: <duration> | default = 30s]
[flush_check_period: <duration> | default = 1m]

# The maximum number of detected pattern clusters that can be created by
# streams.
# CLI flag: -pattern-ingester.max-clusters
[max_clusters: <int> | default = 300]

# The maximum eviction ratio of patterns per stream. Once that ratio is
# reached, the stream will throttled pattern detection.
# CLI flag: -pattern-ingester.max-eviction-ratio
[max_eviction_ratio: <float> | default = 0.25]

# Configures the metric aggregation and storage behavior of the pattern
# ingester.
Expand Down
2 changes: 1 addition & 1 deletion pkg/pattern/chunk/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

const (
TimeResolution = model.Time(int64(time.Second*10) / 1e6)
MaxChunkTime = 1 * time.Hour
MaxChunkTime = 15 * time.Minute
)

func TruncateTimestamp(ts, step model.Time) model.Time { return ts - ts%step }
86 changes: 45 additions & 41 deletions pkg/pattern/drain/drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,14 @@ import (
)

type Config struct {
maxNodeDepth int
LogClusterDepth int
SimTh float64
MaxChildren int
ExtraDelimiters []string
MaxClusters int
ParamString string
maxNodeDepth int
LogClusterDepth int
SimTh float64
MaxChildren int
ExtraDelimiters []string
MaxClusters int
ParamString string
MaxEvictionRatio float64
}

func createLogClusterCache(maxSize int, onEvict func(int, *LogCluster)) *LogClusterCache {
Expand All @@ -60,29 +61,13 @@ type LogClusterCache struct {
}

func (c *LogClusterCache) Values() []*LogCluster {
values := make([]*LogCluster, 0)
for _, key := range c.cache.Keys() {
if value, ok := c.cache.Peek(key); ok {
values = append(values, value)
}
}
return values
return c.cache.Values()
}

func (c *LogClusterCache) Set(key int, cluster *LogCluster) {
c.cache.Add(key, cluster)
}

func (c *LogClusterCache) Iterate(fn func(*LogCluster) bool) {
for _, key := range c.cache.Keys() {
if value, ok := c.cache.Peek(key); ok {
if !fn(value) {
return
}
}
}
}

func (c *LogClusterCache) Get(key int) *LogCluster {
cluster, ok := c.cache.Get(key)
if !ok {
Expand Down Expand Up @@ -140,10 +125,11 @@ func DefaultConfig() *Config {
// Both SimTh and MaxClusterDepth impact branching factor: the greater
// MaxClusterDepth and SimTh, the less the chance that there will be
// "similar" clusters, but the greater the footprint.
SimTh: 0.3,
MaxChildren: 15,
ParamString: `<_>`,
MaxClusters: 300,
SimTh: 0.3,
MaxChildren: 15,
ParamString: `<_>`,
MaxClusters: 300,
MaxEvictionRatio: 0.25,
}
}

Expand All @@ -152,10 +138,17 @@ func New(config *Config, format string, metrics *Metrics) *Drain {
panic("depth argument must be at least 3")
}
config.maxNodeDepth = config.LogClusterDepth - 2
var evictFn func(int, *LogCluster)
if metrics != nil {
evictFn = func(int, *LogCluster) { metrics.PatternsEvictedTotal.Inc() }

d := &Drain{
config: config,
rootNode: createNode(),
metrics: metrics,
maxAllowedLineLength: 3000,
format: format,
}

limiter := newLimiter(config.MaxEvictionRatio)

var tokenizer LineTokenizer
switch format {
case FormatJSON:
Expand All @@ -165,16 +158,20 @@ func New(config *Config, format string, metrics *Metrics) *Drain {
default:
tokenizer = newPunctuationTokenizer()
}

d := &Drain{
config: config,
rootNode: createNode(),
idToCluster: createLogClusterCache(config.MaxClusters, evictFn),
metrics: metrics,
tokenizer: tokenizer,
maxAllowedLineLength: 3000,
format: format,
}
d.idToCluster = createLogClusterCache(config.MaxClusters, func(int, *LogCluster) {
if metrics != nil {
if d.pruning {
metrics.PatternsPrunedTotal.Inc()
} else {
metrics.PatternsEvictedTotal.Inc()
}
}
if !d.pruning {
limiter.Evict()
}
})
d.tokenizer = tokenizer
d.limiter = limiter
return d
}

Expand All @@ -189,6 +186,8 @@ type Drain struct {
format string
tokens []string
state interface{}
limiter *limiter
pruning bool
}

func (d *Drain) Clusters() []*LogCluster {
Expand All @@ -200,6 +199,9 @@ func (d *Drain) TrainTokens(tokens []string, stringer func([]string) string, ts
}

func (d *Drain) Train(content string, ts int64) *LogCluster {
if !d.limiter.Allow() {
return nil
}
if len(content) > d.maxAllowedLineLength {
return nil
}
Expand Down Expand Up @@ -325,7 +327,9 @@ func (d *Drain) pruneTree(node *Node) int {
}

func (d *Drain) Delete(cluster *LogCluster) {
d.pruning = true
d.idToCluster.cache.Remove(cluster.id)
d.pruning = false
}

func (d *Drain) treeSearch(rootNode *Node, tokens []string, simTh float64, includeParams bool) *LogCluster {
Expand Down
51 changes: 51 additions & 0 deletions pkg/pattern/drain/limiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package drain

import (
"time"
)

type limiter struct {
added int64
evicted int64
maxPercentage float64
blockedUntil time.Time
}

func newLimiter(maxPercentage float64) *limiter {
return &limiter{
maxPercentage: maxPercentage,
}
}

func (l *limiter) Allow() bool {
if !l.blockedUntil.IsZero() {
if time.Now().Before(l.blockedUntil) {
return false
}
l.reset()
}
if l.added == 0 {
l.added++
return true
}
if float64(l.evicted)/float64(l.added) > l.maxPercentage {
l.block()
return false
}
l.added++
return true
}

func (l *limiter) Evict() {
l.evicted++
}

func (l *limiter) reset() {
l.added = 0
l.evicted = 0
l.blockedUntil = time.Time{}
}

func (l *limiter) block() {
l.blockedUntil = time.Now().Add(10 * time.Minute)
}
70 changes: 70 additions & 0 deletions pkg/pattern/drain/limiter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package drain

import (
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestNewLimiter(t *testing.T) {
maxPercentage := 0.5
l := newLimiter(maxPercentage)
require.NotNil(t, l, "expected non-nil limiter")
require.Equal(t, maxPercentage, l.maxPercentage, "expected maxPercentage to match")
require.Equal(t, int64(0), l.added, "expected added to be 0")
require.Equal(t, int64(0), l.evicted, "expected evicted to be 0")
require.True(t, l.blockedUntil.IsZero(), "expected blockedUntil to be zero")
}

func TestLimiterAllow(t *testing.T) {
maxPercentage := 0.5
l := newLimiter(maxPercentage)

// Test allowing when no evictions
require.True(t, l.Allow(), "expected Allow to return true initially")

// Test allowing until evictions exceed maxPercentage
for i := 0; i < 2; i++ {
require.True(t, l.Allow(), "expected Allow to return true %d", i)
l.Evict()
}

// Evict to exceed maxPercentage
l.Evict()
require.False(t, l.Allow(), "expected Allow to return false after evictions exceed maxPercentage")

// Test blocking time
require.False(t, l.blockedUntil.IsZero(), "expected blockedUntil to be set")

// Fast forward time to simulate block duration passing
l.blockedUntil = time.Now().Add(-1 * time.Minute)
require.True(t, l.Allow(), "expected Allow to return true after block duration")
}

func TestLimiterEvict(t *testing.T) {
l := newLimiter(0.5)
l.Evict()
require.Equal(t, int64(1), l.evicted, "expected evicted to be 1")
l.Evict()
require.Equal(t, int64(2), l.evicted, "expected evicted to be 2")
}

func TestLimiterReset(t *testing.T) {
l := newLimiter(0.5)
l.added = 10
l.evicted = 5
l.blockedUntil = time.Now().Add(10 * time.Minute)
l.reset()
require.Equal(t, int64(0), l.added, "expected added to be 0")
require.Equal(t, int64(0), l.evicted, "expected evicted to be 0")
require.True(t, l.blockedUntil.IsZero(), "expected blockedUntil to be zero")
}

func TestLimiterBlock(t *testing.T) {
l := newLimiter(0.5)
l.block()
require.False(t, l.blockedUntil.IsZero(), "expected blockedUntil to be set")
require.False(t, l.Allow())
require.True(t, l.blockedUntil.After(time.Now()), "expected blockedUntil to be in the future")
}
1 change: 1 addition & 0 deletions pkg/pattern/drain/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func DetectLogFormat(line string) string {

type Metrics struct {
PatternsEvictedTotal prometheus.Counter
PatternsPrunedTotal prometheus.Counter
PatternsDetectedTotal prometheus.Counter
TokensPerLine prometheus.Observer
StatePerLine prometheus.Observer
Expand Down
14 changes: 13 additions & 1 deletion pkg/pattern/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/pattern/clientpool"
"github.com/grafana/loki/v3/pkg/pattern/drain"
"github.com/grafana/loki/v3/pkg/pattern/metric"
"github.com/grafana/loki/v3/pkg/util"
util_log "github.com/grafana/loki/v3/pkg/util/log"
Expand All @@ -39,6 +40,8 @@ type Config struct {
ClientConfig clientpool.Config `yaml:"client_config,omitempty" doc:"description=Configures how the pattern ingester will connect to the ingesters."`
ConcurrentFlushes int `yaml:"concurrent_flushes"`
FlushCheckPeriod time.Duration `yaml:"flush_check_period"`
MaxClusters int `yaml:"max_clusters,omitempty" doc:"description=The maximum number of detected pattern clusters that can be created by streams."`
MaxEvictionRatio float64 `yaml:"max_eviction_ratio,omitempty" doc:"description=The maximum eviction ratio of patterns per stream. Once that ratio is reached, the stream will throttled pattern detection."`

MetricAggregation metric.AggregationConfig `yaml:"metric_aggregation,omitempty" doc:"description=Configures the metric aggregation and storage behavior of the pattern ingester."`
// For testing.
Expand All @@ -53,7 +56,9 @@ func (cfg *Config) RegisterFlags(fs *flag.FlagSet) {

fs.BoolVar(&cfg.Enabled, "pattern-ingester.enabled", false, "Flag to enable or disable the usage of the pattern-ingester component.")
fs.IntVar(&cfg.ConcurrentFlushes, "pattern-ingester.concurrent-flushes", 32, "How many flushes can happen concurrently from each stream.")
fs.DurationVar(&cfg.FlushCheckPeriod, "pattern-ingester.flush-check-period", 30*time.Second, "How often should the ingester see if there are any blocks to flush. The first flush check is delayed by a random time up to 0.8x the flush check period. Additionally, there is +/- 1% jitter added to the interval.")
fs.DurationVar(&cfg.FlushCheckPeriod, "pattern-ingester.flush-check-period", 1*time.Minute, "How often should the ingester see if there are any blocks to flush. The first flush check is delayed by a random time up to 0.8x the flush check period. Additionally, there is +/- 1% jitter added to the interval.")
fs.IntVar(&cfg.MaxClusters, "pattern-ingester.max-clusters", drain.DefaultConfig().MaxClusters, "The maximum number of detected pattern clusters that can be created by the pattern ingester.")
fs.Float64Var(&cfg.MaxEvictionRatio, "pattern-ingester.max-eviction-ratio", drain.DefaultConfig().MaxEvictionRatio, "The maximum eviction ratio of patterns per stream. Once that ratio is reached, the stream will be throttled for pattern detection.")
}

func (cfg *Config) Validate() error {
Expand Down Expand Up @@ -85,6 +90,7 @@ type Ingester struct {

metrics *ingesterMetrics
chunkMetrics *metric.ChunkMetrics
drainCfg *drain.Config
}

func New(
Expand All @@ -97,6 +103,10 @@ func New(
chunkMetrics := metric.NewChunkMetrics(registerer, metricsNamespace)
registerer = prometheus.WrapRegistererWithPrefix(metricsNamespace+"_", registerer)

drainCfg := drain.DefaultConfig()
drainCfg.MaxClusters = cfg.MaxClusters
drainCfg.MaxEvictionRatio = cfg.MaxEvictionRatio

i := &Ingester{
cfg: cfg,
logger: log.With(logger, "component", "pattern-ingester"),
Expand All @@ -106,6 +116,7 @@ func New(
instances: make(map[string]*instance),
flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
loopQuit: make(chan struct{}),
drainCfg: drainCfg,
}
i.Service = services.NewBasicService(i.starting, i.running, i.stopping)
var err error
Expand Down Expand Up @@ -357,6 +368,7 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { /
i.logger,
i.metrics,
i.chunkMetrics,
i.drainCfg,
i.cfg.MetricAggregation,
)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/pattern/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/pattern/drain"
"github.com/grafana/loki/v3/pkg/pattern/iter"
"github.com/grafana/loki/v3/pkg/pattern/metric"

Expand All @@ -28,6 +29,7 @@ func setup(t *testing.T) *instance {
log.NewNopLogger(),
newIngesterMetrics(nil, "test"),
metric.NewChunkMetrics(nil, "test"),
drain.DefaultConfig(),
metric.AggregationConfig{
Enabled: true,
},
Expand Down
Loading

0 comments on commit e08b4a7

Please sign in to comment.