Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

feat: Add tokenizer interface for Drain Training #13069

Merged
merged 3 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 9 additions & 16 deletions pkg/pattern/drain/drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ package drain
import (
"math"
"strconv"
"strings"
"unicode"

"github.com/hashicorp/golang-lru/v2/simplelru"
Expand Down Expand Up @@ -161,6 +160,7 @@ func New(config *Config, metrics *Metrics) *Drain {
rootNode: createNode(),
idToCluster: createLogClusterCache(config.MaxClusters, evictFn),
metrics: metrics,
tokenizer: splittingTokenizer{}, // Default to this for now
}
return d
}
Expand All @@ -171,6 +171,7 @@ type Drain struct {
idToCluster *LogClusterCache
clustersCounter int
metrics *Metrics
tokenizer LineTokenizer
}

func (d *Drain) Clusters() []*LogCluster {
Expand All @@ -182,10 +183,13 @@ func (d *Drain) TrainTokens(tokens []string, stringer func([]string) string, ts
}

// Train tokenizes content and passes the tokens, a stringer, and the timestamp
// ts to d.train, returning the cluster that d.train reports.
//
// NOTE(review): this span is a rendered diff hunk without +/- markers, so two
// versions of the return line appear back to back. The first (tokens from
// d.getContentAsTokens, nil stringer) looks like the removed line and the
// second (tokens from d.tokenizer.Tokenize, d.tokenizer.Join as stringer) the
// added one; confirm against the merged file. Read literally as Go, only the
// first return would ever execute.
func (d *Drain) Train(content string, ts int64) *LogCluster {
return d.train(d.getContentAsTokens(content), nil, ts)
return d.train(d.tokenizer.Tokenize(content), d.tokenizer.Join, ts)
}

func (d *Drain) train(tokens []string, stringer func([]string) string, ts int64) *LogCluster {
if len(tokens) < 4 {
return nil
}
matchCluster := d.treeSearch(d.rootNode, tokens, d.config.SimTh, false)
// Match no existing log cluster
if matchCluster == nil {
Expand Down Expand Up @@ -215,7 +219,7 @@ func (d *Drain) train(tokens []string, stringer func([]string) string, ts int64)
}

func (d *Drain) TrainPattern(content string, samples []*logproto.PatternSample) *LogCluster {
tokens := tokenizePattern(content, d.config.ParamString)
tokens := deduplicatePlaceholders(d.tokenizer.Tokenize(content), d.config.ParamString)
matchCluster := d.treeSearch(d.rootNode, tokens, d.config.SimTh, false)
// Match no existing log cluster
if matchCluster == nil {
Expand All @@ -237,10 +241,6 @@ func (d *Drain) TrainPattern(content string, samples []*logproto.PatternSample)
return matchCluster
}

// tokenizePattern splits content on single spaces and runs the resulting
// tokens, together with param, through deduplicatePlaceholders. strings.Split
// does not collapse runs of spaces, so consecutive spaces yield empty-string
// tokens.
//
// NOTE(review): presumably one of the deletions in this diff; TrainPattern now
// calls deduplicatePlaceholders on d.tokenizer.Tokenize(content) directly.
// Confirm against the merged file.
func tokenizePattern(content, param string) []string {
return deduplicatePlaceholders(strings.Split(content, " "), param)
}

func deduplicatePlaceholders(tokens []string, param string) []string {
if len(tokens) < 2 {
return tokens
Expand All @@ -258,7 +258,7 @@ func deduplicatePlaceholders(tokens []string, param string) []string {
}

func (d *Drain) PatternString(c *LogCluster) string {
s := strings.Join(deduplicatePlaceholders(c.Tokens, d.config.ParamString), " ")
s := d.tokenizer.Join(deduplicatePlaceholders(c.Tokens, d.config.ParamString))
if s == d.config.ParamString {
return ""
}
Expand All @@ -271,18 +271,11 @@ func (d *Drain) Delete(cluster *LogCluster) {

// Match looks content up among the clusters that already exist. The tree is
// searched with simTh = 1.0, so only a perfect match is accepted, and with
// includeParams = true. No new cluster is created by this call and no existing
// cluster is modified.
//
// NOTE(review): this span is a rendered diff hunk without +/- markers. The two
// contentTokens lines look like the removed (d.getContentAsTokens) and added
// (d.tokenizer.Tokenize) versions of the same statement; confirm against the
// merged file. Read literally as Go, the repeated := would not compile.
func (d *Drain) Match(content string) *LogCluster {
contentTokens := d.getContentAsTokens(content)
contentTokens := d.tokenizer.Tokenize(content)
matchCluster := d.treeSearch(d.rootNode, contentTokens, 1.0, true)
return matchCluster
}

// getContentAsTokens turns content into tokens. Every string listed in
// d.config.ExtraDelimiters is first replaced by a single space, then the
// result is split on " ". Because strings.Split is used, adjacent spaces
// produce empty-string tokens.
//
// NOTE(review): presumably one of the deletions in this diff, superseded by
// d.tokenizer.Tokenize; confirm against the merged file.
func (d *Drain) getContentAsTokens(content string) []string {
for _, extraDelimiter := range d.config.ExtraDelimiters {
// n = -1 replaces every occurrence, not just the first.
content = strings.Replace(content, extraDelimiter, " ", -1)
}
return strings.Split(content, " ")
}

func (d *Drain) treeSearch(rootNode *Node, tokens []string, simTh float64, includeParams bool) *LogCluster {
tokenCount := len(tokens)

Expand Down
49 changes: 10 additions & 39 deletions pkg/pattern/drain/drain_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,50 +10,21 @@ import (

func BenchmarkDrain_TrainExtractsPatterns(b *testing.B) {
tests := []struct {
name string
drain *Drain
inputFile string
}{
{
name: `Patterns for agent logfmt logs`,
inputFile: `testdata/agent-logfmt.txt`,
},
{
name: `Patterns for ingester logfmt logs`,
inputFile: `testdata/ingester-logfmt.txt`,
},
{
name: `Patterns for Drone json logs`,
inputFile: `testdata/drone-json.txt`,
},
{
name: "Patterns for distributor logfmt logs",
inputFile: "testdata/distributor-logfmt.txt",
},
{
name: "Patterns for journald logs",
inputFile: "testdata/journald.txt",
},
{
name: "Patterns for kafka logs",
inputFile: "testdata/kafka.txt",
},
{
name: "Patterns for kubernetes logs",
inputFile: "testdata/kubernetes.txt",
},
{
name: "Patterns for vault logs",
inputFile: "testdata/vault.txt",
},
{
name: "Patterns for calico logs",
inputFile: "testdata/calico.txt",
},
{inputFile: `testdata/agent-logfmt.txt`},
{inputFile: `testdata/ingester-logfmt.txt`},
{inputFile: `testdata/drone-json.txt`},
{inputFile: "testdata/distributor-logfmt.txt"},
{inputFile: "testdata/journald.txt"},
{inputFile: "testdata/kafka.txt"},
{inputFile: "testdata/kubernetes.txt"},
{inputFile: "testdata/vault.txt"},
{inputFile: "testdata/calico.txt"},
}

for _, tt := range tests {
b.Run(tt.name, func(b *testing.B) {
b.Run(tt.inputFile, func(b *testing.B) {
file, err := os.Open(tt.inputFile)
require.NoError(b, err)
defer file.Close()
Expand Down
Loading
Loading