Skip to content

Commit

Permalink
Improve the InfluxDB through-put performance
Browse files Browse the repository at this point in the history
This changes the current use of the InfluxDB client to instead use a
baked-in client that uses the fasthttp library.

This allows for significantly smaller allocations, the re-use of http
body buffers, and the re-use of the actual bytes of the line-protocol
metric representations.
  • Loading branch information
sparrc committed Jan 11, 2017
1 parent 2aa2c79 commit ff1153b
Show file tree
Hide file tree
Showing 14 changed files with 1,722 additions and 82 deletions.
2 changes: 2 additions & 0 deletions Godeps
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ github.com/shirou/gopsutil 1516eb9ddc5e61ba58874047a98f8b44b5e585e8
github.com/soniah/gosnmp 3fe3beb30fa9700988893c56a63b1df8e1b68c26
github.com/streadway/amqp b4f3ceab0337f013208d31348b578d83c0064744
github.com/stretchr/testify 1f4a1643a57e798696635ea4c126e9127adb7d3c
github.com/valyala/bytebufferpool e746df99fe4a3986f4d4f79e13c1e0117ce9c2f7
github.com/valyala/fasthttp 2f4876aaf2b591786efc9b49f34b86ad44c25074
github.com/vjeantet/grok 83bfdfdfd1a8146795b28e547a8e3c8b28a466c2
github.com/wvanbergen/kafka 46f9a1cf3f670edec492029fadded9c2d9e18866
github.com/wvanbergen/kazoo-go 0f768712ae6f76454f987c3356177e138df258f8
Expand Down
9 changes: 8 additions & 1 deletion metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,15 @@ const (
)

type Metric interface {
// Serialize serializes the metric into a line-protocol byte buffer,
// including a newline at the end.
Serialize() []byte
String() string // convenience function for string(Serialize())
// same as Serialize, but avoids an allocation.
// returns number of bytes copied into dst.
SerializeTo(dst []byte) int
// String is the same as Serialize, but returns a string.
String() string
// Copy deep-copies the metric.
Copy() Metric
// Split will attempt to return multiple metrics with the same timestamp
// whose string representations are no longer than maxSize.
Expand Down
42 changes: 42 additions & 0 deletions metric/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,48 @@ func (m *metric) Serialize() []byte {
return tmp
}

func (m *metric) SerializeTo(dst []byte) int {
i := 0
if i >= len(dst) {
return i
}

i += copy(dst[i:], m.name)
if i >= len(dst) {
return i
}

i += copy(dst[i:], m.tags)
if i >= len(dst) {
return i
}

dst[i] = ' '
i++
if i >= len(dst) {
return i
}

i += copy(dst[i:], m.fields)
if i >= len(dst) {
return i
}

dst[i] = ' '
i++
if i >= len(dst) {
return i
}

i += copy(dst[i:], m.t)
if i >= len(dst) {
return i
}
dst[i] = '\n'

return i + 1
}

func (m *metric) Split(maxSize int) []telegraf.Metric {
if m.Len() < maxSize {
return []telegraf.Metric{m}
Expand Down
155 changes: 155 additions & 0 deletions metric/reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package metric

import (
"io"

"github.com/influxdata/telegraf"
)

type state int

const (
_ state = iota
// normal state copies whole metrics into the given buffer until we can't
// fit the next metric.
normal
// split state means that we have a metric that we were able to split, so
// that we can fit it into multiple metrics (and calls to Read)
split
// overflow state means that we have a metric that didn't fit into a single
// buffer, and needs to be split across multiple calls to Read.
overflow
// splitOverflow state means that a split metric didn't fit into a single
// buffer, and needs to be split across multiple calls to Read.
splitOverflow
// done means we're done reading metrics, and now always return (0, io.EOF)
done
)

type reader struct {
metrics []telegraf.Metric
splitMetrics []telegraf.Metric
buf []byte
state state

// metric index
iM int
// split metric index
iSM int
// buffer index
iB int
}

func NewReader(metrics []telegraf.Metric) io.Reader {
return &reader{
metrics: metrics,
state: normal,
}
}

func (r *reader) Read(p []byte) (n int, err error) {
var i int
switch r.state {
case done:
return 0, io.EOF
case normal:
for {
// this for-loop is the sunny-day scenario, where we are given a
// buffer that is large enough to hold at least a single metric.
// all of the cases below it are edge-cases.
if r.metrics[r.iM].Len() < len(p[i:]) {
i += r.metrics[r.iM].SerializeTo(p[i:])
} else {
break
}
r.iM++
if r.iM == len(r.metrics) {
r.state = done
return i, io.EOF
}
}

// if we haven't written any bytes, check if we can split the current
// metric into multiple full metrics at a smaller size.
if i == 0 {
tmp := r.metrics[r.iM].Split(len(p))
if len(tmp) > 1 {
r.splitMetrics = tmp
r.state = split
if r.splitMetrics[0].Len() < len(p) {
i += r.splitMetrics[0].SerializeTo(p)
r.iSM = 1
} else {
// splitting didn't quite work, so we'll drop down and
// overflow the metric.
r.state = normal
r.iSM = 0
}
}
}

// if we haven't written any bytes and we're not at the end of the metrics
// slice, then it means we have a single metric that is larger than the
// provided buffer.
if i == 0 {
r.buf = r.metrics[r.iM].Serialize()
i += copy(p, r.buf[r.iB:])
r.iB += i
r.state = overflow
}

case split:
if r.splitMetrics[r.iSM].Len() < len(p) {
// write the current split metric
i += r.splitMetrics[r.iSM].SerializeTo(p)
r.iSM++
if r.iSM >= len(r.splitMetrics) {
// done writing the current split metrics
r.iSM = 0
r.iM++
if r.iM == len(r.metrics) {
r.state = done
return i, io.EOF
}
r.state = normal
}
} else {
// This would only happen if we split the metric, and then a
// subsequent buffer was smaller than the initial one given,
// so that our split metric no longer fits.
r.buf = r.splitMetrics[r.iSM].Serialize()
i += copy(p, r.buf[r.iB:])
r.iB += i
r.state = splitOverflow
}

case splitOverflow:
i = copy(p, r.buf[r.iB:])
r.iB += i
if r.iB >= len(r.buf) {
r.iB = 0
r.iSM++
if r.iSM == len(r.splitMetrics) {
r.iM++
r.state = normal
} else {
r.state = split
}
}

case overflow:
i = copy(p, r.buf[r.iB:])
r.iB += i
if r.iB >= len(r.buf) {
r.iB = 0
r.iM++
if r.iM == len(r.metrics) {
r.state = done
return i, io.EOF
}
r.state = normal
}
}

return i, nil
}
Loading

0 comments on commit ff1153b

Please sign in to comment.