From 36976a55475eed96e35460badb301ce4c2d47944 Mon Sep 17 00:00:00 2001
From: Cameron Sparr <cameronsparr@gmail.com>
Date: Wed, 2 Mar 2016 15:31:46 +0000
Subject: [PATCH] Adding a TCP input listener

closes #481
---
 CHANGELOG.md                                  |   2 +
 plugins/inputs/all/all.go                     |   1 +
 plugins/inputs/tcp_listener/README.md         |  30 ++
 plugins/inputs/tcp_listener/tcp_listener.go   | 264 ++++++++++++++++++
 .../inputs/tcp_listener/tcp_listener_test.go  | 259 +++++++++++++++++
 5 files changed, 556 insertions(+)
 create mode 100644 plugins/inputs/tcp_listener/README.md
 create mode 100644 plugins/inputs/tcp_listener/tcp_listener.go
 create mode 100644 plugins/inputs/tcp_listener/tcp_listener_test.go

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 73a9427300b53..4e7ce13469956 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,6 +10,8 @@
 - [#735](https://github.com/influxdata/telegraf/pull/735): SNMP Table feature. Thanks @titilambert!
 - [#754](https://github.com/influxdata/telegraf/pull/754): docker plugin: adding `docker info` metrics to output. Thanks @titilambert!
 - [#788](https://github.com/influxdata/telegraf/pull/788): -input-list and -output-list command-line options. Thanks @ebookbug!
+- [#778](https://github.com/influxdata/telegraf/pull/778): Adding a TCP input listener.
+
 
 ### Bugfixes
 - [#748](https://github.com/influxdata/telegraf/issues/748): Fix sensor plugin split on ":"
diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go
index 262de37acba6c..2808ce2b54f20 100644
--- a/plugins/inputs/all/all.go
+++ b/plugins/inputs/all/all.go
@@ -47,6 +47,7 @@ import (
 	_ "github.com/influxdata/telegraf/plugins/inputs/sqlserver"
 	_ "github.com/influxdata/telegraf/plugins/inputs/statsd"
 	_ "github.com/influxdata/telegraf/plugins/inputs/system"
+	_ "github.com/influxdata/telegraf/plugins/inputs/tcp_listener"
 	_ "github.com/influxdata/telegraf/plugins/inputs/trig"
 	_ "github.com/influxdata/telegraf/plugins/inputs/twemproxy"
 	_ "github.com/influxdata/telegraf/plugins/inputs/udp_listener"
diff --git a/plugins/inputs/tcp_listener/README.md b/plugins/inputs/tcp_listener/README.md
new file mode 100644
index 0000000000000..63a7dea3c0089
--- /dev/null
+++ b/plugins/inputs/tcp_listener/README.md
@@ -0,0 +1,30 @@
+# TCP listener service input plugin
+
+The TCP listener is a service input plugin that listens for messages on a TCP
+socket and adds those messages to InfluxDB.
+The plugin expects messages in the
+[Telegraf Input Data Formats](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md).
+
+### Configuration:
+
+This is a sample configuration for the plugin.
+
+```toml
+# Generic TCP listener
+[[inputs.tcp_listener]]
+  ## Address and port to host TCP listener on
+  service_address = ":8094"
+
+  ## Number of TCP messages allowed to queue up. Once filled, the
+  ## TCP listener will start dropping packets.
+  allowed_pending_messages = 10000
+
+  ## Maximum number of concurrent TCP connections to allow
+  max_tcp_connections = 250
+
+  ## Data format to consume. This can be "json", "influx" or "graphite"
+  ## Each data format has its own unique set of configuration options, read
+  ## more about them here:
+  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
+  data_format = "influx"
+```
diff --git a/plugins/inputs/tcp_listener/tcp_listener.go b/plugins/inputs/tcp_listener/tcp_listener.go
new file mode 100644
index 0000000000000..dd239fedf4625
--- /dev/null
+++ b/plugins/inputs/tcp_listener/tcp_listener.go
@@ -0,0 +1,264 @@
+package tcp_listener
+
+import (
+	"bufio"
+	"fmt"
+	"log"
+	"net"
+	"sync"
+
+	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/internal"
+	"github.com/influxdata/telegraf/plugins/inputs"
+	"github.com/influxdata/telegraf/plugins/parsers"
+)
+
+// TcpListener is a service input that reads metrics from TCP connections.
+type TcpListener struct {
+	ServiceAddress         string
+	AllowedPendingMessages int
+	MaxTCPConnections      int `toml:"max_tcp_connections"`
+
+	sync.Mutex // taken by Start(), Stop() and storeMetrics()
+	// Lock for preventing a data race on conns during resource cleanup
+	cleanup sync.Mutex
+	wg      sync.WaitGroup // counts the listen, parse and handler goroutines
+
+	in   chan []byte   // lines queued by handler() for tcpParser()
+	done chan struct{} // closed by Stop() to signal shutdown
+	// accept holds one token per free connection slot: tcpListen() takes a
+	// token before handling a connection and handler() puts it back on exit.
+	accept chan bool
+
+	// track the listener here so we can close it in Stop()
+	listener *net.TCPListener
+	// track current connections so we can close them in Stop()
+	conns map[string]*net.TCPConn
+
+	parser parsers.Parser       // set via SetParser()
+	acc    telegraf.Accumulator // set in Start(), written by storeMetrics()
+}
+
+var dropwarn = "ERROR: Message queue full. Discarding line [%s] You may want " +
+	"to increase allowed_pending_messages in the config\n" // log format used by handler()
+
+const sampleConfig = `
+  ## Address and port to host TCP listener on
+  service_address = ":8094"
+
+  ## Number of TCP messages allowed to queue up. Once filled, the
+  ## TCP listener will start dropping packets.
+  allowed_pending_messages = 10000
+
+  ## Maximum number of concurrent TCP connections to allow
+  max_tcp_connections = 250
+
+  ## Data format to consume. This can be "json", "influx" or "graphite"
+  ## Each data format has its own unique set of configuration options, read
+  ## more about them here:
+  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
+  data_format = "influx"
+`
+
+// SampleConfig returns the example TOML configuration for this plugin,
+// i.e. the sampleConfig constant, unchanged.
+func (t *TcpListener) SampleConfig() string { return sampleConfig }
+
+// Description returns the fixed one-line summary of this plugin:
+// "Generic TCP listener".
+func (t *TcpListener) Description() string { return "Generic TCP listener" }
+
+// Gather is a no-op: it ignores the accumulator it is given and always
+// returns nil. Metrics are not collected on demand; tcpParser() writes them
+// to the accumulator that was passed to Start() as soon as each line is
+// parsed.
+func (t *TcpListener) Gather(_ telegraf.Accumulator) error { return nil }
+
+// SetParser stores the parser that tcpParser() applies to every received
+// line.
+func (t *TcpListener) SetParser(parser parsers.Parser) { t.parser = parser }
+
+// Start starts the tcp listener service.
+func (t *TcpListener) Start(acc telegraf.Accumulator) error {
+	t.Lock()
+	defer t.Unlock()
+
+	t.acc = acc
+	t.in = make(chan []byte, t.AllowedPendingMessages)
+	t.done = make(chan struct{})
+	t.accept = make(chan bool, t.MaxTCPConnections)
+	t.conns = make(map[string]*net.TCPConn)
+	for i := 0; i < t.MaxTCPConnections; i++ {
+		t.accept <- true
+	}
+
+	// Start listener
+	var err error
+	address, _ := net.ResolveTCPAddr("tcp", t.ServiceAddress)
+	t.listener, err = net.ListenTCP("tcp", address)
+	if err != nil {
+		log.Fatalf("ERROR: ListenUDP - %s", err)
+		return err
+	}
+	log.Println("TCP server listening on: ", t.listener.Addr().String())
+
+	t.wg.Add(2)
+	go t.tcpListen()
+	go t.tcpParser()
+
+	log.Printf("Started TCP listener service on %s\n", t.ServiceAddress)
+	return nil
+}
+
+// Stop cleans up all resources
+func (t *TcpListener) Stop() {
+	t.Lock()
+	defer t.Unlock()
+	close(t.done)
+	t.listener.Close()
+
+	// Close all open TCP connections
+	//  - get all conns from the t.conns map and put into slice
+	//  - this is so the forget() function doesnt conflict with looping
+	//    over the t.conns map
+	var conns []*net.TCPConn
+	t.cleanup.Lock()
+	for _, conn := range t.conns {
+		conns = append(conns, conn)
+	}
+	t.cleanup.Unlock()
+	for _, conn := range conns {
+		conn.Close()
+	}
+
+	t.wg.Wait()
+	close(t.in)
+	log.Println("Stopped TCP listener service on ", t.ServiceAddress)
+}
+
+// tcpListen accepts TCP connections until t.done is closed or AcceptTCP
+// fails (as it does once Stop() closes the listener). Each connection gets
+// its own handler goroutine, or is refused when no slot is free.
+func (t *TcpListener) tcpListen() error {
+	defer t.wg.Done()
+	for {
+		// Non-blocking check for shutdown before waiting on the next client.
+		select {
+		case <-t.done:
+			return nil
+		default:
+		}
+
+		conn, err := t.listener.AcceptTCP()
+		if err != nil {
+			return err
+		}
+		log.Printf("Received TCP Connection from %s", conn.RemoteAddr())
+
+		select {
+		case <-t.accept:
+			// Took a slot: track the connection under a random id and serve it.
+			t.wg.Add(1)
+			id := internal.RandomString(6)
+			t.remember(id, conn)
+			go t.handler(conn, id)
+		default:
+			// We are over the connection limit, refuse & close.
+			t.refuser(conn)
+		}
+	}
+}
+
+// refuser refuses a TCP connection
+func (t *TcpListener) refuser(conn *net.TCPConn) {
+	// Tell the connection why we are closing.
+	fmt.Fprintf(conn, "Telegraf maximum concurrent TCP connections (%d)"+
+		" reached, closing.\nYou may want to increase max_tcp_connections in"+
+		" the Telegraf tcp listener configuration.\n", t.MaxTCPConnections)
+	conn.Close()
+	log.Printf("Refused TCP Connection from %s", conn.RemoteAddr())
+	log.Printf("WARNING: Maximum TCP Connections reached, you may want to" +
+		" adjust max_tcp_connections")
+}
+
+// handler queues each newline-delimited message read from conn onto t.in
+// until the peer disconnects, a read fails, or Stop() closes t.done.
+func (t *TcpListener) handler(conn *net.TCPConn, id string) {
+	defer t.wg.Done() // runs last, so Stop() waits for the cleanup below
+	defer func() {
+		conn.Close()
+		log.Printf("Closed TCP Connection from %s", conn.RemoteAddr())
+		t.accept <- true // hand the connection slot back
+		t.forget(id)
+	}()
+
+	scanner := bufio.NewScanner(conn)
+	for {
+		select {
+		case <-t.done:
+			return
+		default:
+			if !scanner.Scan() {
+				return
+			}
+			// Bytes() aliases the scanner buffer that the next Scan() overwrites.
+			buf := append([]byte(nil), scanner.Bytes()...)
+			select {
+			case t.in <- buf:
+			default:
+				log.Printf(dropwarn, string(buf))
+			}
+		}
+	}
+}
+
+// tcpParser takes queued packets from t.in, skips empty ones, parses the
+// rest and stores the resulting metrics; it returns once t.done is closed.
+func (t *TcpListener) tcpParser() error {
+	defer t.wg.Done()
+	for {
+		select {
+		case <-t.done:
+			return nil
+		case packet := <-t.in:
+			if len(packet) == 0 {
+				continue
+			}
+			metrics, err := t.parser.Parse(packet)
+			if err != nil {
+				log.Printf("Malformed packet: [%s], Error: %s\n", string(packet), err)
+				continue
+			}
+			t.storeMetrics(metrics)
+		}
+	}
+}
+
+func (t *TcpListener) storeMetrics(metrics []telegraf.Metric) error {
+	t.Lock() // the embedded mutex, also taken by Start() and Stop()
+	defer t.Unlock()
+	for _, metric := range metrics {
+		t.acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
+	}
+	return nil // there is no failure path; the error result is always nil
+}
+
+// forget removes the connection stored under id from the set Stop() closes.
+func (t *TcpListener) forget(id string) {
+	t.cleanup.Lock()
+	delete(t.conns, id)
+	t.cleanup.Unlock()
+}
+
+// remember stores conn under id so that Stop() can find and close it.
+func (t *TcpListener) remember(id string, conn *net.TCPConn) {
+	t.cleanup.Lock()
+	t.conns[id] = conn
+	t.cleanup.Unlock()
+}
+
+// init registers the plugin under the name "tcp_listener".
+func init() {
+	newListener := func() telegraf.Input { return &TcpListener{} }
+	inputs.Add("tcp_listener", newListener)
+}
diff --git a/plugins/inputs/tcp_listener/tcp_listener_test.go b/plugins/inputs/tcp_listener/tcp_listener_test.go
new file mode 100644
index 0000000000000..b4aec9dd20a25
--- /dev/null
+++ b/plugins/inputs/tcp_listener/tcp_listener_test.go
@@ -0,0 +1,259 @@
+package tcp_listener
+
+import (
+	"fmt"
+	"net"
+	"testing"
+	"time"
+
+	"github.com/influxdata/telegraf/plugins/parsers"
+	"github.com/influxdata/telegraf/testutil"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+const (
+	testMsg = "cpu_load_short,host=server01 value=12.0 1422568543702900257\n" // one line, host=server01
+
+	testMsgs = `
+cpu_load_short,host=server02 value=12.0 1422568543702900257
+cpu_load_short,host=server03 value=12.0 1422568543702900257
+cpu_load_short,host=server04 value=12.0 1422568543702900257
+cpu_load_short,host=server05 value=12.0 1422568543702900257
+cpu_load_short,host=server06 value=12.0 1422568543702900257
+` // five lines (hosts server02 to server06) preceded by an empty line
+)
+
+// newTestTcpListener returns an unstarted listener and its queue channel.
+func newTestTcpListener() (*TcpListener, chan []byte) {
+	l := &TcpListener{
+		ServiceAddress:         ":8194",
+		AllowedPendingMessages: 10000,
+		MaxTCPConnections:      250,
+		in:                     make(chan []byte, 1500),
+		done:                   make(chan struct{}),
+	}
+	return l, l.in
+}
+
+// TestConnectTCP checks that lines sent over one connection are accumulated.
+func TestConnectTCP(t *testing.T) {
+	listener := TcpListener{
+		ServiceAddress:         ":8194",
+		AllowedPendingMessages: 10000,
+		MaxTCPConnections:      250,
+	}
+	listener.parser, _ = parsers.NewInfluxParser()
+	acc := &testutil.Accumulator{}
+	require.NoError(t, listener.Start(acc))
+	defer listener.Stop()
+	// NOTE(review): presumably lets the goroutines launched by Start() settle.
+	time.Sleep(time.Millisecond * 25)
+	conn, err := net.Dial("tcp", "127.0.0.1:8194")
+	require.NoError(t, err)
+
+	// send single message to socket
+	fmt.Fprintf(conn, testMsg)
+	time.Sleep(time.Millisecond * 15) // wait for handler -> tcpParser -> acc
+	acc.AssertContainsTaggedFields(t, "cpu_load_short",
+		map[string]interface{}{"value": float64(12)},
+		map[string]string{"host": "server01"},
+	)
+
+	// send multiple messages to socket
+	fmt.Fprintf(conn, testMsgs)
+	time.Sleep(time.Millisecond * 15)
+	hostTags := []string{"server02", "server03",
+		"server04", "server05", "server06"}
+	for _, hostTag := range hostTags {
+		acc.AssertContainsTaggedFields(t, "cpu_load_short",
+			map[string]interface{}{"value": float64(12)},
+			map[string]string{"host": hostTag},
+		)
+	}
+}
+
+// TestConcurrentConns tests that MaxTCPConnections is respected: with a limit
+// of 2, the third connection must receive the refusal message.
+func TestConcurrentConns(t *testing.T) {
+	listener := TcpListener{
+		ServiceAddress:         ":8195",
+		AllowedPendingMessages: 10000,
+		MaxTCPConnections:      2,
+	}
+	listener.parser, _ = parsers.NewInfluxParser()
+	acc := &testutil.Accumulator{}
+	require.NoError(t, listener.Start(acc))
+	defer listener.Stop()
+
+	time.Sleep(time.Millisecond * 25)
+	_, err := net.Dial("tcp", "127.0.0.1:8195")
+	assert.NoError(t, err)
+	_, err = net.Dial("tcp", "127.0.0.1:8195")
+	assert.NoError(t, err)
+
+	// Connection over the limit:
+	conn, err := net.Dial("tcp", "127.0.0.1:8195")
+	assert.NoError(t, err)
+	net.Dial("tcp", "127.0.0.1:8195") // NOTE(review): result discarded; purpose unclear
+	buf := make([]byte, 1500)
+	n, err := conn.Read(buf)
+	assert.NoError(t, err)
+	assert.Equal(t,
+		"Telegraf maximum concurrent TCP connections (2) reached, closing.\n"+
+			"You may want to increase max_tcp_connections in"+
+			" the Telegraf tcp listener configuration.\n",
+		string(buf[:n]))
+	// Nothing written on the refused connection may reach the accumulator.
+	_, err = conn.Write([]byte(testMsg))
+	assert.NoError(t, err)
+	time.Sleep(time.Millisecond * 10)
+	assert.Zero(t, acc.NFields())
+}
+
+// TestConcurrentConns1 tests that MaxTCPConnections is respected when max==1:
+// the second connection must receive the refusal message.
+func TestConcurrentConns1(t *testing.T) {
+	listener := TcpListener{
+		ServiceAddress:         ":8196",
+		AllowedPendingMessages: 10000,
+		MaxTCPConnections:      1,
+	}
+	listener.parser, _ = parsers.NewInfluxParser()
+	acc := &testutil.Accumulator{}
+	require.NoError(t, listener.Start(acc))
+	defer listener.Stop()
+
+	time.Sleep(time.Millisecond * 25)
+	_, err := net.Dial("tcp", "127.0.0.1:8196")
+	assert.NoError(t, err)
+
+	// Connection over the limit:
+	conn, err := net.Dial("tcp", "127.0.0.1:8196")
+	assert.NoError(t, err)
+	net.Dial("tcp", "127.0.0.1:8196") // NOTE(review): result discarded; purpose unclear
+	buf := make([]byte, 1500)
+	n, err := conn.Read(buf)
+	assert.NoError(t, err)
+	assert.Equal(t,
+		"Telegraf maximum concurrent TCP connections (1) reached, closing.\n"+
+			"You may want to increase max_tcp_connections in"+
+			" the Telegraf tcp listener configuration.\n",
+		string(buf[:n]))
+	// Nothing written on the refused connection may reach the accumulator.
+	_, err = conn.Write([]byte(testMsg))
+	assert.NoError(t, err)
+	time.Sleep(time.Millisecond * 10)
+	assert.Zero(t, acc.NFields())
+}
+
+// Test that Stop() completes while two client connections are still open
+func TestCloseConcurrentConns(t *testing.T) {
+	listener := TcpListener{
+		ServiceAddress:         ":8195",
+		AllowedPendingMessages: 10000,
+		MaxTCPConnections:      2,
+	}
+	listener.parser, _ = parsers.NewInfluxParser()
+
+	acc := &testutil.Accumulator{}
+	require.NoError(t, listener.Start(acc))
+
+	time.Sleep(time.Millisecond * 25)
+	_, err := net.Dial("tcp", "127.0.0.1:8195")
+	assert.NoError(t, err)
+	_, err = net.Dial("tcp", "127.0.0.1:8195")
+	assert.NoError(t, err)
+
+	listener.Stop()
+}
+
+func TestRunParser(t *testing.T) {
+	var testmsg = []byte(testMsg)
+
+	listener, in := newTestTcpListener()
+	acc := testutil.Accumulator{}
+	listener.acc = &acc
+	defer close(listener.done)
+
+	listener.parser, _ = parsers.NewInfluxParser()
+	listener.wg.Add(1)
+	go listener.tcpParser()
+
+	in <- testmsg
+	time.Sleep(time.Millisecond * 25)
+	listener.Gather(&acc)
+
+	if a := acc.NFields(); a != 1 {
+		t.Errorf("got %v, expected %v", a, 1)
+	}
+
+	acc.AssertContainsTaggedFields(t, "cpu_load_short",
+		map[string]interface{}{"value": float64(12)},
+		map[string]string{"host": "server01"},
+	)
+}
+
+func TestRunParserInvalidMsg(t *testing.T) {
+	var testmsg = []byte("cpu_load_short")
+
+	listener, in := newTestTcpListener()
+	acc := testutil.Accumulator{}
+	listener.acc = &acc
+	defer close(listener.done)
+
+	listener.parser, _ = parsers.NewInfluxParser()
+	listener.wg.Add(1)
+	go listener.tcpParser()
+
+	in <- testmsg
+	time.Sleep(time.Millisecond * 25)
+
+	if a := acc.NFields(); a != 0 {
+		t.Errorf("got %v, expected %v", a, 0)
+	}
+}
+
+func TestRunParserGraphiteMsg(t *testing.T) {
+	var testmsg = []byte("cpu.load.graphite 12 1454780029")
+
+	listener, in := newTestTcpListener()
+	acc := testutil.Accumulator{}
+	listener.acc = &acc
+	defer close(listener.done)
+
+	listener.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil)
+	listener.wg.Add(1)
+	go listener.tcpParser()
+
+	in <- testmsg
+	time.Sleep(time.Millisecond * 25)
+	listener.Gather(&acc)
+
+	acc.AssertContainsFields(t, "cpu_load_graphite",
+		map[string]interface{}{"value": float64(12)})
+}
+
+func TestRunParserJSONMsg(t *testing.T) {
+	var testmsg = []byte("{\"a\": 5, \"b\": {\"c\": 6}}\n")
+
+	listener, in := newTestTcpListener()
+	acc := testutil.Accumulator{}
+	listener.acc = &acc
+	defer close(listener.done)
+
+	listener.parser, _ = parsers.NewJSONParser("udp_json_test", []string{}, nil)
+	listener.wg.Add(1)
+	go listener.tcpParser()
+
+	in <- testmsg
+	time.Sleep(time.Millisecond * 25)
+	listener.Gather(&acc)
+
+	acc.AssertContainsFields(t, "udp_json_test",
+		map[string]interface{}{
+			"a":   float64(5),
+			"b_c": float64(6),
+		})
+}