Skip to content

Commit

Permalink
Merge pull request #4049 from influxdb/udp_stats
Browse files Browse the repository at this point in the history
Add stats to the UDP input
  • Loading branch information
otoolep committed Sep 9, 2015
2 parents dbcf56a + 2793240 commit 1ce5187
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ With this release InfluxDB is moving to Go 1.5.
- [#3916](https://github.com/influxdb/influxdb/pull/3916): New statistics and diagnostics support. Graphite first to be instrumented.
- [#3901](https://github.com/influxdb/influxdb/pull/3901): Add consistency level option to influx cli Thanks @takayuki
- [#4048](https://github.com/influxdb/influxdb/pull/4048): Add statistics to Continuous Query service
- [#4049](https://github.com/influxdb/influxdb/pull/4049): Add stats to the UDP input
- [#3876](https://github.com/influxdb/influxdb/pull/3876): Allow the following syntax in CQs: INTO "1hPolicy".:MEASUREMENT
- [#3975](https://github.com/influxdb/influxdb/pull/3975): Add shard copy service
- [#3986](https://github.com/influxdb/influxdb/pull/3986): Support sorting by time desc
Expand Down
38 changes: 33 additions & 5 deletions services/udp/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package udp

import (
"errors"
"expvar"
"log"
"net"
"os"
"strings"
"sync"
"time"

"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/cluster"
"github.com/influxdb/influxdb/tsdb"
)
Expand All @@ -16,6 +19,17 @@ const (
UDPBufferSize = 65536
)

// statistics gathered by the UDP package.
const (
statPointsReceived = "points_rx"
statBytesReceived = "bytes_rx"
statPointsParseFail = "points_parse_fail"
statReadFail = "read_fail"
statBatchesTrasmitted = "batches_tx"
statPointsTransmitted = "points_tx"
statBatchesTransmitFail = "batches_tx_fail"
)

//
// Service represents here an UDP service
// that will listen for incoming packets
Expand All @@ -34,7 +48,8 @@ type Service struct {
WritePoints(p *cluster.WritePointsRequest) error
}

Logger *log.Logger
Logger *log.Logger
statMap *expvar.Map
}

func NewService(c Config) *Service {
Expand All @@ -47,6 +62,12 @@ func NewService(c Config) *Service {
}

func (s *Service) Open() (err error) {
// Configure expvar monitoring. It's OK to do this even if the service fails to open and
// should be done before any data could arrive for the service.
key := strings.Join([]string{"udp", s.config.BindAddress}, ":")
tags := map[string]string{"bind": s.config.BindAddress}
s.statMap = influxdb.NewStatistics(key, "udp", tags)

if s.config.BindAddress == "" {
return errors.New("bind address has to be specified in config")
}
Expand Down Expand Up @@ -81,14 +102,17 @@ func (s *Service) writePoints() {
for {
select {
case batch := <-s.batcher.Out():
err := s.PointsWriter.WritePoints(&cluster.WritePointsRequest{
if err := s.PointsWriter.WritePoints(&cluster.WritePointsRequest{
Database: s.config.Database,
RetentionPolicy: "",
ConsistencyLevel: cluster.ConsistencyLevelOne,
Points: batch,
})
if err != nil {
s.Logger.Printf("Failed to write points batch to database %s: %s", s.config.Database, err)
}); err == nil {
s.statMap.Add(statBatchesTrasmitted, 1)
s.statMap.Add(statPointsTransmitted, int64(len(batch)))
} else {
s.Logger.Printf("failed to write point batch to database %q: %s", s.config.Database, err)
s.statMap.Add(statBatchesTransmitFail, 1)
}

case <-s.done:
Expand All @@ -114,19 +138,23 @@ func (s *Service) serve() {

n, _, err := s.conn.ReadFromUDP(buf)
if err != nil {
s.statMap.Add(statReadFail, 1)
s.Logger.Printf("Failed to read UDP message: %s", err)
continue
}
s.statMap.Add(statBytesReceived, int64(n))

points, err := tsdb.ParsePoints(buf[:n])
if err != nil {
s.statMap.Add(statPointsParseFail, 1)
s.Logger.Printf("Failed to parse points: %s", err)
continue
}

for _, point := range points {
s.batcher.In() <- point
}
s.statMap.Add(statPointsReceived, int64(len(points)))
}
}

Expand Down

0 comments on commit 1ce5187

Please sign in to comment.