diff --git a/128tech.md b/128tech.md
index 0ae65af589998..bfff36a7f9991 100644
--- a/128tech.md
+++ b/128tech.md
@@ -2,31 +2,24 @@
 
 This README will describe how to modify and build Telegraf for telegraf-128tech.
 
-## Fetching Dependencies
+## Pulling in Upstream Changes
 
-At some point, all of the Telegraf dependencies will need to be pulled down. This will be done during build automatically unless you've already done so and use the flag to skip that step (see "Building a New RPM"). _It's nice to do this manually because there's poor visibility into the build step_.
+When updating an existing version, the tasks are straightforward.
 
-If you try to run commands without having the dependencies downloaded, you will see errors of the following form.
+1. Merge the upstream branch
+2. Cherry-pick any desired changes that exist on another upstream branch (for example, master)
 
-```
-internal/internal.go:24:2: cannot find package "github.com/alecthomas/units" in any of:
-	/usr/local/go/src/github.com/alecthomas/units (from $GOROOT)
-	/go/src/github.com/alecthomas/units (from $GOPATH)
-```
-
-To fetch dependencies directly, you can do it simply from the shell. See "Using the Shell" for how to get into it. From the shell's default directory, simply run:
+## Moving to a New Upstream Version
 
-```
-dep ensure --vendor-only -v
-```
+When moving to a new upstream version, things are a little more complicated: you must identify what has been added to our custom Telegraf branch and pull it into the new release branch. This can be done as described in [this article](https://til.hashrocket.com/posts/18139f4f20-list-different-commits-between-two-branches).
 
-The above command provides the best visibility. The technically sanctioned fetch step is:
+First, pull down the new upstream branch. Then determine what has been added locally and needs to be included in the new custom build by finding the commits that were added on the custom branch. This example uses release 1.14, but that will change as time passes.
 
 ```
-make deps
+git log --no-merges --left-right --graph --cherry-pick --oneline release-1.14..release-128tech-1.14
 ```
 
-It does take some time to complete. After that, the dependencies exist in the `vendor` folder and don't need to be fetched again.
+That should produce a short list of commits that need to be cherry-picked from the original custom branch onto the new one. Some of them may already exist in the new upstream branch if they were backported to the custom branch.
 
 ## Building a New RPM
 
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9338fe5f0e653..42ed9115ef250 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,8 +1,19 @@
-## v1.14.3 [unreleased]
+## v1.14.4 [unreleased]
+
+#### Bugfixes
+
+- [#7325](https://github.com/influxdata/telegraf/issues/7325): Fix "cannot insert the value NULL" error with PerformanceCounters query.
+- [#7579](https://github.com/influxdata/telegraf/pull/7579): Fix numeric to bool conversion in converter processor.
+- [#7551](https://github.com/influxdata/telegraf/issues/7551): Fix typo in name of gc_cpu_fraction field of the influxdb input.
+
+## v1.14.3 [2020-05-19]
 
 #### Bugfixes
 
 - [#7412](https://github.com/influxdata/telegraf/pull/7412): Use same timestamp for all objects in arrays in the json parser.
+- [#7343](https://github.com/influxdata/telegraf/issues/7343): Handle multiple metrics with the same timestamp in dedup processor.
+- [#5905](https://github.com/influxdata/telegraf/issues/5905): Fix reconnection of timed out HTTP2 connections in influxdb outputs.
+- [#7468](https://github.com/influxdata/telegraf/issues/7468): Fix negative value parsing in ipmi_sensor input.
 
 ## v1.14.2 [2020-04-28]
 
diff --git a/internal/http.go b/internal/http.go
index a44506719e4fc..04b8a9368906e 100644
--- a/internal/http.go
+++ b/internal/http.go
@@ -4,6 +4,7 @@ import (
 	"crypto/subtle"
 	"net"
 	"net/http"
+	"net/url"
 )
 
 type BasicAuthErrorFunc func(rw http.ResponseWriter)
@@ -95,3 +96,13 @@ func (h *ipRangeHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
 
 	h.onError(rw, http.StatusForbidden)
 }
+
+func OnClientError(client *http.Client, err error) {
+	// Close connection after a timeout error. If this is a HTTP2
+	// connection this ensures that next interval a new connection will be
+	// used and name lookup will be performed.
+	// https://github.com/golang/go/issues/36026
+	if err, ok := err.(*url.Error); ok && err.Timeout() {
+		client.CloseIdleConnections()
+	}
+}
diff --git a/internal/http_go1.11.go b/internal/http_go1.11.go
deleted file mode 100644
index d1a1ae31adb91..0000000000000
--- a/internal/http_go1.11.go
+++ /dev/null
@@ -1,15 +0,0 @@
-// +build !go1.12
-
-package internal
-
-import "net/http"
-
-func CloseIdleConnections(c *http.Client) {
-	type closeIdler interface {
-		CloseIdleConnections()
-	}
-
-	if tr, ok := c.Transport.(closeIdler); ok {
-		tr.CloseIdleConnections()
-	}
-}
diff --git a/internal/http_go1.12.go b/internal/http_go1.12.go
deleted file mode 100644
index d5b1a847f12a7..0000000000000
--- a/internal/http_go1.12.go
+++ /dev/null
@@ -1,9 +0,0 @@
-// +build go1.12
-
-package internal
-
-import "net/http"
-
-func CloseIdleConnections(c *http.Client) {
-	c.CloseIdleConnections()
-}
diff --git a/plugins/inputs/influxdb/README.md b/plugins/inputs/influxdb/README.md
index 711503245bc1c..8787c6a0e6dff 100644
--- a/plugins/inputs/influxdb/README.md
+++ b/plugins/inputs/influxdb/README.md
@@ -59,7 +59,7 @@ and may vary between versions.
- heap_sys - mcache_sys - next_gc - - gcc_pu_fraction + - gc_cpu_fraction - other_sys - alloc - stack_inuse @@ -95,7 +95,7 @@ telegraf --config ~/ws/telegraf.conf --input-filter influxdb --test > influxdb_measurement,database=_internal,host=tyrion,measurement=tsm1_filestore,url=http://localhost:8086/debug/vars numSeries=2 1463590500247354636 > influxdb_measurement,database=_internal,host=tyrion,measurement=tsm1_wal,url=http://localhost:8086/debug/vars numSeries=4 1463590500247354636 > influxdb_measurement,database=_internal,host=tyrion,measurement=write,url=http://localhost:8086/debug/vars numSeries=1 1463590500247354636 -> influxdb_memstats,host=tyrion,url=http://localhost:8086/debug/vars alloc=7642384i,buck_hash_sys=1463471i,frees=1169558i,gc_sys=653312i,gcc_pu_fraction=0.00003825652361068311,heap_alloc=7642384i,heap_idle=9912320i,heap_inuse=9125888i,heap_objects=48276i,heap_released=0i,heap_sys=19038208i,last_gc=1463590480877651621i,lookups=90i,mallocs=1217834i,mcache_inuse=4800i,mcache_sys=16384i,mspan_inuse=70920i,mspan_sys=81920i,next_gc=11679787i,num_gc=141i,other_sys=1244233i,pause_total_ns=24034027i,stack_inuse=884736i,stack_sys=884736i,sys=23382264i,total_alloc=679012200i 1463590500277918755 +> influxdb_memstats,host=tyrion,url=http://localhost:8086/debug/vars alloc=7642384i,buck_hash_sys=1463471i,frees=1169558i,gc_sys=653312i,gc_cpu_fraction=0.00003825652361068311,heap_alloc=7642384i,heap_idle=9912320i,heap_inuse=9125888i,heap_objects=48276i,heap_released=0i,heap_sys=19038208i,last_gc=1463590480877651621i,lookups=90i,mallocs=1217834i,mcache_inuse=4800i,mcache_sys=16384i,mspan_inuse=70920i,mspan_sys=81920i,next_gc=11679787i,num_gc=141i,other_sys=1244233i,pause_total_ns=24034027i,stack_inuse=884736i,stack_sys=884736i,sys=23382264i,total_alloc=679012200i 1463590500277918755 > influxdb_shard,database=_internal,engine=tsm1,host=tyrion,id=4,path=/Users/sparrc/.influxdb/data/_internal/monitor/4,retentionPolicy=monitor,url=http://localhost:8086/debug/vars fieldsCreate=65,seriesCreate=26,writePointsOk=7274,writeReq=280 1463590500247354636 > influxdb_subscriber,host=tyrion,url=http://localhost:8086/debug/vars pointsWritten=7274 1463590500247354636 > influxdb_tsm1_cache,database=_internal,host=tyrion,path=/Users/sparrc/.influxdb/data/_internal/monitor/1,retentionPolicy=monitor,url=http://localhost:8086/debug/vars WALCompactionTimeMs=0,cacheAgeMs=2809192,cachedBytes=0,diskBytes=0,memBytes=0,snapshotCount=0 1463590500247354636 diff --git a/plugins/inputs/influxdb/influxdb.go b/plugins/inputs/influxdb/influxdb.go index 96389a0138b35..23fa9fdc4eada 100644 --- a/plugins/inputs/influxdb/influxdb.go +++ b/plugins/inputs/influxdb/influxdb.go @@ -242,7 +242,7 @@ func (i *InfluxDB) gatherURL( "pause_total_ns": m.PauseTotalNs, "pause_ns": m.PauseNs[(m.NumGC+255)%256], "num_gc": m.NumGC, - "gcc_pu_fraction": m.GCCPUFraction, + "gc_cpu_fraction": m.GCCPUFraction, }, map[string]string{ "url": url, diff --git a/plugins/inputs/influxdb/influxdb_test.go b/plugins/inputs/influxdb/influxdb_test.go index 9225c45b09d68..27ea81b6d7dd6 100644 --- a/plugins/inputs/influxdb/influxdb_test.go +++ b/plugins/inputs/influxdb/influxdb_test.go @@ -92,7 +92,7 @@ func TestInfluxDB(t *testing.T) { "heap_sys": int64(33849344), "mcache_sys": int64(16384), "next_gc": int64(20843042), - "gcc_pu_fraction": float64(4.287178819113636e-05), + "gc_cpu_fraction": float64(4.287178819113636e-05), "other_sys": int64(1229737), "alloc": int64(17034016), "stack_inuse": int64(753664), diff --git a/plugins/inputs/ipmi_sensor/ipmi.go 
b/plugins/inputs/ipmi_sensor/ipmi.go index 9ac842b896ac2..fb53e1bc746fe 100644 --- a/plugins/inputs/ipmi_sensor/ipmi.go +++ b/plugins/inputs/ipmi_sensor/ipmi.go @@ -21,7 +21,7 @@ var ( execCommand = exec.Command // execCommand is used to mock commands in tests. re_v1_parse_line = regexp.MustCompile(`^(?P[^|]*)\|(?P[^|]*)\|(?P.*)`) re_v2_parse_line = regexp.MustCompile(`^(?P[^|]*)\|[^|]+\|(?P[^|]*)\|(?P[^|]*)\|(?:(?P[^|]+))?`) - re_v2_parse_description = regexp.MustCompile(`^(?P[0-9.]+)\s(?P.*)|(?P.+)|^$`) + re_v2_parse_description = regexp.MustCompile(`^(?P-?[0-9.]+)\s(?P.*)|(?P.+)|^$`) re_v2_parse_unit = regexp.MustCompile(`^(?P[^,]+)(?:,\s*(?P.*))?`) ) diff --git a/plugins/inputs/ipmi_sensor/ipmi_test.go b/plugins/inputs/ipmi_sensor/ipmi_test.go index 9d448435d31ef..bd5e02c196e76 100644 --- a/plugins/inputs/ipmi_sensor/ipmi_test.go +++ b/plugins/inputs/ipmi_sensor/ipmi_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/assert" @@ -664,11 +665,10 @@ func Test_parseV2(t *testing.T) { measuredAt time.Time } tests := []struct { - name string - args args - wantFields map[string]interface{} - wantTags map[string]string - wantErr bool + name string + args args + expected []telegraf.Metric + wantErr bool }{ { name: "Test correct V2 parsing with analog value with unit", @@ -677,14 +677,19 @@ func Test_parseV2(t *testing.T) { cmdOut: []byte("Power Supply 1 | 03h | ok | 10.1 | 110 Watts, Presence detected"), measuredAt: time.Now(), }, - wantFields: map[string]interface{}{"value": float64(110)}, - wantTags: map[string]string{ - "name": "power_supply_1", - "status_code": "ok", - "server": "host", - "entity_id": "10.1", - "unit": "watts", - "status_desc": "presence_detected", + expected: []telegraf.Metric{ + testutil.MustMetric("ipmi_sensor", + map[string]string{ + "name": "power_supply_1", + "status_code": "ok", + "server": "host", + "entity_id": "10.1", + "unit": "watts", + "status_desc": "presence_detected", + }, + map[string]interface{}{"value": 110.0}, + time.Unix(0, 0), + ), }, wantErr: false, }, @@ -695,26 +700,51 @@ func Test_parseV2(t *testing.T) { cmdOut: []byte("Intrusion | 73h | ok | 7.1 |"), measuredAt: time.Now(), }, - wantFields: map[string]interface{}{"value": float64(0)}, - wantTags: map[string]string{ - "name": "intrusion", - "status_code": "ok", - "server": "host", - "entity_id": "7.1", - "status_desc": "ok", + expected: []telegraf.Metric{ + testutil.MustMetric("ipmi_sensor", + map[string]string{ + "name": "intrusion", + "status_code": "ok", + "server": "host", + "entity_id": "7.1", + "status_desc": "ok", + }, + map[string]interface{}{"value": 0.0}, + time.Unix(0, 0), + ), + }, + wantErr: false, + }, + { + name: "parse negative value", + args: args{ + hostname: "host", + cmdOut: []byte("DIMM Thrm Mrgn 1 | B0h | ok | 8.1 | -55 degrees C"), + measuredAt: time.Now(), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("ipmi_sensor", + map[string]string{ + "name": "dimm_thrm_mrgn_1", + "status_code": "ok", + "server": "host", + "entity_id": "8.1", + "unit": "degrees_c", + }, + map[string]interface{}{"value": -55.0}, + time.Unix(0, 0), + ), }, wantErr: false, }, } for _, tt := range tests { - var acc testutil.Accumulator - t.Run(tt.name, func(t *testing.T) { + var acc testutil.Accumulator if err := parseV2(&acc, tt.args.hostname, tt.args.cmdOut, tt.args.measuredAt); (err != nil) != tt.wantErr { 
t.Errorf("parseV2() error = %v, wantErr %v", err, tt.wantErr) } + testutil.RequireMetricsEqual(t, tt.expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime()) }) - - acc.AssertContainsTaggedFields(t, "ipmi_sensor", tt.wantFields, tt.wantTags) } } diff --git a/plugins/inputs/kapacitor/README.md b/plugins/inputs/kapacitor/README.md index 2328e0904c118..ace4f18ff7f14 100644 --- a/plugins/inputs/kapacitor/README.md +++ b/plugins/inputs/kapacitor/README.md @@ -33,7 +33,7 @@ The Kapacitor plugin collects metrics from the given Kapacitor instances. - [notification_dropped](#notification_dropped) _(integer)_ - [primary-handle-count](#primary-handle-count) _(integer)_ - [secondary-handle-count](#secondary-handle-count) _(integer)_ -- (Kapacitor Enterprise only) [kapacitor_cluster](#kapacitor_cluster) _(integer)_ +- (Kapacitor Enterprise only) [kapacitor_cluster](#kapacitor_cluster) _(integer)_ - [dropped_member_events](#dropped_member_events) _(integer)_ - [dropped_user_events](#dropped_user_events) _(integer)_ - [query_handler_errors](#query_handler_errors) _(integer)_ @@ -49,7 +49,7 @@ The Kapacitor plugin collects metrics from the given Kapacitor instances. - [buck_hash_sys_bytes](#buck_hash_sys_bytes) _(integer)_ - [frees](#frees) _(integer)_ - [gc_sys_bytes](#gc_sys_bytes) _(integer)_ - - [gc_cpu_fraction](#gcc_pu_fraction) _(float)_ + - [gc_cpu_fraction](#gc_cpu_fraction) _(float)_ - [heap_alloc_bytes](#heap_alloc_bytes) _(integer)_ - [heap_idle_bytes](#heap_idle_bytes) _(integer)_ - [heap_in_use_bytes](#heap_in_use_bytes) _(integer)_ @@ -109,8 +109,8 @@ The `kapacitor_alert` measurement stores fields with information related to [Kapacitor alerts](https://docs.influxdata.com/kapacitor/v1.5/working/alerts/). #### notification-dropped -The number of internal notifications dropped because they arrive too late from another Kapacitor node. -If this count is increasing, Kapacitor Enterprise nodes aren't able to communicate fast enough +The number of internal notifications dropped because they arrive too late from another Kapacitor node. +If this count is increasing, Kapacitor Enterprise nodes aren't able to communicate fast enough to keep up with the volume of alerts. #### primary-handle-count @@ -199,7 +199,7 @@ The number of allocated objects. The number of heap bytes released to the operating system. #### heap_sys_bytes -The number of heap bytes obtained from `system`. +The number of heap bytes obtained from `system`. #### last_gc_ns The nanosecond epoch time of the last garbage collection. @@ -293,7 +293,7 @@ The `kapacitor_topics` measurement stores fields related to Kapacitor topics](https://docs.influxdata.com/kapacitor/latest/working/using_alert_topics/). #### collected -The number of events collected by Kapacitor topics. +The number of events collected by Kapacitor topics. 
--- diff --git a/plugins/inputs/sqlserver/sqlserver.go b/plugins/inputs/sqlserver/sqlserver.go index a002f9f44ba8a..7d592d498b5eb 100644 --- a/plugins/inputs/sqlserver/sqlserver.go +++ b/plugins/inputs/sqlserver/sqlserver.go @@ -617,10 +617,10 @@ SET @SQL = N'SELECT DISTINCT OR RTRIM(spi.object_name) LIKE ''%:Advanced Analytics'') AND TRY_CONVERT(uniqueidentifier, spi.instance_name) IS NOT NULL -- for cloud only - THEN d.name - WHEN RTRIM(object_name) LIKE ''%:Availability Replica'' + THEN ISNULL(d.name,RTRIM(spi.instance_name)) -- Elastic Pools counters exist for all databases but sys.databases only has current DB value + WHEN RTRIM(object_name) LIKE ''%:Availability Replica'' AND TRY_CONVERT(uniqueidentifier, spi.instance_name) IS NOT NULL -- for cloud only - THEN d.name + RTRIM(SUBSTRING(spi.instance_name, 37, LEN(spi.instance_name))) + THEN ISNULL(d.name,RTRIM(spi.instance_name)) + RTRIM(SUBSTRING(spi.instance_name, 37, LEN(spi.instance_name))) ELSE RTRIM(spi.instance_name) END AS instance_name,' ELSE 'RTRIM(spi.instance_name) as instance_name, ' diff --git a/plugins/outputs/influxdb/http.go b/plugins/outputs/influxdb/http.go index dcdb59b2f7d1f..2dfaafd2174e9 100644 --- a/plugins/outputs/influxdb/http.go +++ b/plugins/outputs/influxdb/http.go @@ -209,6 +209,7 @@ func (c *httpClient) CreateDatabase(ctx context.Context, database string) error resp, err := c.client.Do(req.WithContext(ctx)) if err != nil { + internal.OnClientError(c.client, err) return err } defer resp.Body.Close() @@ -311,7 +312,7 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error } func (c *httpClient) writeBatch(ctx context.Context, db, rp string, metrics []telegraf.Metric) error { - url, err := makeWriteURL(c.config.URL, db, rp, c.config.Consistency) + loc, err := makeWriteURL(c.config.URL, db, rp, c.config.Consistency) if err != nil { return err } @@ -322,13 +323,14 @@ func (c *httpClient) writeBatch(ctx context.Context, db, rp string, metrics []te } defer reader.Close() - req, err := c.makeWriteRequest(url, reader) + req, err := c.makeWriteRequest(loc, reader) if err != nil { return err } resp, err := c.client.Do(req.WithContext(ctx)) if err != nil { + internal.OnClientError(c.client, err) return err } defer resp.Body.Close() @@ -505,5 +507,5 @@ func makeQueryURL(loc *url.URL) (string, error) { } func (c *httpClient) Close() { - internal.CloseIdleConnections(c.client) + c.client.CloseIdleConnections() } diff --git a/plugins/outputs/influxdb_v2/http.go b/plugins/outputs/influxdb_v2/http.go index b94df889bfc6e..36366d765bd57 100644 --- a/plugins/outputs/influxdb_v2/http.go +++ b/plugins/outputs/influxdb_v2/http.go @@ -210,7 +210,7 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error } func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []telegraf.Metric) error { - url, err := makeWriteURL(*c.url, c.Organization, bucket) + loc, err := makeWriteURL(*c.url, c.Organization, bucket) if err != nil { return err } @@ -221,13 +221,14 @@ func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []te } defer reader.Close() - req, err := c.makeWriteRequest(url, reader) + req, err := c.makeWriteRequest(loc, reader) if err != nil { return err } resp, err := c.client.Do(req.WithContext(ctx)) if err != nil { + internal.OnClientError(c.client, err) return err } defer resp.Body.Close() @@ -347,5 +348,5 @@ func makeWriteURL(loc url.URL, org, bucket string) (string, error) { } func (c *httpClient) Close() { - 
internal.CloseIdleConnections(c.client) + c.client.CloseIdleConnections() } diff --git a/plugins/parsers/influx/machine.go b/plugins/parsers/influx/machine.go index 59bd232dd6dd4..332b73592486e 100644 --- a/plugins/parsers/influx/machine.go +++ b/plugins/parsers/influx/machine.go @@ -31678,6 +31678,16 @@ func (m *streamMachine) Next() error { m.machine.data = expanded } + err := m.machine.exec() + if err != nil { + return err + } + + // If we have successfully parsed a full metric line break out + if m.machine.finishMetric { + break + } + n, err := m.reader.Read(m.machine.data[m.machine.pe:]) if n == 0 && err == io.EOF { m.machine.eof = m.machine.pe @@ -31692,16 +31702,6 @@ func (m *streamMachine) Next() error { m.machine.pe += n - err = m.machine.exec() - if err != nil { - return err - } - - // If we have successfully parsed a full metric line break out - if m.machine.finishMetric { - break - } - } return nil diff --git a/plugins/parsers/influx/machine.go.rl b/plugins/parsers/influx/machine.go.rl index 61f49c652a327..f8f40cd7c1dc0 100644 --- a/plugins/parsers/influx/machine.go.rl +++ b/plugins/parsers/influx/machine.go.rl @@ -506,6 +506,16 @@ func (m *streamMachine) Next() error { m.machine.data = expanded } + err := m.machine.exec() + if err != nil { + return err + } + + // If we have successfully parsed a full metric line break out + if m.machine.finishMetric { + break + } + n, err := m.reader.Read(m.machine.data[m.machine.pe:]) if n == 0 && err == io.EOF { m.machine.eof = m.machine.pe @@ -520,16 +530,6 @@ func (m *streamMachine) Next() error { m.machine.pe += n - err = m.machine.exec() - if err != nil { - return err - } - - // If we have successfully parsed a full metric line break out - if m.machine.finishMetric { - break - } - } return nil diff --git a/plugins/parsers/influx/parser_test.go b/plugins/parsers/influx/parser_test.go index 3104c1f3f6902..1ef9cecc89540 100644 --- a/plugins/parsers/influx/parser_test.go +++ b/plugins/parsers/influx/parser_test.go @@ -3,6 +3,7 @@ package influx import ( "bytes" "errors" + "io" "strconv" "strings" "testing" @@ -895,3 +896,19 @@ func TestStreamParserReaderError(t *testing.T) { _, err = parser.Next() require.Equal(t, err, EOF) } + +func TestStreamParserProducesAllAvailableMetrics(t *testing.T) { + r, w := io.Pipe() + + parser := NewStreamParser(r) + parser.SetTimeFunc(DefaultTime) + + go w.Write([]byte("metric value=1\nmetric2 value=1\n")) + + _, err := parser.Next() + require.NoError(t, err) + + // should not block on second read + _, err = parser.Next() + require.NoError(t, err) +} diff --git a/plugins/processors/converter/converter.go b/plugins/processors/converter/converter.go index 33f2e43c0365f..55a2a2d0965dc 100644 --- a/plugins/processors/converter/converter.go +++ b/plugins/processors/converter/converter.go @@ -327,12 +327,12 @@ func (p *Converter) convertFields(metric telegraf.Metric) { func toBool(v interface{}) (bool, bool) { switch value := v.(type) { - case int64, uint64, float64: - if value != 0 { - return true, true - } else { - return false, false - } + case int64: + return value != 0, true + case uint64: + return value != 0, true + case float64: + return value != 0, true case bool: return value, true case string: diff --git a/plugins/processors/converter/converter_test.go b/plugins/processors/converter/converter_test.go index 1310e698a4d82..efde0bcd9e21f 100644 --- a/plugins/processors/converter/converter_test.go +++ b/plugins/processors/converter/converter_test.go @@ -180,7 +180,7 @@ func TestConverter(t *testing.T) { String: 
[]string{"a"}, Integer: []string{"b"}, Unsigned: []string{"c", "negative_uint"}, - Boolean: []string{"d"}, + Boolean: []string{"d", "bool_zero"}, Float: []string{"e"}, Tag: []string{"f"}, }, @@ -196,6 +196,7 @@ func TestConverter(t *testing.T) { "e": int64(42), "f": int64(42), "negative_uint": int64(-42), + "bool_zero": int64(0), }, time.Unix(0, 0), ), @@ -212,6 +213,7 @@ func TestConverter(t *testing.T) { "d": true, "e": 42.0, "negative_uint": uint64(0), + "bool_zero": false, }, time.Unix(0, 0), ), @@ -224,7 +226,7 @@ func TestConverter(t *testing.T) { String: []string{"a"}, Integer: []string{"b", "overflow_int"}, Unsigned: []string{"c"}, - Boolean: []string{"d"}, + Boolean: []string{"d", "bool_zero"}, Float: []string{"e"}, Tag: []string{"f"}, }, @@ -240,6 +242,7 @@ func TestConverter(t *testing.T) { "e": uint64(42), "f": uint64(42), "overflow_int": uint64(math.MaxUint64), + "bool_zero": uint64(0), }, time.Unix(0, 0), ), @@ -256,6 +259,7 @@ func TestConverter(t *testing.T) { "d": true, "e": 42.0, "overflow_int": int64(math.MaxInt64), + "bool_zero": false, }, time.Unix(0, 0), ), @@ -350,7 +354,7 @@ func TestConverter(t *testing.T) { String: []string{"a"}, Integer: []string{"b", "too_large_int", "too_small_int"}, Unsigned: []string{"c", "negative_uint", "too_large_uint", "too_small_uint"}, - Boolean: []string{"d"}, + Boolean: []string{"d", "bool_zero"}, Float: []string{"e"}, Tag: []string{"f"}, }, @@ -370,6 +374,7 @@ func TestConverter(t *testing.T) { "too_small_int": -math.MaxFloat64, "too_small_uint": -math.MaxFloat64, "negative_uint": -42.0, + "bool_zero": 0.0, }, time.Unix(0, 0), ), @@ -390,6 +395,7 @@ func TestConverter(t *testing.T) { "too_small_int": int64(math.MinInt64), "too_small_uint": uint64(0), "negative_uint": uint64(0), + "bool_zero": false, }, time.Unix(0, 0), ), diff --git a/plugins/processors/dedup/dedup.go b/plugins/processors/dedup/dedup.go index c0d40f434b962..3dd7516a696c2 100644 --- a/plugins/processors/dedup/dedup.go +++ b/plugins/processors/dedup/dedup.go @@ -75,10 +75,29 @@ func (d *Dedup) Apply(metrics ...telegraf.Metric) []telegraf.Metric { // For each field compare value with the cached one changed := false + added := false + sametime := metric.Time() == m.Time() for _, f := range metric.FieldList() { - if value, ok := m.GetField(f.Key); ok && value != f.Value { - changed = true - continue + if value, ok := m.GetField(f.Key); ok { + if value != f.Value { + changed = true + break + } + } else if sametime { + // This field isn't in the cached metric but it's the + // same series and timestamp. Merge it into the cached + // metric. + + // Metrics have a ValueType that applies to all values + // in the metric. If an input needs to produce values + // with different ValueTypes but the same timestamp, + // they have to produce multiple metrics. (See the + // system input for an example.) In this case, dedup + // ignores the ValueTypes of the metrics and merges + // the fields into one metric for the dup check. 
+ + m.AddField(f.Key, f.Value) + added = true } } // If any field value has changed then refresh the cache @@ -87,6 +106,10 @@ func (d *Dedup) Apply(metrics ...telegraf.Metric) []telegraf.Metric { continue } + if sametime && added { + continue + } + // In any other case remove metric from the output metrics = remove(metrics, idx) } diff --git a/plugins/processors/dedup/dedup_test.go b/plugins/processors/dedup/dedup_test.go index 20a94ed30a40a..cae2bf1a529ed 100644 --- a/plugins/processors/dedup/dedup_test.go +++ b/plugins/processors/dedup/dedup_test.go @@ -1,10 +1,11 @@ package dedup import ( - "github.com/stretchr/testify/require" "testing" "time" + "github.com/stretchr/testify/require" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/metric" @@ -152,3 +153,42 @@ func TestCacheShrink(t *testing.T) { require.Equal(t, 0, len(deduplicate.Cache)) } + +func TestSameTimestamp(t *testing.T) { + now := time.Now() + dedup := createDedup(now) + var in telegraf.Metric + var out []telegraf.Metric + + in, _ = metric.New("metric", + map[string]string{"tag": "value"}, + map[string]interface{}{"foo": 1}, // field + now, + ) + out = dedup.Apply(in) + require.Equal(t, []telegraf.Metric{in}, out) // pass + + in, _ = metric.New("metric", + map[string]string{"tag": "value"}, + map[string]interface{}{"bar": 1}, // different field + now, + ) + out = dedup.Apply(in) + require.Equal(t, []telegraf.Metric{in}, out) // pass + + in, _ = metric.New("metric", + map[string]string{"tag": "value"}, + map[string]interface{}{"bar": 2}, // same field different value + now, + ) + out = dedup.Apply(in) + require.Equal(t, []telegraf.Metric{in}, out) // pass + + in, _ = metric.New("metric", + map[string]string{"tag": "value"}, + map[string]interface{}{"bar": 2}, // same field same value + now, + ) + out = dedup.Apply(in) + require.Equal(t, []telegraf.Metric{}, out) // drop +}
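
Editor's note on the converter `toBool` change above: the grouped type switch misbehaved because of a Go subtlety. With `case int64, uint64, float64:` the switch variable keeps the static type `interface{}`, so `value != 0` compares an interface holding, say, `int64(0)` against an interface holding the `int` constant `0`; the dynamic types never match, the comparison is always "not equal", and every numeric field converts to `true`. The standalone sketch below is illustrative only — it is not part of the patch, and `toBoolBuggy`/`toBoolFixed` are hypothetical names — but it mirrors the before/after shape of the converter code.

```go
// Standalone illustration of the converter toBool change; not taken from
// the Telegraf source tree.
package main

import "fmt"

// toBoolBuggy mirrors the shape of the pre-fix code: the grouped case leaves
// value typed as interface{}, so value != 0 is an interface comparison
// against int(0) and evaluates to true for every numeric input.
func toBoolBuggy(v interface{}) (bool, bool) {
	switch value := v.(type) {
	case int64, uint64, float64:
		if value != 0 {
			return true, true
		}
		return false, false
	case bool:
		return value, true
	}
	return false, false
}

// toBoolFixed mirrors the patched code: one case per numeric type, so value
// has a concrete type and the comparison is a real numeric test.
func toBoolFixed(v interface{}) (bool, bool) {
	switch value := v.(type) {
	case int64:
		return value != 0, true
	case uint64:
		return value != 0, true
	case float64:
		return value != 0, true
	case bool:
		return value, true
	}
	return false, false
}

func main() {
	for _, v := range []interface{}{int64(0), float64(0), uint64(7)} {
		buggy, _ := toBoolBuggy(v)
		fixed, _ := toBoolFixed(v)
		fmt.Printf("%T(%v): buggy=%v fixed=%v\n", v, v, buggy, fixed)
	}
	// Expected output:
	// int64(0): buggy=true fixed=false
	// float64(0): buggy=true fixed=false
	// uint64(7): buggy=true fixed=true
}
```

This is also why the converter tests in the patch add a `bool_zero` field: it pins the zero-value inputs that the grouped case silently converted to `true`.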