Skip to content

Commit

Permalink
Add influx uint support as a runtime option (#3948)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnelson authored Mar 29, 2018
1 parent c2108fc commit b99cd14
Show file tree
Hide file tree
Showing 12 changed files with 168 additions and 84 deletions.
5 changes: 1 addition & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ deps:
gdm restore

telegraf:
go build -i -o $(TELEGRAF) -ldflags "$(LDFLAGS)" $(BUILDFLAGS) ./cmd/telegraf/telegraf.go
go build -i -o $(TELEGRAF) -ldflags "$(LDFLAGS)" ./cmd/telegraf/telegraf.go

go-install:
go install -ldflags "-w -s $(LDFLAGS)" ./cmd/telegraf
Expand All @@ -62,9 +62,6 @@ fmtcheck:
fi
@echo '[INFO] done.'

uint64:
BUILDFLAGS="-tags uint64" $(MAKE) all

lint:
golint ./...

Expand Down
28 changes: 22 additions & 6 deletions docs/DATA_FORMATS_OUTPUT.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@

Telegraf is able to serialize metrics into the following output data formats:

1. [InfluxDB Line Protocol](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md#influx)
1. [JSON](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md#json)
1. [Graphite](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md#graphite)
1. [InfluxDB Line Protocol](#influx)
1. [JSON](#json)
1. [Graphite](#graphite)

Telegraf metrics, like InfluxDB
[points](https://docs.influxdata.com/influxdb/v0.10/write_protocols/line/),
[points](https://docs.influxdata.com/influxdb/latest/concepts/glossary/#point),
are a combination of four basic parts:

1. Measurement Name
Expand Down Expand Up @@ -49,8 +49,10 @@ I'll go over below.

# Influx:

There are no additional configuration options for InfluxDB line-protocol. The
metrics are serialized directly into InfluxDB line-protocol.
The `influx` format outputs data as
[InfluxDB Line Protocol](https://docs.influxdata.com/influxdb/latest/write_protocols/line_protocol_tutorial/).
This is the recommended format to use unless another format is required for
interoperability.

### Influx Configuration:

Expand All @@ -64,6 +66,20 @@ metrics are serialized directly into InfluxDB line-protocol.
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "influx"

## Maximum line length in bytes. Useful only for debugging.
# influx_max_line_bytes = 0

## When true, fields will be output in ascending lexical order. Enabling
## this option will result in decreased performance and is only recommended
## when you need predictable ordering while debugging.
# influx_sort_fields = false

## When true, Telegraf will output unsigned integers as unsigned values,
## i.e.: `42u`. You will need a version of InfluxDB supporting unsigned
## integer values. Enabling this option will result in field type errors if
## existing data has been written.
# influx_uint_support = false
```

# Graphite:
Expand Down
13 changes: 13 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1391,6 +1391,18 @@ func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error
}
}

if node, ok := tbl.Fields["influx_uint_support"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if b, ok := kv.Value.(*ast.Boolean); ok {
var err error
c.InfluxUintSupport, err = b.Boolean()
if err != nil {
return nil, err
}
}
}
}

if node, ok := tbl.Fields["json_timestamp_units"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
Expand All @@ -1409,6 +1421,7 @@ func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error

delete(tbl.Fields, "influx_max_line_bytes")
delete(tbl.Fields, "influx_sort_fields")
delete(tbl.Fields, "influx_uint_support")
delete(tbl.Fields, "data_format")
delete(tbl.Fields, "prefix")
delete(tbl.Fields, "template")
Expand Down
31 changes: 4 additions & 27 deletions metric/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,6 @@ import (
"github.com/influxdata/telegraf"
)

const MaxInt = int(^uint(0) >> 1)

// enableUint64Support will enable uint64 support if set to true.
var enableUint64Support = false

// EnableUintSupport manually enables uint support for convertValue.
// This function will be removed in the future and only exists for unit tests during the
// transition.
func EnableUintSupport() {
enableUint64Support = true
}

type metric struct {
name string
tags []*telegraf.Tag
Expand Down Expand Up @@ -269,19 +257,8 @@ func convertField(v interface{}) interface{} {
case int:
return int64(v)
case uint:
if v <= uint(MaxInt) {
return int64(v)
} else {
return int64(MaxInt)
}
return uint64(v)
case uint64:
if enableUint64Support == false {
if v <= uint64(MaxInt) {
return int64(v)
} else {
return int64(MaxInt)
}
}
return uint64(v)
case []byte:
return string(v)
Expand All @@ -292,11 +269,11 @@ func convertField(v interface{}) interface{} {
case int8:
return int64(v)
case uint32:
return int64(v)
return uint64(v)
case uint16:
return int64(v)
return uint64(v)
case uint8:
return int64(v)
return uint64(v)
case float32:
return float64(v)
default:
Expand Down
6 changes: 6 additions & 0 deletions plugins/outputs/influxdb/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,10 @@ This InfluxDB output plugin writes metrics to the [InfluxDB](https://github.com/
## HTTP Content-Encoding for write request body, can be set to "gzip" to
## compress body or "identity" to apply no encoding.
# content_encoding = "identity"

## When true, Telegraf will output unsigned integers as unsigned values,
## i.e.: "42u". You will need a version of InfluxDB supporting unsigned
## integer values. Enabling this option will result in field type errors if
## existing data has been written.
# influx_uint_support = false
```
3 changes: 2 additions & 1 deletion plugins/outputs/influxdb/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ type HTTPConfig struct {
RetentionPolicy string
Consistency string

Serializer *influx.Serializer
InfluxUintSupport bool `toml:"influx_uint_support"`
Serializer *influx.Serializer
}

type httpClient struct {
Expand Down
10 changes: 10 additions & 0 deletions plugins/outputs/influxdb/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type InfluxDB struct {
HTTPHeaders map[string]string `toml:"http_headers"`
ContentEncoding string `toml:"content_encoding"`
SkipDatabaseCreation bool `toml:"skip_database_creation"`
InfluxUintSupport bool `toml:"influx_uint_support"`

// Path to CA file
SSLCA string `toml:"ssl_ca"`
Expand Down Expand Up @@ -119,6 +120,12 @@ var sampleConfig = `
## HTTP Content-Encoding for write request body, can be set to "gzip" to
## compress body or "identity" to apply no encoding.
# content_encoding = "identity"
## When true, Telegraf will output unsigned integers as unsigned values,
## i.e.: "42u". You will need a version of InfluxDB supporting unsigned
## integer values. Enabling this option will result in field type errors if
## existing data has been written.
# influx_uint_support = false
`

func (i *InfluxDB) Connect() error {
Expand All @@ -135,6 +142,9 @@ func (i *InfluxDB) Connect() error {
}

i.serializer = influx.NewSerializer()
if i.InfluxUintSupport {
i.serializer.SetFieldTypeSupport(influx.UintSupport)
}

for _, u := range urls {
u, err := url.Parse(u)
Expand Down
10 changes: 5 additions & 5 deletions plugins/parsers/influx/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (h *MetricHandler) AddInt(key []byte, value []byte) {
fk := unescape(key)
fv, err := parseIntBytes(bytes.TrimSuffix(value, []byte("i")), 10, 64)
if err != nil {
log.Errorf("E! Received unparseable int value: %q", value)
log.Errorf("E! Received unparseable int value: %q: %v", value, err)
return
}
h.builder.AddField(fk, fv)
Expand All @@ -58,7 +58,7 @@ func (h *MetricHandler) AddUint(key []byte, value []byte) {
fk := unescape(key)
fv, err := parseUintBytes(bytes.TrimSuffix(value, []byte("u")), 10, 64)
if err != nil {
log.Errorf("E! Received unparseable uint value: %q", value)
log.Errorf("E! Received unparseable uint value: %q: %v", value, err)
return
}
h.builder.AddField(fk, fv)
Expand All @@ -68,7 +68,7 @@ func (h *MetricHandler) AddFloat(key []byte, value []byte) {
fk := unescape(key)
fv, err := parseFloatBytes(value, 64)
if err != nil {
log.Errorf("E! Received unparseable float value: %q", value)
log.Errorf("E! Received unparseable float value: %q: %v", value, err)
return
}
h.builder.AddField(fk, fv)
Expand All @@ -84,7 +84,7 @@ func (h *MetricHandler) AddBool(key []byte, value []byte) {
fk := unescape(key)
fv, err := parseBoolBytes(value)
if err != nil {
log.Errorf("E! Received unparseable boolean value: %q", value)
log.Errorf("E! Received unparseable boolean value: %q: %v", value, err)
return
}
h.builder.AddField(fk, fv)
Expand All @@ -93,7 +93,7 @@ func (h *MetricHandler) AddBool(key []byte, value []byte) {
func (h *MetricHandler) SetTimestamp(tm []byte) {
v, err := parseIntBytes(tm, 10, 64)
if err != nil {
log.Errorf("E! Received unparseable timestamp: %q", tm)
log.Errorf("E! Received unparseable timestamp: %q: %v", tm, err)
return
}
ns := v * int64(h.precision)
Expand Down
52 changes: 38 additions & 14 deletions plugins/parsers/influx/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,6 @@ func Metric(v telegraf.Metric, err error) telegraf.Metric {
return v
}

const (
Uint64Overflow uint64 = 9223372036854775808
Uint64Max uint64 = 18446744073709551615
Uint64Test uint64 = 42
)

var DefaultTime = func() time.Time {
return time.Unix(42, 0)
}
Expand Down Expand Up @@ -263,15 +257,30 @@ var ptests = []struct {
err: nil,
},
{
name: "field uint",
input: []byte("cpu value=42u"),
name: "field int overflow dropped",
input: []byte("cpu value=9223372036854775808i"),
metrics: []telegraf.Metric{
Metric(
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{},
time.Unix(42, 0),
),
),
},
err: nil,
},
{
name: "field int max value",
input: []byte("cpu value=9223372036854775807i"),
metrics: []telegraf.Metric{
Metric(
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value": Uint64Test,
"value": 9223372036854775807,
},
time.Unix(42, 0),
),
Expand All @@ -280,15 +289,15 @@ var ptests = []struct {
err: nil,
},
{
name: "field uint int overflow",
input: []byte("cpu value=9223372036854775808u"),
name: "field uint",
input: []byte("cpu value=42u"),
metrics: []telegraf.Metric{
Metric(
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value": Uint64Overflow,
"value": uint64(42),
},
time.Unix(42, 0),
),
Expand All @@ -297,15 +306,30 @@ var ptests = []struct {
err: nil,
},
{
name: "field uint maximum",
name: "field uint overflow dropped",
input: []byte("cpu value=18446744073709551616u"),
metrics: []telegraf.Metric{
Metric(
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{},
time.Unix(42, 0),
),
),
},
err: nil,
},
{
name: "field uint max value",
input: []byte("cpu value=18446744073709551615u"),
metrics: []telegraf.Metric{
Metric(
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value": Uint64Max,
"value": uint64(18446744073709551615),
},
time.Unix(42, 0),
),
Expand Down
Loading

0 comments on commit b99cd14

Please sign in to comment.