Skip to content

Commit

Permalink
Merge pull request #21 from fromYukki/memcached
Browse files Browse the repository at this point in the history
Memcached plugin
  • Loading branch information
evanphx committed Jun 23, 2015
2 parents 043b171 + b86d789 commit b8f3c68
Show file tree
Hide file tree
Showing 2 changed files with 160 additions and 0 deletions.
134 changes: 134 additions & 0 deletions plugins/memcached/memcached.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package memcached

import (
"bufio"
"bytes"
"fmt"
"net"
"strconv"
"time"

"github.com/influxdb/telegraf/plugins"
)

// Memcached is a memcached plugin
type Memcached struct {
Servers []string
}

var sampleConfig = `
# An array of address to gather stats about. Specify an ip on hostname
# with optional port. ie localhost, 10.0.0.1:11211, etc.
#
# If no servers are specified, then localhost is used as the host.
servers = ["localhost"]`

var defaultTimeout = 5 * time.Second

// The list of metrics tha should be calculated
var sendAsIs = []string{
"get_hits",
"get_misses",
"evictions",
"limit_maxbytes",
"bytes",
}

// SampleConfig returns sample configuration message
func (m *Memcached) SampleConfig() string {
return sampleConfig
}

// Description returns description of Memcached plugin
func (m *Memcached) Description() string {
return "Read metrics from one or many memcached servers"
}

// Gather reads stats from all configured servers accumulates stats
func (m *Memcached) Gather(acc plugins.Accumulator) error {
if len(m.Servers) == 0 {
return m.gatherServer(":11211", acc)
}

for _, serverAddress := range m.Servers {
if err := m.gatherServer(serverAddress, acc); err != nil {
return err
}
}

return nil
}

func (m *Memcached) gatherServer(address string, acc plugins.Accumulator) error {
_, _, err := net.SplitHostPort(address)
if err != nil {
address = address + ":11211"
}

// Connect
conn, err := net.DialTimeout("tcp", address, defaultTimeout)
if err != nil {
return err
}
defer conn.Close()

// Extend connection
conn.SetDeadline(time.Now().Add(defaultTimeout))

// Read and write buffer
rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))

// Send command
if _, err = fmt.Fprint(rw, "stats\r\n"); err != nil {
return err
}
if err = rw.Flush(); err != nil {
return err
}

// Read response
values := make(map[string]string)

for {
// Read line
line, _, errRead := rw.Reader.ReadLine()
if errRead != nil {
return errRead
}
// Done
if bytes.Equal(line, []byte("END")) {
break
}
// Read values
var name, value string
n, errScan := fmt.Sscanf(string(line), "STAT %s %s\r\n", &name, &value)
if errScan != nil || n != 2 {
return fmt.Errorf("unexpected line in stats response: %q", line)
}

// Save values
values[name] = value
}

//
tags := map[string]string{"server": address}

// Process values
for _, key := range sendAsIs {
if value, ok := values[key]; ok {
// Mostly it is the number
if iValue, errParse := strconv.ParseInt(value, 10, 64); errParse != nil {
acc.Add(key, value, tags)
} else {
acc.Add(key, iValue, tags)
}
}
}
return nil
}

func init() {
plugins.Add("memcached", func() plugins.Plugin {
return &Memcached{}
})
}
26 changes: 26 additions & 0 deletions plugins/memcached/memcached_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package memcached

import (
"testing"

"github.com/influxdb/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestMemcachedGeneratesMetrics(t *testing.T) {
m := &Memcached{
Servers: []string{"localhost"},
}

var acc testutil.Accumulator

err := m.Gather(&acc)
require.NoError(t, err)

intMetrics := []string{"get_hits", "get_misses", "evictions", "limit_maxbytes", "bytes"}

for _, metric := range intMetrics {
assert.True(t, acc.HasIntValue(metric), metric)
}
}

0 comments on commit b8f3c68

Please sign in to comment.