Skip to content

Commit

Permalink
Fix udp metric splitting (#2880)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnelson authored Jun 5, 2017
1 parent 37e0180 commit 5bab461
Show file tree
Hide file tree
Showing 6 changed files with 204 additions and 113 deletions.
10 changes: 7 additions & 3 deletions metric/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (r *reader) Read(p []byte) (n int, err error) {
// this for-loop is the sunny-day scenario, where we are given a
// buffer that is large enough to hold at least a single metric.
// all of the cases below it are edge-cases.
if r.metrics[r.iM].Len() < len(p[i:]) {
if r.metrics[r.iM].Len() <= len(p[i:]) {
i += r.metrics[r.iM].SerializeTo(p[i:])
} else {
break
Expand All @@ -76,7 +76,7 @@ func (r *reader) Read(p []byte) (n int, err error) {
if len(tmp) > 1 {
r.splitMetrics = tmp
r.state = split
if r.splitMetrics[0].Len() < len(p) {
if r.splitMetrics[0].Len() <= len(p) {
i += r.splitMetrics[0].SerializeTo(p)
r.iSM = 1
} else {
Expand All @@ -99,7 +99,7 @@ func (r *reader) Read(p []byte) (n int, err error) {
}

case split:
if r.splitMetrics[r.iSM].Len() < len(p) {
if r.splitMetrics[r.iSM].Len() <= len(p) {
// write the current split metric
i += r.splitMetrics[r.iSM].SerializeTo(p)
r.iSM++
Expand Down Expand Up @@ -131,6 +131,10 @@ func (r *reader) Read(p []byte) (n int, err error) {
r.iSM++
if r.iSM == len(r.splitMetrics) {
r.iM++
if r.iM == len(r.metrics) {
r.state = done
return i, io.EOF
}
r.state = normal
} else {
r.state = split
Expand Down
150 changes: 149 additions & 1 deletion metric/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
"time"

"github.com/influxdata/telegraf"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func BenchmarkMetricReader(b *testing.B) {
Expand Down Expand Up @@ -116,6 +116,140 @@ func TestMetricReader_OverflowMetric(t *testing.T) {
}
}

// Regression test for when a metric is the same size as the buffer.
//
// Previously EOF would not be set until the next call to Read.
func TestMetricReader_MetricSizeEqualsBufferSize(t *testing.T) {
ts := time.Unix(1481032190, 0)
m1, _ := New("foo", map[string]string{},
map[string]interface{}{"a": int64(1)}, ts)
metrics := []telegraf.Metric{m1}

r := NewReader(metrics)
buf := make([]byte, m1.Len())

for {
n, err := r.Read(buf)
// Should never read 0 bytes unless at EOF, unless input buffer is 0 length
if n == 0 {
require.Equal(t, io.EOF, err)
break
}
// Lines should be terminated with a LF
if err == io.EOF {
require.Equal(t, uint8('\n'), buf[n-1])
break
}
require.NoError(t, err)
}
}

// Regression test for when a metric requires to be split and one of the
// split metrics is exactly the size of the buffer.
//
// Previously an empty string would be returned on the next Read without error,
// and then next Read call would panic.
func TestMetricReader_SplitWithExactLengthSplit(t *testing.T) {
ts := time.Unix(1481032190, 0)
m1, _ := New("foo", map[string]string{},
map[string]interface{}{"a": int64(1), "bb": int64(2)}, ts)
metrics := []telegraf.Metric{m1}

r := NewReader(metrics)
buf := make([]byte, 30)

// foo a=1i,bb=2i 1481032190000000000\n // len 35
//
// Requires this specific split order:
// foo a=1i 1481032190000000000\n // len 29
// foo bb=2i 1481032190000000000\n // len 30

for {
n, err := r.Read(buf)
// Should never read 0 bytes unless at EOF, unless input buffer is 0 length
if n == 0 {
require.Equal(t, io.EOF, err)
break
}
// Lines should be terminated with a LF
if err == io.EOF {
require.Equal(t, uint8('\n'), buf[n-1])
break
}
require.NoError(t, err)
}
}

// Regresssion test for when a metric requires to be split and one of the
// split metrics is larger than the buffer.
//
// Previously the metric index would be set incorrectly causing a panic.
func TestMetricReader_SplitOverflowOversized(t *testing.T) {
ts := time.Unix(1481032190, 0)
m1, _ := New("foo", map[string]string{},
map[string]interface{}{
"a": int64(1),
"bbb": int64(2),
}, ts)
metrics := []telegraf.Metric{m1}

r := NewReader(metrics)
buf := make([]byte, 30)

// foo a=1i,bbb=2i 1481032190000000000\n // len 36
//
// foo a=1i 1481032190000000000\n // len 29
// foo bbb=2i 1481032190000000000\n // len 31

for {
n, err := r.Read(buf)
// Should never read 0 bytes unless at EOF, unless input buffer is 0 length
if n == 0 {
require.Equal(t, io.EOF, err)
break
}
// Lines should be terminated with a LF
if err == io.EOF {
require.Equal(t, uint8('\n'), buf[n-1])
break
}
require.NoError(t, err)
}
}

// Regresssion test for when a split metric exactly fits in the buffer.
//
// Previously the metric would be overflow split when not required.
func TestMetricReader_SplitOverflowUneeded(t *testing.T) {
ts := time.Unix(1481032190, 0)
m1, _ := New("foo", map[string]string{},
map[string]interface{}{"a": int64(1), "b": int64(2)}, ts)
metrics := []telegraf.Metric{m1}

r := NewReader(metrics)
buf := make([]byte, 29)

// foo a=1i,b=2i 1481032190000000000\n // len 34
//
// foo a=1i 1481032190000000000\n // len 29
// foo b=2i 1481032190000000000\n // len 29

for {
n, err := r.Read(buf)
// Should never read 0 bytes unless at EOF, unless input buffer is 0 length
if n == 0 {
require.Equal(t, io.EOF, err)
break
}
// Lines should be terminated with a LF
if err == io.EOF {
require.Equal(t, uint8('\n'), buf[n-1])
break
}
require.NoError(t, err)
}
}

func TestMetricReader_OverflowMultipleMetrics(t *testing.T) {
ts := time.Unix(1481032190, 0)
m, _ := New("foo", map[string]string{},
Expand Down Expand Up @@ -485,3 +619,17 @@ func TestMetricReader_SplitMetricChangingBuffer2(t *testing.T) {
assert.Equal(t, test.err, err, test.expRegex)
}
}

func TestMetricRoundtrip(t *testing.T) {
const lp = `nstat,bu=linux,cls=server,dc=cer,env=production,host=hostname,name=netstat,sr=database IpExtInBcastOctets=12570626154i,IpExtInBcastPkts=95541226i,IpExtInCEPkts=0i,IpExtInCsumErrors=0i,IpExtInECT0Pkts=55674i,IpExtInECT1Pkts=0i,IpExtInMcastOctets=5928296i,IpExtInMcastPkts=174365i,IpExtInNoECTPkts=17965863529i,IpExtInNoRoutes=20i,IpExtInOctets=3334866321815i,IpExtInTruncatedPkts=0i,IpExtOutBcastOctets=0i,IpExtOutBcastPkts=0i,IpExtOutMcastOctets=0i,IpExtOutMcastPkts=0i,IpExtOutOctets=31397892391399i,TcpExtArpFilter=0i,TcpExtBusyPollRxPackets=0i,TcpExtDelayedACKLocked=14094i,TcpExtDelayedACKLost=302083i,TcpExtDelayedACKs=55486507i,TcpExtEmbryonicRsts=11879i,TcpExtIPReversePathFilter=0i,TcpExtListenDrops=1736i,TcpExtListenOverflows=0i,TcpExtLockDroppedIcmps=0i,TcpExtOfoPruned=0i,TcpExtOutOfWindowIcmps=8i,TcpExtPAWSActive=0i,TcpExtPAWSEstab=974i,TcpExtPAWSPassive=0i,TcpExtPruneCalled=0i,TcpExtRcvPruned=0i,TcpExtSyncookiesFailed=12593i,TcpExtSyncookiesRecv=0i,TcpExtSyncookiesSent=0i,TcpExtTCPACKSkippedChallenge=0i,TcpExtTCPACKSkippedFinWait2=0i,TcpExtTCPACKSkippedPAWS=806i,TcpExtTCPACKSkippedSeq=519i,TcpExtTCPACKSkippedSynRecv=0i,TcpExtTCPACKSkippedTimeWait=0i,TcpExtTCPAbortFailed=0i,TcpExtTCPAbortOnClose=22i,TcpExtTCPAbortOnData=36593i,TcpExtTCPAbortOnLinger=0i,TcpExtTCPAbortOnMemory=0i,TcpExtTCPAbortOnTimeout=674i,TcpExtTCPAutoCorking=494253233i,TcpExtTCPBacklogDrop=0i,TcpExtTCPChallengeACK=281i,TcpExtTCPDSACKIgnoredNoUndo=93354i,TcpExtTCPDSACKIgnoredOld=336i,TcpExtTCPDSACKOfoRecv=0i,TcpExtTCPDSACKOfoSent=7i,TcpExtTCPDSACKOldSent=302073i,TcpExtTCPDSACKRecv=215884i,TcpExtTCPDSACKUndo=7633i,TcpExtTCPDeferAcceptDrop=0i,TcpExtTCPDirectCopyFromBacklog=0i,TcpExtTCPDirectCopyFromPrequeue=0i,TcpExtTCPFACKReorder=1320i,TcpExtTCPFastOpenActive=0i,TcpExtTCPFastOpenActiveFail=0i,TcpExtTCPFastOpenCookieReqd=0i,TcpExtTCPFastOpenListenOverflow=0i,TcpExtTCPFastOpenPassive=0i,TcpExtTCPFastOpenPassiveFail=0i,TcpExtTCPFastRetrans=350681i,TcpExtTCPForwardRetrans=142168i,TcpExtTCPFromZeroWindowAdv=4317i,TcpExtTCPFullUndo=29502i,TcpExtTCPHPAcks=10267073000i,TcpExtTCPHPHits=5629837098i,TcpExtTCPHPHitsToUser=0i,TcpExtTCPHystartDelayCwnd=285127i,TcpExtTCPHystartDelayDetect=12318i,TcpExtTCPHystartTrainCwnd=69160570i,TcpExtTCPHystartTrainDetect=3315799i,TcpExtTCPLossFailures=109i,TcpExtTCPLossProbeRecovery=110819i,TcpExtTCPLossProbes=233995i,TcpExtTCPLossUndo=5276i,TcpExtTCPLostRetransmit=397i,TcpExtTCPMD5NotFound=0i,TcpExtTCPMD5Unexpected=0i,TcpExtTCPMemoryPressures=0i,TcpExtTCPMinTTLDrop=0i,TcpExtTCPOFODrop=0i,TcpExtTCPOFOMerge=7i,TcpExtTCPOFOQueue=15196i,TcpExtTCPOrigDataSent=29055119435i,TcpExtTCPPartialUndo=21320i,TcpExtTCPPrequeueDropped=0i,TcpExtTCPPrequeued=0i,TcpExtTCPPureAcks=1236441827i,TcpExtTCPRcvCoalesce=225590473i,TcpExtTCPRcvCollapsed=0i,TcpExtTCPRenoFailures=0i,TcpExtTCPRenoRecovery=0i,TcpExtTCPRenoRecoveryFail=0i,TcpExtTCPRenoReorder=0i,TcpExtTCPReqQFullDoCookies=0i,TcpExtTCPReqQFullDrop=0i,TcpExtTCPRetransFail=41i,TcpExtTCPSACKDiscard=0i,TcpExtTCPSACKReneging=0i,TcpExtTCPSACKReorder=4307i,TcpExtTCPSYNChallenge=244i,TcpExtTCPSackFailures=1698i,TcpExtTCPSackMerged=184668i,TcpExtTCPSackRecovery=97369i,TcpExtTCPSackRecoveryFail=381i,TcpExtTCPSackShiftFallback=2697079i,TcpExtTCPSackShifted=760299i,TcpExtTCPSchedulerFailed=0i,TcpExtTCPSlowStartRetrans=9276i,TcpExtTCPSpuriousRTOs=959i,TcpExtTCPSpuriousRtxHostQueues=2973i,TcpExtTCPSynRetrans=200970i,TcpExtTCPTSReorder=15221i,TcpExtTCPTimeWaitOverflow=0i,TcpExtTCPTimeouts=70127i,TcpExtTCPToZeroWindowAdv=4317i,TcpExtTCPWantZeroWindowAdv=2133i,TcpExtTW=24809813i,TcpExtTWKilled=0i,TcpExtTWRecycled=0i 1496460785000000000
nstat,bu=linux,cls=server,dc=cer,env=production,host=hostname,name=snmp,sr=database IcmpInAddrMaskReps=0i,IcmpInAddrMasks=90i,IcmpInCsumErrors=0i,IcmpInDestUnreachs=284401i,IcmpInEchoReps=9i,IcmpInEchos=1761912i,IcmpInErrors=407i,IcmpInMsgs=2047767i,IcmpInParmProbs=0i,IcmpInRedirects=0i,IcmpInSrcQuenchs=0i,IcmpInTimeExcds=46i,IcmpInTimestampReps=0i,IcmpInTimestamps=1309i,IcmpMsgInType0=9i,IcmpMsgInType11=46i,IcmpMsgInType13=1309i,IcmpMsgInType17=90i,IcmpMsgInType3=284401i,IcmpMsgInType8=1761912i,IcmpMsgOutType0=1761912i,IcmpMsgOutType14=1248i,IcmpMsgOutType3=108709i,IcmpMsgOutType8=9i,IcmpOutAddrMaskReps=0i,IcmpOutAddrMasks=0i,IcmpOutDestUnreachs=108709i,IcmpOutEchoReps=1761912i,IcmpOutEchos=9i,IcmpOutErrors=0i,IcmpOutMsgs=1871878i,IcmpOutParmProbs=0i,IcmpOutRedirects=0i,IcmpOutSrcQuenchs=0i,IcmpOutTimeExcds=0i,IcmpOutTimestampReps=1248i,IcmpOutTimestamps=0i,IpDefaultTTL=64i,IpForwDatagrams=0i,IpForwarding=2i,IpFragCreates=0i,IpFragFails=0i,IpFragOKs=0i,IpInAddrErrors=0i,IpInDelivers=17658795773i,IpInDiscards=0i,IpInHdrErrors=0i,IpInReceives=17659269339i,IpInUnknownProtos=0i,IpOutDiscards=236976i,IpOutNoRoutes=1009i,IpOutRequests=23466783734i,IpReasmFails=0i,IpReasmOKs=0i,IpReasmReqds=0i,IpReasmTimeout=0i,TcpActiveOpens=23308977i,TcpAttemptFails=3757543i,TcpCurrEstab=280i,TcpEstabResets=184792i,TcpInCsumErrors=0i,TcpInErrs=232i,TcpInSegs=17536573089i,TcpMaxConn=-1i,TcpOutRsts=4051451i,TcpOutSegs=29836254873i,TcpPassiveOpens=176546974i,TcpRetransSegs=878085i,TcpRtoAlgorithm=1i,TcpRtoMax=120000i,TcpRtoMin=200i,UdpInCsumErrors=0i,UdpInDatagrams=24441661i,UdpInErrors=0i,UdpLiteInCsumErrors=0i,UdpLiteInDatagrams=0i,UdpLiteInErrors=0i,UdpLiteNoPorts=0i,UdpLiteOutDatagrams=0i,UdpLiteRcvbufErrors=0i,UdpLiteSndbufErrors=0i,UdpNoPorts=17660i,UdpOutDatagrams=51807896i,UdpRcvbufErrors=0i,UdpSndbufErrors=236922i 1496460785000000000
`
metrics, err := Parse([]byte(lp))
require.NoError(t, err)
r := NewReader(metrics)
buf := make([]byte, 128)
_, err = r.Read(buf)
require.NoError(t, err)
metrics, err = Parse(buf)
require.NoError(t, err)
}
27 changes: 23 additions & 4 deletions plugins/outputs/influxdb/client/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"fmt"
"io"
"log"
"net"
"net/url"
)
Expand Down Expand Up @@ -82,10 +83,28 @@ func (c *udpClient) WriteStream(r io.Reader, contentLength int) (int, error) {
if err != io.EOF && err != nil {
return totaln, err
}
nW, err := c.conn.Write(c.buffer[0:nR])
totaln += nW
if err != nil {
return totaln, err

if c.buffer[nR-1] == uint8('\n') {
nW, err := c.conn.Write(c.buffer[0:nR])
totaln += nW
if err != nil {
return totaln, err
}
} else {
log.Printf("E! Could not fit point into UDP payload; dropping")
// Scan forward until next line break to realign.
for {
nR, err := r.Read(c.buffer)
if nR == 0 {
break
}
if err != io.EOF && err != nil {
return totaln, err
}
if c.buffer[nR-1] == uint8('\n') {
break
}
}
}
}
return totaln, nil
Expand Down
81 changes: 24 additions & 57 deletions plugins/outputs/influxdb/client/udp_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package client

import (
"bytes"
"net"
"testing"
"time"
Expand All @@ -10,6 +9,7 @@ import (
"github.com/influxdata/telegraf/metric"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestUDPClient(t *testing.T) {
Expand Down Expand Up @@ -72,63 +72,7 @@ func TestUDPClient_Write(t *testing.T) {
pkt := <-packets
assert.Equal(t, "cpu value=99\n", pkt)

metrics := `cpu value=99
cpu value=55
cpu value=44
cpu value=101
cpu value=91
cpu value=92
`
// test sending packet with 6 metrics in a stream.
reader := bytes.NewReader([]byte(metrics))
// contentLength is ignored:
n, err = client.WriteStream(reader, 10)
assert.Equal(t, n, len(metrics))
assert.NoError(t, err)
pkt = <-packets
assert.Equal(t, "cpu value=99\ncpu value=55\ncpu value=44\ncpu value=101\ncpu value=91\ncpu value=92\n", pkt)

//
// Test that UDP packets get broken up properly:
config2 := UDPConfig{
URL: "udp://localhost:8199",
PayloadSize: 25,
}
client2, err := NewUDP(config2)
assert.NoError(t, err)

wp := WriteParams{}

//
// Using Write():
buf := []byte(metrics)
n, err = client2.WriteWithParams(buf, wp)
assert.Equal(t, n, len(metrics))
assert.NoError(t, err)
pkt = <-packets
assert.Equal(t, "cpu value=99\ncpu value=55", pkt)
pkt = <-packets
assert.Equal(t, "\ncpu value=44\ncpu value=1", pkt)
pkt = <-packets
assert.Equal(t, "01\ncpu value=91\ncpu value", pkt)
pkt = <-packets
assert.Equal(t, "=92\n", pkt)

//
// Using WriteStream():
reader = bytes.NewReader([]byte(metrics))
n, err = client2.WriteStreamWithParams(reader, 10, wp)
assert.Equal(t, n, len(metrics))
assert.NoError(t, err)
pkt = <-packets
assert.Equal(t, "cpu value=99\ncpu value=55", pkt)
pkt = <-packets
assert.Equal(t, "\ncpu value=44\ncpu value=1", pkt)
pkt = <-packets
assert.Equal(t, "01\ncpu value=91\ncpu value", pkt)
pkt = <-packets
assert.Equal(t, "=92\n", pkt)

//
// Using WriteStream() & a metric.Reader:
config3 := UDPConfig{
Expand Down Expand Up @@ -159,4 +103,27 @@ cpu value=92
assert.Equal(t, "test value=1.1 1484142942000000000\n", pkt)

assert.NoError(t, client.Close())

config = UDPConfig{
URL: "udp://localhost:8199",
PayloadSize: 40,
}
client4, err := NewUDP(config)
assert.NoError(t, err)

ts := time.Unix(1484142943, 0)
m1, _ = metric.New("test", map[string]string{},
map[string]interface{}{"this_is_a_very_long_field_name": 1.1}, ts)
m2, _ = metric.New("test", map[string]string{},
map[string]interface{}{"value": 1.1}, ts)
ms = []telegraf.Metric{m1, m2}
reader := metric.NewReader(ms)
n, err = client4.WriteStream(reader, 0)
assert.NoError(t, err)
require.Equal(t, 35, n)
assert.NoError(t, err)
pkt = <-packets
assert.Equal(t, "test value=1.1 1484142943000000000\n", pkt)

assert.NoError(t, client4.Close())
}
17 changes: 1 addition & 16 deletions plugins/outputs/influxdb/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ type InfluxDB struct {
// Precision is only here for legacy support. It will be ignored.
Precision string

clients []client.Client
splitPayload bool
clients []client.Client
}

var sampleConfig = `
Expand Down Expand Up @@ -115,7 +114,6 @@ func (i *InfluxDB) Connect() error {
return fmt.Errorf("Error creating UDP Client [%s]: %s", u, err)
}
i.clients = append(i.clients, c)
i.splitPayload = true
default:
// If URL doesn't start with "udp", assume HTTP client
config := client.HTTPConfig{
Expand Down Expand Up @@ -166,22 +164,9 @@ func (i *InfluxDB) Description() string {
return "Configuration for influxdb server to send metrics to"
}

func (i *InfluxDB) split(metrics []telegraf.Metric) []telegraf.Metric {
if !i.splitPayload {
return metrics
}

split := make([]telegraf.Metric, 0)
for _, m := range metrics {
split = append(split, m.Split(i.UDPPayload)...)
}
return split
}

// Write will choose a random server in the cluster to write to until a successful write
// occurs, logging each unsuccessful. If all servers fail, return error.
func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
metrics = i.split(metrics)

bufsize := 0
for _, m := range metrics {
Expand Down
Loading

0 comments on commit 5bab461

Please sign in to comment.