Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve InfluxDB through-put performance #2251

Merged
merged 2 commits into from
Jan 26, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

- [#2137](https://github.com/influxdata/telegraf/pull/2137): Added userstats to mysql input plugin.
- [#2179](https://github.com/influxdata/telegraf/pull/2179): Added more InnoDB metric to MySQL plugin.
- [#2251](https://github.com/influxdata/telegraf/pull/2251): InfluxDB output: use own client for improved through-put and less allocations.

### Bugfixes

Expand Down
9 changes: 8 additions & 1 deletion metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,15 @@ const (
)

type Metric interface {
// Serialize serializes the metric into a line-protocol byte buffer,
// including a newline at the end.
Serialize() []byte
String() string // convenience function for string(Serialize())
// same as Serialize, but avoids an allocation.
// returns number of bytes copied into dst.
SerializeTo(dst []byte) int
// String is the same as Serialize, but returns a string.
String() string
// Copy deep-copies the metric.
Copy() Metric
// Split will attempt to return multiple metrics with the same timestamp
// whose string representations are no longer than maxSize.
Expand Down
42 changes: 42 additions & 0 deletions metric/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,48 @@ func (m *metric) Serialize() []byte {
return tmp
}

func (m *metric) SerializeTo(dst []byte) int {
i := 0
if i >= len(dst) {
return i
}

i += copy(dst[i:], m.name)
if i >= len(dst) {
return i
}

i += copy(dst[i:], m.tags)
if i >= len(dst) {
return i
}

dst[i] = ' '
i++
if i >= len(dst) {
return i
}

i += copy(dst[i:], m.fields)
if i >= len(dst) {
return i
}

dst[i] = ' '
i++
if i >= len(dst) {
return i
}

i += copy(dst[i:], m.t)
if i >= len(dst) {
return i
}
dst[i] = '\n'

return i + 1
}

func (m *metric) Split(maxSize int) []telegraf.Metric {
if m.Len() < maxSize {
return []telegraf.Metric{m}
Expand Down
155 changes: 155 additions & 0 deletions metric/reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package metric

import (
"io"

"github.com/influxdata/telegraf"
)

type state int

const (
_ state = iota
// normal state copies whole metrics into the given buffer until we can't
// fit the next metric.
normal
// split state means that we have a metric that we were able to split, so
// that we can fit it into multiple metrics (and calls to Read)
split
// overflow state means that we have a metric that didn't fit into a single
// buffer, and needs to be split across multiple calls to Read.
overflow
// splitOverflow state means that a split metric didn't fit into a single
// buffer, and needs to be split across multiple calls to Read.
splitOverflow
// done means we're done reading metrics, and now always return (0, io.EOF)
done
)

type reader struct {
metrics []telegraf.Metric
splitMetrics []telegraf.Metric
buf []byte
state state

// metric index
iM int
// split metric index
iSM int
// buffer index
iB int
}

func NewReader(metrics []telegraf.Metric) io.Reader {
return &reader{
metrics: metrics,
state: normal,
}
}

func (r *reader) Read(p []byte) (n int, err error) {
var i int
switch r.state {
case done:
return 0, io.EOF
case normal:
for {
// this for-loop is the sunny-day scenario, where we are given a
// buffer that is large enough to hold at least a single metric.
// all of the cases below it are edge-cases.
if r.metrics[r.iM].Len() < len(p[i:]) {
i += r.metrics[r.iM].SerializeTo(p[i:])
} else {
break
}
r.iM++
if r.iM == len(r.metrics) {
r.state = done
return i, io.EOF
}
}

// if we haven't written any bytes, check if we can split the current
// metric into multiple full metrics at a smaller size.
if i == 0 {
tmp := r.metrics[r.iM].Split(len(p))
if len(tmp) > 1 {
r.splitMetrics = tmp
r.state = split
if r.splitMetrics[0].Len() < len(p) {
i += r.splitMetrics[0].SerializeTo(p)
r.iSM = 1
} else {
// splitting didn't quite work, so we'll drop down and
// overflow the metric.
r.state = normal
r.iSM = 0
}
}
}

// if we haven't written any bytes and we're not at the end of the metrics
// slice, then it means we have a single metric that is larger than the
// provided buffer.
if i == 0 {
r.buf = r.metrics[r.iM].Serialize()
i += copy(p, r.buf[r.iB:])
r.iB += i
r.state = overflow
}

case split:
if r.splitMetrics[r.iSM].Len() < len(p) {
// write the current split metric
i += r.splitMetrics[r.iSM].SerializeTo(p)
r.iSM++
if r.iSM >= len(r.splitMetrics) {
// done writing the current split metrics
r.iSM = 0
r.iM++
if r.iM == len(r.metrics) {
r.state = done
return i, io.EOF
}
r.state = normal
}
} else {
// This would only happen if we split the metric, and then a
// subsequent buffer was smaller than the initial one given,
// so that our split metric no longer fits.
r.buf = r.splitMetrics[r.iSM].Serialize()
i += copy(p, r.buf[r.iB:])
r.iB += i
r.state = splitOverflow
}

case splitOverflow:
i = copy(p, r.buf[r.iB:])
r.iB += i
if r.iB >= len(r.buf) {
r.iB = 0
r.iSM++
if r.iSM == len(r.splitMetrics) {
r.iM++
r.state = normal
} else {
r.state = split
}
}

case overflow:
i = copy(p, r.buf[r.iB:])
r.iB += i
if r.iB >= len(r.buf) {
r.iB = 0
r.iM++
if r.iM == len(r.metrics) {
r.state = done
return i, io.EOF
}
r.state = normal
}
}

return i, nil
}
Loading