feat: Add new Drain tokenizer that splits on most punctuation #13143

Merged · 6 commits · Jun 7, 2024
1 change: 1 addition & 0 deletions pkg/logcli/output/loki.go
@@ -0,0 +1 @@
package output
Contributor:
??

89 changes: 55 additions & 34 deletions pkg/pattern/drain/drain.go
@@ -25,7 +25,9 @@ package drain
import (
"math"
"strconv"
"strings"
"unicode"
"unsafe"

"github.com/hashicorp/golang-lru/v2/simplelru"
"github.com/prometheus/common/model"
@@ -139,7 +141,7 @@ func DefaultConfig() *Config {
// MaxClusterDepth and SimTh, the less the chance that there will be
// "similar" clusters, but the greater the footprint.
SimTh: 0.3,
MaxChildren: 100,
MaxChildren: 15,
Contributor:
is that better ?

ParamString: `<_>`,
MaxClusters: 300,
}
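
Worth noting for reviewers: dropping MaxChildren from 100 to 15 caps the fan-out of each prefix-tree node, so once 15 distinct tokens have been seen at a position, further variants are funneled through the `<_>` child and generalized. A quick sketch of what that means for tuning, written as if inside package drain, with nil metrics as in the benchmark below:

```go
cfg := DefaultConfig()
// Tighter fan-out: after 15 distinct child tokens at a tree position,
// new variants are routed through the <_> child instead.
cfg.MaxChildren = 15
d := New(cfg, nil)
_ = d
```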
@@ -156,22 +158,24 @@ func New(config *Config, metrics *Metrics) *Drain {
}

d := &Drain{
config: config,
rootNode: createNode(),
idToCluster: createLogClusterCache(config.MaxClusters, evictFn),
metrics: metrics,
tokenizer: splittingTokenizer{}, // Default to this for now
config: config,
rootNode: createNode(),
idToCluster: createLogClusterCache(config.MaxClusters, evictFn),
metrics: metrics,
tokenizer: newPunctuationTokenizer(),
maxAllowedLineLength: 3000,
}
return d
}
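
The default tokenizer changes from splittingTokenizer{} to newPunctuationTokenizer(), and (per the call sites below) Tokenize now returns a state value alongside the tokens so Join can reassemble the line later. The tokenizer implementation is not part of this hunk; the following is a minimal, self-contained sketch of the general idea only, where the name and the exact splitting rules are assumptions rather than the PR's code:

```go
package main

import (
	"fmt"
	"unicode"
)

// toyPunctuationSplit is an illustrative stand-in for the PR's
// newPunctuationTokenizer (whose implementation is outside this hunk).
// It splits on whitespace and punctuation, emitting each punctuation
// rune as its own token so "10.0.0.1:9095" keeps its shape.
func toyPunctuationSplit(line string) []string {
	var tokens []string
	start := -1
	for i, r := range line {
		if unicode.IsSpace(r) || unicode.IsPunct(r) {
			if start >= 0 {
				tokens = append(tokens, line[start:i])
				start = -1
			}
			if unicode.IsPunct(r) {
				tokens = append(tokens, string(r))
			}
			continue
		}
		if start < 0 {
			start = i
		}
	}
	if start >= 0 {
		tokens = append(tokens, line[start:])
	}
	return tokens
}

func main() {
	fmt.Println(toyPunctuationSplit("10.0.0.1:9095 - GET /health 200"))
	// [10 . 0 . 0 . 1 : 9095 - GET / health 200]
}
```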

type Drain struct {
config *Config
rootNode *Node
idToCluster *LogClusterCache
clustersCounter int
metrics *Metrics
tokenizer LineTokenizer
config *Config
rootNode *Node
idToCluster *LogClusterCache
clustersCounter int
metrics *Metrics
tokenizer LineTokenizer
maxAllowedLineLength int
}

func (d *Drain) Clusters() []*LogCluster {
@@ -183,10 +187,14 @@ func (d *Drain) TrainTokens(tokens []string, stringer func([]string) string, ts
}

func (d *Drain) Train(content string, ts int64) *LogCluster {
return d.train(d.tokenizer.Tokenize(content), d.tokenizer.Join, ts)
if len(content) > d.maxAllowedLineLength {
return nil
}
tokens, state := d.tokenizer.Tokenize(content)
return d.train(tokens, state, ts)
}
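
Train now drops lines longer than maxAllowedLineLength (3000 bytes) by returning nil, so callers must tolerate a nil cluster. A hypothetical end-to-end sketch of the API as it appears in this diff (the import path is an assumption for illustration; the exact pattern output depends on the tokenizer):

```go
package main

import (
	"fmt"

	// Assumed import path, for illustration only.
	"github.com/grafana/loki/v3/pkg/pattern/drain"
)

func main() {
	d := drain.New(drain.DefaultConfig(), nil)
	d.Train(`ts=2024-06-07T10:00:00Z msg="connected" addr=10.0.0.1`, 0)
	d.Train(`ts=2024-06-07T10:00:05Z msg="connected" addr=10.0.0.2`, 0)
	for _, c := range d.Clusters() {
		// Placeholders like <_> mark the variable parts of each pattern.
		fmt.Println(d.PatternString(c))
	}
}
```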

func (d *Drain) train(tokens []string, stringer func([]string) string, ts int64) *LogCluster {
func (d *Drain) train(tokens []string, state interface{}, ts int64) *LogCluster {
if len(tokens) < 4 {
return nil
}
@@ -196,11 +204,12 @@ func (d *Drain) train(tokens []string, stringer func([]string) string, ts int64)
d.clustersCounter++
clusterID := d.clustersCounter
matchCluster = &LogCluster{
Tokens: tokens,
id: clusterID,
Size: 1,
Stringer: stringer,
Chunks: Chunks{},
Tokens: tokens,
TokenState: state,
id: clusterID,
Size: 1,
Stringer: d.tokenizer.Join,
Chunks: Chunks{},
}
matchCluster.append(model.TimeFromUnixNano(ts))
d.idToCluster.Set(clusterID, matchCluster)
@@ -219,15 +228,16 @@ func (d *Drain) train(tokens []string, stringer func([]string) string, ts int64)
}
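
Note for reviewers: each cluster now carries the TokenState captured at tokenization time, and Stringer is wired to the tokenizer's Join, so a pattern can be rendered back with its original spacing even though train no longer receives a stringer argument.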

func (d *Drain) TrainPattern(content string, samples []*logproto.PatternSample) *LogCluster {
tokens := deduplicatePlaceholders(d.tokenizer.Tokenize(content), d.config.ParamString)
tokens, state := d.tokenizer.Tokenize(content)
matchCluster := d.treeSearch(d.rootNode, tokens, d.config.SimTh, true)
// Match no existing log cluster
if matchCluster == nil {
d.clustersCounter++
clusterID := d.clustersCounter
matchCluster = &LogCluster{
Tokens: tokens,
id: clusterID,
Tokens: tokens,
TokenState: state,
id: clusterID,
}
d.idToCluster.Set(clusterID, matchCluster)
d.addSeqToPrefixTree(d.rootNode, matchCluster)
@@ -241,24 +251,33 @@ func (d *Drain) TrainPattern(content string, samples []*logproto.PatternSample)
return matchCluster
}

func deduplicatePlaceholders(tokens []string, param string) []string {
if len(tokens) < 2 {
return tokens
func deduplicatePlaceholders(line string, placeholder string) string {
first := strings.Index(line, "<_><_>")
if first == -1 {
return line
}
i := 1
for k := 1; k < len(tokens); k++ {
if tokens[k] != param || tokens[k] != tokens[k-1] {
if i != k {
tokens[i] = tokens[k]
builder := make([]byte, 0, len(line))
low := 0
for i := first; i < len(line)-5; i++ {
if line[i:i+len(placeholder)] == placeholder {
high := i + 3
for ; high < len(line)-2; high += 3 {
if line[high:high+len(placeholder)] != placeholder {
break
}
}
i++
builder = append(builder, line[low:i+len(placeholder)]...)
low = high
i = high
}
}
return tokens[:i]
builder = append(builder, line[low:]...)

return unsafe.String(unsafe.SliceData(builder), len(builder))
}
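
deduplicatePlaceholders now works on the joined string rather than the token slice: it finds runs of back-to-back `<_>` placeholders, keeps only the first of each run, and copies the rest of the line around them. The hard-coded strides of 3 assume the placeholder is exactly `<_>` (3 bytes). The final unsafe.String call reinterprets the builder's bytes as a string without a copy, which is safe here because the builder slice is never mutated or referenced again. A hypothetical test sketching the expected behavior:

```go
func TestDeduplicatePlaceholders(t *testing.T) {
	// Assumes the default placeholder `<_>`.
	got := deduplicatePlaceholders(`level=info <_><_><_> caller=<_>`, `<_>`)
	want := `level=info <_> caller=<_>`
	if got != want {
		t.Errorf("deduplicatePlaceholders() = %q, want %q", got, want)
	}
}
```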

func (d *Drain) PatternString(c *LogCluster) string {
s := d.tokenizer.Join(deduplicatePlaceholders(c.Tokens, d.config.ParamString))
s := deduplicatePlaceholders(d.tokenizer.Join(c.Tokens, c.TokenState), d.config.ParamString)
if s == d.config.ParamString {
return ""
}
@@ -271,7 +290,7 @@ func (d *Drain) Delete(cluster *LogCluster) {

// Match against an already existing cluster. Match shall be perfect (sim_th=1.0). New cluster will not be created as a result of this call, nor any cluster modifications.
func (d *Drain) Match(content string) *LogCluster {
contentTokens := d.tokenizer.Tokenize(content)
contentTokens, _ := d.tokenizer.Tokenize(content)
matchCluster := d.treeSearch(d.rootNode, contentTokens, 1.0, true)
return matchCluster
}
@@ -413,6 +432,7 @@ func (d *Drain) addSeqToPrefixTree(rootNode *Node, cluster *LogCluster) {
// if token not matched in this layer of existing tree.
if _, ok = curNode.keyToChildNode[token]; !ok {
if !d.hasNumbers(token) {
// No numbers in the token: keep the literal token as its own child while under MaxChildren
if _, ok = curNode.keyToChildNode[d.config.ParamString]; ok {
if len(curNode.keyToChildNode) < d.config.MaxChildren {
newNode := createNode()
Expand All @@ -435,6 +455,7 @@ func (d *Drain) addSeqToPrefixTree(rootNode *Node, cluster *LogCluster) {
}
}
} else {
// Token contains numbers: prioritize the param string (<_>) path
if _, ok = curNode.keyToChildNode[d.config.ParamString]; !ok {
newNode := createNode()
curNode.keyToChildNode[d.config.ParamString] = newNode
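For context on the branch the two new comments annotate: tokens containing digits are treated as likely variables and routed into the shared `<_>` child, while stable tokens get literal children until MaxChildren is hit. The digit check itself is outside this hunk; presumably it is shaped something like:

```go
// Presumed shape of the digit check (the real hasNumbers lives
// outside this hunk); requires "unicode".
func hasNumbers(s string) bool {
	for _, r := range s {
		if unicode.IsNumber(r) {
			return true
		}
	}
	return false
}
```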
2 changes: 1 addition & 1 deletion pkg/pattern/drain/drain_benchmark_test.go
@@ -39,8 +39,8 @@ func BenchmarkDrain_TrainExtractsPatterns(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
drain := New(DefaultConfig(), nil)
for _, line := range lines {
drain := New(DefaultConfig(), nil)
drain.Train(line, 0)
}
}
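To compare time and allocations before and after the tokenizer change, this benchmark can be run with `go test -bench=BenchmarkDrain_TrainExtractsPatterns -benchmem ./pkg/pattern/drain`.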