From 1c0968a51874ab8328b76ce376d7f2dc0c1b363a Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Fri, 22 Jan 2016 11:54:12 -0700 Subject: [PATCH] Make each output keep it's own slice of points Also moved some objects out of config.go and put them in their own package, internal/models fixes #568 closes #285 --- accumulator.go | 6 +- agent.go | 93 ++++---------- internal/config/config.go | 169 +++---------------------- internal/config/config_test.go | 201 +++--------------------------- internal/models/filter.go | 92 ++++++++++++++ internal/models/filter_test.go | 177 ++++++++++++++++++++++++++ internal/models/running_input.go | 24 ++++ internal/models/running_output.go | 72 +++++++++++ 8 files changed, 425 insertions(+), 409 deletions(-) create mode 100644 internal/models/filter.go create mode 100644 internal/models/filter_test.go create mode 100644 internal/models/running_input.go create mode 100644 internal/models/running_output.go diff --git a/accumulator.go b/accumulator.go index c628907d74c32..83f61ae99f2bc 100644 --- a/accumulator.go +++ b/accumulator.go @@ -7,7 +7,7 @@ import ( "sync" "time" - "github.com/influxdata/telegraf/internal/config" + "github.com/influxdata/telegraf/internal/models" "github.com/influxdata/influxdb/client/v2" ) @@ -29,7 +29,7 @@ type Accumulator interface { } func NewAccumulator( - inputConfig *config.InputConfig, + inputConfig *models.InputConfig, points chan *client.Point, ) Accumulator { acc := accumulator{} @@ -47,7 +47,7 @@ type accumulator struct { debug bool - inputConfig *config.InputConfig + inputConfig *models.InputConfig prefix string } diff --git a/agent.go b/agent.go index 5425fba33c804..b63d0527900a5 100644 --- a/agent.go +++ b/agent.go @@ -11,6 +11,7 @@ import ( "time" "github.com/influxdata/telegraf/internal/config" + "github.com/influxdata/telegraf/internal/models" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/outputs" @@ -101,7 +102,7 @@ func (a *Agent) gatherParallel(pointChan chan *client.Point) error { wg.Add(1) counter++ - go func(input *config.RunningInput) { + go func(input *models.RunningInput) { defer wg.Done() acc := NewAccumulator(input.Config, pointChan) @@ -144,7 +145,7 @@ func (a *Agent) gatherParallel(pointChan chan *client.Point) error { // reporting interval. func (a *Agent) gatherSeparate( shutdown chan struct{}, - input *config.RunningInput, + input *models.RunningInput, pointChan chan *client.Point, ) error { ticker := time.NewTicker(input.Config.Interval) @@ -228,93 +229,45 @@ func (a *Agent) Test() error { return nil } -// writeOutput writes a list of points to a single output, with retries. -// Optionally takes a `done` channel to indicate that it is done writing. -func (a *Agent) writeOutput( - points []*client.Point, - ro *config.RunningOutput, - shutdown chan struct{}, - wg *sync.WaitGroup, -) { - defer wg.Done() - if len(points) == 0 { - return - } - retry := 0 - retries := a.Config.Agent.FlushRetries - start := time.Now() - - for { - filtered := ro.FilterPoints(points) - err := ro.Output.Write(filtered) - if err == nil { - // Write successful - elapsed := time.Since(start) - if !a.Config.Agent.Quiet { - log.Printf("Flushed %d metrics to output %s in %s\n", - len(filtered), ro.Name, elapsed) - } - return - } - - select { - case <-shutdown: - return - default: - if retry >= retries { - // No more retries - msg := "FATAL: Write to output [%s] failed %d times, dropping" + - " %d metrics\n" - log.Printf(msg, ro.Name, retries+1, len(points)) - return - } else if err != nil { - // Sleep for a retry - log.Printf("Error in output [%s]: %s, retrying in %s", - ro.Name, err.Error(), a.Config.Agent.FlushInterval.Duration) - time.Sleep(a.Config.Agent.FlushInterval.Duration) - } - } - - retry++ - } -} - // flush writes a list of points to all configured outputs -func (a *Agent) flush( - points []*client.Point, - shutdown chan struct{}, - wait bool, -) { +func (a *Agent) flush() { var wg sync.WaitGroup + + wg.Add(len(a.Config.Outputs)) for _, o := range a.Config.Outputs { - wg.Add(1) - go a.writeOutput(points, o, shutdown, &wg) - } - if wait { - wg.Wait() + go func(output *models.RunningOutput) { + defer wg.Done() + err := output.Write() + if err != nil { + log.Printf("Error writing to output [%s]: %s\n", + output.Name, err.Error()) + } + }(o) } + + wg.Wait() } // flusher monitors the points input channel and flushes on the minimum interval func (a *Agent) flusher(shutdown chan struct{}, pointChan chan *client.Point) error { // Inelegant, but this sleep is to allow the Gather threads to run, so that // the flusher will flush after metrics are collected. - time.Sleep(time.Millisecond * 100) + time.Sleep(time.Millisecond * 200) ticker := time.NewTicker(a.Config.Agent.FlushInterval.Duration) - points := make([]*client.Point, 0) for { select { case <-shutdown: log.Println("Hang on, flushing any cached points before shutdown") - a.flush(points, shutdown, true) + a.flush() return nil case <-ticker.C: - a.flush(points, shutdown, false) - points = make([]*client.Point, 0) + a.flush() case pt := <-pointChan: - points = append(points, pt) + for _, o := range a.Config.Outputs { + o.AddPoint(pt) + } } } } @@ -389,7 +342,7 @@ func (a *Agent) Run(shutdown chan struct{}) error { // configured. Default intervals are handled below with gatherParallel if input.Config.Interval != 0 { wg.Add(1) - go func(input *config.RunningInput) { + go func(input *models.RunningInput) { defer wg.Done() if err := a.gatherSeparate(shutdown, input, pointChan); err != nil { log.Printf(err.Error()) diff --git a/internal/config/config.go b/internal/config/config.go index 3b5e4ff178bd1..669d8729ccbc5 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -11,13 +11,12 @@ import ( "time" "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/internal/models" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/outputs" "github.com/naoina/toml" "github.com/naoina/toml/ast" - - "github.com/influxdata/influxdb/client/v2" ) // Config specifies the URL/user/password for the database that telegraf @@ -29,8 +28,8 @@ type Config struct { OutputFilters []string Agent *AgentConfig - Inputs []*RunningInput - Outputs []*RunningOutput + Inputs []*models.RunningInput + Outputs []*models.RunningOutput } func NewConfig() *Config { @@ -40,13 +39,12 @@ func NewConfig() *Config { Interval: internal.Duration{Duration: 10 * time.Second}, RoundInterval: true, FlushInterval: internal.Duration{Duration: 10 * time.Second}, - FlushRetries: 2, FlushJitter: internal.Duration{Duration: 5 * time.Second}, }, Tags: make(map[string]string), - Inputs: make([]*RunningInput, 0), - Outputs: make([]*RunningOutput, 0), + Inputs: make([]*models.RunningInput, 0), + Outputs: make([]*models.RunningOutput, 0), InputFilters: make([]string, 0), OutputFilters: make([]string, 0), } @@ -70,9 +68,6 @@ type AgentConfig struct { // Interval at which to flush data FlushInterval internal.Duration - // FlushRetries is the number of times to retry each data flush - FlushRetries int - // FlushJitter Jitters the flush interval by a random amount. // This is primarily to avoid large write spikes for users running a large // number of telegraf instances. @@ -93,129 +88,6 @@ type AgentConfig struct { Hostname string } -// TagFilter is the name of a tag, and the values on which to filter -type TagFilter struct { - Name string - Filter []string -} - -type RunningOutput struct { - Name string - Output outputs.Output - Config *OutputConfig -} - -type RunningInput struct { - Name string - Input inputs.Input - Config *InputConfig -} - -// Filter containing drop/pass and tagdrop/tagpass rules -type Filter struct { - Drop []string - Pass []string - - TagDrop []TagFilter - TagPass []TagFilter - - IsActive bool -} - -// InputConfig containing a name, interval, and filter -type InputConfig struct { - Name string - NameOverride string - MeasurementPrefix string - MeasurementSuffix string - Tags map[string]string - Filter Filter - Interval time.Duration -} - -// OutputConfig containing name and filter -type OutputConfig struct { - Name string - Filter Filter -} - -// Filter returns filtered slice of client.Points based on whether filters -// are active for this RunningOutput. -func (ro *RunningOutput) FilterPoints(points []*client.Point) []*client.Point { - if !ro.Config.Filter.IsActive { - return points - } - - var filteredPoints []*client.Point - for i := range points { - if !ro.Config.Filter.ShouldPass(points[i].Name()) || !ro.Config.Filter.ShouldTagsPass(points[i].Tags()) { - continue - } - filteredPoints = append(filteredPoints, points[i]) - } - return filteredPoints -} - -// ShouldPass returns true if the metric should pass, false if should drop -// based on the drop/pass filter parameters -func (f Filter) ShouldPass(fieldkey string) bool { - if f.Pass != nil { - for _, pat := range f.Pass { - // TODO remove HasPrefix check, leaving it for now for legacy support. - // Cam, 2015-12-07 - if strings.HasPrefix(fieldkey, pat) || internal.Glob(pat, fieldkey) { - return true - } - } - return false - } - - if f.Drop != nil { - for _, pat := range f.Drop { - // TODO remove HasPrefix check, leaving it for now for legacy support. - // Cam, 2015-12-07 - if strings.HasPrefix(fieldkey, pat) || internal.Glob(pat, fieldkey) { - return false - } - } - - return true - } - return true -} - -// ShouldTagsPass returns true if the metric should pass, false if should drop -// based on the tagdrop/tagpass filter parameters -func (f Filter) ShouldTagsPass(tags map[string]string) bool { - if f.TagPass != nil { - for _, pat := range f.TagPass { - if tagval, ok := tags[pat.Name]; ok { - for _, filter := range pat.Filter { - if internal.Glob(filter, tagval) { - return true - } - } - } - } - return false - } - - if f.TagDrop != nil { - for _, pat := range f.TagDrop { - if tagval, ok := tags[pat.Name]; ok { - for _, filter := range pat.Filter { - if internal.Glob(filter, tagval) { - return false - } - } - } - } - return true - } - - return true -} - // Inputs returns a list of strings of the configured inputs. func (c *Config) InputNames() []string { var name []string @@ -535,11 +407,8 @@ func (c *Config) addOutput(name string, table *ast.Table) error { return err } - ro := &RunningOutput{ - Name: name, - Output: output, - Config: outputConfig, - } + ro := models.NewRunningOutput(name, output, outputConfig) + ro.Quiet = c.Agent.Quiet c.Outputs = append(c.Outputs, ro) return nil } @@ -568,7 +437,7 @@ func (c *Config) addInput(name string, table *ast.Table) error { return err } - rp := &RunningInput{ + rp := &models.RunningInput{ Name: name, Input: input, Config: pluginConfig, @@ -578,10 +447,10 @@ func (c *Config) addInput(name string, table *ast.Table) error { } // buildFilter builds a Filter (tagpass/tagdrop/pass/drop) to -// be inserted into the OutputConfig/InputConfig to be used for prefix +// be inserted into the models.OutputConfig/models.InputConfig to be used for prefix // filtering on tags and measurements -func buildFilter(tbl *ast.Table) Filter { - f := Filter{} +func buildFilter(tbl *ast.Table) models.Filter { + f := models.Filter{} if node, ok := tbl.Fields["pass"]; ok { if kv, ok := node.(*ast.KeyValue); ok { @@ -613,7 +482,7 @@ func buildFilter(tbl *ast.Table) Filter { if subtbl, ok := node.(*ast.Table); ok { for name, val := range subtbl.Fields { if kv, ok := val.(*ast.KeyValue); ok { - tagfilter := &TagFilter{Name: name} + tagfilter := &models.TagFilter{Name: name} if ary, ok := kv.Value.(*ast.Array); ok { for _, elem := range ary.Value { if str, ok := elem.(*ast.String); ok { @@ -632,7 +501,7 @@ func buildFilter(tbl *ast.Table) Filter { if subtbl, ok := node.(*ast.Table); ok { for name, val := range subtbl.Fields { if kv, ok := val.(*ast.KeyValue); ok { - tagfilter := &TagFilter{Name: name} + tagfilter := &models.TagFilter{Name: name} if ary, ok := kv.Value.(*ast.Array); ok { for _, elem := range ary.Value { if str, ok := elem.(*ast.String); ok { @@ -656,9 +525,9 @@ func buildFilter(tbl *ast.Table) Filter { // buildInput parses input specific items from the ast.Table, // builds the filter and returns a -// InputConfig to be inserted into RunningInput -func buildInput(name string, tbl *ast.Table) (*InputConfig, error) { - cp := &InputConfig{Name: name} +// models.InputConfig to be inserted into models.RunningInput +func buildInput(name string, tbl *ast.Table) (*models.InputConfig, error) { + cp := &models.InputConfig{Name: name} if node, ok := tbl.Fields["interval"]; ok { if kv, ok := node.(*ast.KeyValue); ok { if str, ok := kv.Value.(*ast.String); ok { @@ -715,10 +584,10 @@ func buildInput(name string, tbl *ast.Table) (*InputConfig, error) { } // buildOutput parses output specific items from the ast.Table, builds the filter and returns an -// OutputConfig to be inserted into RunningInput +// models.OutputConfig to be inserted into models.RunningInput // Note: error exists in the return for future calls that might require error -func buildOutput(name string, tbl *ast.Table) (*OutputConfig, error) { - oc := &OutputConfig{ +func buildOutput(name string, tbl *ast.Table) (*models.OutputConfig, error) { + oc := &models.OutputConfig{ Name: name, Filter: buildFilter(tbl), } diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 40af30c1e63db..92f45ad0a12ec 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -4,6 +4,7 @@ import ( "testing" "time" + "github.com/influxdata/telegraf/internal/models" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs/exec" "github.com/influxdata/telegraf/plugins/inputs/memcached" @@ -18,19 +19,19 @@ func TestConfig_LoadSingleInput(t *testing.T) { memcached := inputs.Inputs["memcached"]().(*memcached.Memcached) memcached.Servers = []string{"localhost"} - mConfig := &InputConfig{ + mConfig := &models.InputConfig{ Name: "memcached", - Filter: Filter{ + Filter: models.Filter{ Drop: []string{"other", "stuff"}, Pass: []string{"some", "strings"}, - TagDrop: []TagFilter{ - TagFilter{ + TagDrop: []models.TagFilter{ + models.TagFilter{ Name: "badtag", Filter: []string{"othertag"}, }, }, - TagPass: []TagFilter{ - TagFilter{ + TagPass: []models.TagFilter{ + models.TagFilter{ Name: "goodtag", Filter: []string{"mytag"}, }, @@ -61,19 +62,19 @@ func TestConfig_LoadDirectory(t *testing.T) { memcached := inputs.Inputs["memcached"]().(*memcached.Memcached) memcached.Servers = []string{"localhost"} - mConfig := &InputConfig{ + mConfig := &models.InputConfig{ Name: "memcached", - Filter: Filter{ + Filter: models.Filter{ Drop: []string{"other", "stuff"}, Pass: []string{"some", "strings"}, - TagDrop: []TagFilter{ - TagFilter{ + TagDrop: []models.TagFilter{ + models.TagFilter{ Name: "badtag", Filter: []string{"othertag"}, }, }, - TagPass: []TagFilter{ - TagFilter{ + TagPass: []models.TagFilter{ + models.TagFilter{ Name: "goodtag", Filter: []string{"mytag"}, }, @@ -91,7 +92,7 @@ func TestConfig_LoadDirectory(t *testing.T) { ex := inputs.Inputs["exec"]().(*exec.Exec) ex.Command = "/usr/bin/myothercollector --foo=bar" - eConfig := &InputConfig{ + eConfig := &models.InputConfig{ Name: "exec", MeasurementSuffix: "_myothercollector", } @@ -110,7 +111,7 @@ func TestConfig_LoadDirectory(t *testing.T) { pstat := inputs.Inputs["procstat"]().(*procstat.Procstat) pstat.PidFile = "/var/run/grafana-server.pid" - pConfig := &InputConfig{Name: "procstat"} + pConfig := &models.InputConfig{Name: "procstat"} pConfig.Tags = make(map[string]string) assert.Equal(t, pstat, c.Inputs[3].Input, @@ -118,175 +119,3 @@ func TestConfig_LoadDirectory(t *testing.T) { assert.Equal(t, pConfig, c.Inputs[3].Config, "Merged Testdata did not produce correct procstat metadata.") } - -func TestFilter_Empty(t *testing.T) { - f := Filter{} - - measurements := []string{ - "foo", - "bar", - "barfoo", - "foo_bar", - "foo.bar", - "foo-bar", - "supercalifradjulisticexpialidocious", - } - - for _, measurement := range measurements { - if !f.ShouldPass(measurement) { - t.Errorf("Expected measurement %s to pass", measurement) - } - } -} - -func TestFilter_Pass(t *testing.T) { - f := Filter{ - Pass: []string{"foo*", "cpu_usage_idle"}, - } - - passes := []string{ - "foo", - "foo_bar", - "foo.bar", - "foo-bar", - "cpu_usage_idle", - } - - drops := []string{ - "bar", - "barfoo", - "bar_foo", - "cpu_usage_busy", - } - - for _, measurement := range passes { - if !f.ShouldPass(measurement) { - t.Errorf("Expected measurement %s to pass", measurement) - } - } - - for _, measurement := range drops { - if f.ShouldPass(measurement) { - t.Errorf("Expected measurement %s to drop", measurement) - } - } -} - -func TestFilter_Drop(t *testing.T) { - f := Filter{ - Drop: []string{"foo*", "cpu_usage_idle"}, - } - - drops := []string{ - "foo", - "foo_bar", - "foo.bar", - "foo-bar", - "cpu_usage_idle", - } - - passes := []string{ - "bar", - "barfoo", - "bar_foo", - "cpu_usage_busy", - } - - for _, measurement := range passes { - if !f.ShouldPass(measurement) { - t.Errorf("Expected measurement %s to pass", measurement) - } - } - - for _, measurement := range drops { - if f.ShouldPass(measurement) { - t.Errorf("Expected measurement %s to drop", measurement) - } - } -} - -func TestFilter_TagPass(t *testing.T) { - filters := []TagFilter{ - TagFilter{ - Name: "cpu", - Filter: []string{"cpu-*"}, - }, - TagFilter{ - Name: "mem", - Filter: []string{"mem_free"}, - }} - f := Filter{ - TagPass: filters, - } - - passes := []map[string]string{ - {"cpu": "cpu-total"}, - {"cpu": "cpu-0"}, - {"cpu": "cpu-1"}, - {"cpu": "cpu-2"}, - {"mem": "mem_free"}, - } - - drops := []map[string]string{ - {"cpu": "cputotal"}, - {"cpu": "cpu0"}, - {"cpu": "cpu1"}, - {"cpu": "cpu2"}, - {"mem": "mem_used"}, - } - - for _, tags := range passes { - if !f.ShouldTagsPass(tags) { - t.Errorf("Expected tags %v to pass", tags) - } - } - - for _, tags := range drops { - if f.ShouldTagsPass(tags) { - t.Errorf("Expected tags %v to drop", tags) - } - } -} - -func TestFilter_TagDrop(t *testing.T) { - filters := []TagFilter{ - TagFilter{ - Name: "cpu", - Filter: []string{"cpu-*"}, - }, - TagFilter{ - Name: "mem", - Filter: []string{"mem_free"}, - }} - f := Filter{ - TagDrop: filters, - } - - drops := []map[string]string{ - {"cpu": "cpu-total"}, - {"cpu": "cpu-0"}, - {"cpu": "cpu-1"}, - {"cpu": "cpu-2"}, - {"mem": "mem_free"}, - } - - passes := []map[string]string{ - {"cpu": "cputotal"}, - {"cpu": "cpu0"}, - {"cpu": "cpu1"}, - {"cpu": "cpu2"}, - {"mem": "mem_used"}, - } - - for _, tags := range passes { - if !f.ShouldTagsPass(tags) { - t.Errorf("Expected tags %v to pass", tags) - } - } - - for _, tags := range drops { - if f.ShouldTagsPass(tags) { - t.Errorf("Expected tags %v to drop", tags) - } - } -} diff --git a/internal/models/filter.go b/internal/models/filter.go new file mode 100644 index 0000000000000..3f171ccaca33c --- /dev/null +++ b/internal/models/filter.go @@ -0,0 +1,92 @@ +package models + +import ( + "strings" + + "github.com/influxdata/influxdb/client/v2" + "github.com/influxdata/telegraf/internal" +) + +// TagFilter is the name of a tag, and the values on which to filter +type TagFilter struct { + Name string + Filter []string +} + +// Filter containing drop/pass and tagdrop/tagpass rules +type Filter struct { + Drop []string + Pass []string + + TagDrop []TagFilter + TagPass []TagFilter + + IsActive bool +} + +func (f Filter) ShouldPointPass(point *client.Point) bool { + if f.ShouldPass(point.Name()) && f.ShouldTagsPass(point.Tags()) { + return true + } + return false +} + +// ShouldPass returns true if the metric should pass, false if should drop +// based on the drop/pass filter parameters +func (f Filter) ShouldPass(key string) bool { + if f.Pass != nil { + for _, pat := range f.Pass { + // TODO remove HasPrefix check, leaving it for now for legacy support. + // Cam, 2015-12-07 + if strings.HasPrefix(key, pat) || internal.Glob(pat, key) { + return true + } + } + return false + } + + if f.Drop != nil { + for _, pat := range f.Drop { + // TODO remove HasPrefix check, leaving it for now for legacy support. + // Cam, 2015-12-07 + if strings.HasPrefix(key, pat) || internal.Glob(pat, key) { + return false + } + } + + return true + } + return true +} + +// ShouldTagsPass returns true if the metric should pass, false if should drop +// based on the tagdrop/tagpass filter parameters +func (f Filter) ShouldTagsPass(tags map[string]string) bool { + if f.TagPass != nil { + for _, pat := range f.TagPass { + if tagval, ok := tags[pat.Name]; ok { + for _, filter := range pat.Filter { + if internal.Glob(filter, tagval) { + return true + } + } + } + } + return false + } + + if f.TagDrop != nil { + for _, pat := range f.TagDrop { + if tagval, ok := tags[pat.Name]; ok { + for _, filter := range pat.Filter { + if internal.Glob(filter, tagval) { + return false + } + } + } + } + return true + } + + return true +} diff --git a/internal/models/filter_test.go b/internal/models/filter_test.go new file mode 100644 index 0000000000000..9e962e4200519 --- /dev/null +++ b/internal/models/filter_test.go @@ -0,0 +1,177 @@ +package models + +import ( + "testing" +) + +func TestFilter_Empty(t *testing.T) { + f := Filter{} + + measurements := []string{ + "foo", + "bar", + "barfoo", + "foo_bar", + "foo.bar", + "foo-bar", + "supercalifradjulisticexpialidocious", + } + + for _, measurement := range measurements { + if !f.ShouldPass(measurement) { + t.Errorf("Expected measurement %s to pass", measurement) + } + } +} + +func TestFilter_Pass(t *testing.T) { + f := Filter{ + Pass: []string{"foo*", "cpu_usage_idle"}, + } + + passes := []string{ + "foo", + "foo_bar", + "foo.bar", + "foo-bar", + "cpu_usage_idle", + } + + drops := []string{ + "bar", + "barfoo", + "bar_foo", + "cpu_usage_busy", + } + + for _, measurement := range passes { + if !f.ShouldPass(measurement) { + t.Errorf("Expected measurement %s to pass", measurement) + } + } + + for _, measurement := range drops { + if f.ShouldPass(measurement) { + t.Errorf("Expected measurement %s to drop", measurement) + } + } +} + +func TestFilter_Drop(t *testing.T) { + f := Filter{ + Drop: []string{"foo*", "cpu_usage_idle"}, + } + + drops := []string{ + "foo", + "foo_bar", + "foo.bar", + "foo-bar", + "cpu_usage_idle", + } + + passes := []string{ + "bar", + "barfoo", + "bar_foo", + "cpu_usage_busy", + } + + for _, measurement := range passes { + if !f.ShouldPass(measurement) { + t.Errorf("Expected measurement %s to pass", measurement) + } + } + + for _, measurement := range drops { + if f.ShouldPass(measurement) { + t.Errorf("Expected measurement %s to drop", measurement) + } + } +} + +func TestFilter_TagPass(t *testing.T) { + filters := []TagFilter{ + TagFilter{ + Name: "cpu", + Filter: []string{"cpu-*"}, + }, + TagFilter{ + Name: "mem", + Filter: []string{"mem_free"}, + }} + f := Filter{ + TagPass: filters, + } + + passes := []map[string]string{ + {"cpu": "cpu-total"}, + {"cpu": "cpu-0"}, + {"cpu": "cpu-1"}, + {"cpu": "cpu-2"}, + {"mem": "mem_free"}, + } + + drops := []map[string]string{ + {"cpu": "cputotal"}, + {"cpu": "cpu0"}, + {"cpu": "cpu1"}, + {"cpu": "cpu2"}, + {"mem": "mem_used"}, + } + + for _, tags := range passes { + if !f.ShouldTagsPass(tags) { + t.Errorf("Expected tags %v to pass", tags) + } + } + + for _, tags := range drops { + if f.ShouldTagsPass(tags) { + t.Errorf("Expected tags %v to drop", tags) + } + } +} + +func TestFilter_TagDrop(t *testing.T) { + filters := []TagFilter{ + TagFilter{ + Name: "cpu", + Filter: []string{"cpu-*"}, + }, + TagFilter{ + Name: "mem", + Filter: []string{"mem_free"}, + }} + f := Filter{ + TagDrop: filters, + } + + drops := []map[string]string{ + {"cpu": "cpu-total"}, + {"cpu": "cpu-0"}, + {"cpu": "cpu-1"}, + {"cpu": "cpu-2"}, + {"mem": "mem_free"}, + } + + passes := []map[string]string{ + {"cpu": "cputotal"}, + {"cpu": "cpu0"}, + {"cpu": "cpu1"}, + {"cpu": "cpu2"}, + {"mem": "mem_used"}, + } + + for _, tags := range passes { + if !f.ShouldTagsPass(tags) { + t.Errorf("Expected tags %v to pass", tags) + } + } + + for _, tags := range drops { + if f.ShouldTagsPass(tags) { + t.Errorf("Expected tags %v to drop", tags) + } + } +} diff --git a/internal/models/running_input.go b/internal/models/running_input.go new file mode 100644 index 0000000000000..17c0d2129d17b --- /dev/null +++ b/internal/models/running_input.go @@ -0,0 +1,24 @@ +package models + +import ( + "time" + + "github.com/influxdata/telegraf/plugins/inputs" +) + +type RunningInput struct { + Name string + Input inputs.Input + Config *InputConfig +} + +// InputConfig containing a name, interval, and filter +type InputConfig struct { + Name string + NameOverride string + MeasurementPrefix string + MeasurementSuffix string + Tags map[string]string + Filter Filter + Interval time.Duration +} diff --git a/internal/models/running_output.go b/internal/models/running_output.go new file mode 100644 index 0000000000000..c597980931b82 --- /dev/null +++ b/internal/models/running_output.go @@ -0,0 +1,72 @@ +package models + +import ( + "log" + "time" + + "github.com/influxdata/telegraf/plugins/outputs" + + "github.com/influxdata/influxdb/client/v2" +) + +const POINT_BUFFER_LIMIT = 10000 + +type RunningOutput struct { + Name string + Output outputs.Output + Config *OutputConfig + Quiet bool + + points []*client.Point + overwriteCounter int +} + +func NewRunningOutput( + name string, + output outputs.Output, + conf *OutputConfig, +) *RunningOutput { + ro := &RunningOutput{ + Name: name, + points: make([]*client.Point, 0), + Output: output, + Config: conf, + } + return ro +} + +func (ro *RunningOutput) AddPoint(point *client.Point) { + if ro.Config.Filter.IsActive { + if !ro.Config.Filter.ShouldPointPass(point) { + return + } + } + + if len(ro.points) < POINT_BUFFER_LIMIT { + ro.points = append(ro.points, point) + } else { + ro.points[ro.overwriteCounter] = point + ro.overwriteCounter++ + } +} + +func (ro *RunningOutput) Write() error { + start := time.Now() + err := ro.Output.Write(ro.points) + elapsed := time.Since(start) + if err == nil { + if !ro.Quiet { + log.Printf("Wrote %d metrics to output %s in %s\n", + len(ro.points), ro.Name, elapsed) + } + ro.points = make([]*client.Point, 0) + ro.overwriteCounter = 0 + } + return err +} + +// OutputConfig containing name and filter +type OutputConfig struct { + Name string + Filter Filter +}