From 32d6433c04bd2b664468e3e469989b8538f31ee9 Mon Sep 17 00:00:00 2001 From: Tristan Colgate-McFarlane Date: Sun, 29 Mar 2015 17:26:03 +0100 Subject: [PATCH] OpenTSDB server interface Add an input collector for the OpenTSDB telnet protcol --- cmd/influxd/config.go | 36 ++++++ cmd/influxd/config_test.go | 19 +++ cmd/influxd/run.go | 26 ++++ cmd/influxd/server_integration_test.go | 157 ++++++++++++++++++++++++ etc/config.sample.toml | 7 ++ opentsdb/opentsdb.go | 160 +++++++++++++++++++++++++ 6 files changed, 405 insertions(+) create mode 100644 opentsdb/opentsdb.go diff --git a/cmd/influxd/config.go b/cmd/influxd/config.go index 5db74a13bb8..84abe83b99c 100644 --- a/cmd/influxd/config.go +++ b/cmd/influxd/config.go @@ -16,6 +16,7 @@ import ( "github.com/BurntSushi/toml" "github.com/influxdb/influxdb/collectd" "github.com/influxdb/influxdb/graphite" + "github.com/influxdb/influxdb/opentsdb" ) const ( @@ -53,6 +54,9 @@ const ( // DefaultGraphiteDatabaseName is the default Graphite database if none is specified DefaultGraphiteDatabaseName = "graphite" + + // DefaultOpenTSDBDatabaseName is the default OpenTSDB database if none is specified + DefaultOpenTSDBDatabaseName = "opentsdb" ) var DefaultSnapshotURL = url.URL{ @@ -90,6 +94,7 @@ type Config struct { Graphites []Graphite `toml:"graphite"` Collectd Collectd `toml:"collectd"` + OpenTSDB OpenTSDB `toml:"opentsdb"` UDP struct { Enabled bool `toml:"enabled"` @@ -451,3 +456,34 @@ func (g *Graphite) LastEnabled() bool { // maxInt is the largest integer representable by a word (architeture dependent). const maxInt = int64(^uint(0) >> 1) + +type OpenTSDB struct { + Addr string `toml:"address"` + Port int `toml:"port"` + + Enabled bool `toml:"enabled"` + Database string `toml:"database"` + RetentionPolicy string `toml:"retention-policy"` +} + +func (o OpenTSDB) DatabaseString() string { + if o.Database == "" { + return DefaultOpenTSDBDatabaseName + } + return o.Database +} + +func (o OpenTSDB) ListenAddress(defaultBindAddr string) string { + addr := o.Addr + // If no address specified, use default. + if addr == "" { + addr = defaultBindAddr + } + + port := o.Port + // If no port specified, use default. + if port == 0 { + port = opentsdb.DefaultPort + } + return net.JoinHostPort(addr, strconv.Itoa(port)) +} diff --git a/cmd/influxd/config_test.go b/cmd/influxd/config_test.go index e1be3a5a3d8..1d8162ffeaa 100644 --- a/cmd/influxd/config_test.go +++ b/cmd/influxd/config_test.go @@ -121,6 +121,17 @@ func TestParseConfig(t *testing.T) { t.Errorf("collectd typesdb mismatch: expected %v, got %v", "foo-db-type", c.Collectd.TypesDB) } + switch { + case c.OpenTSDB.Enabled != true: + t.Errorf("opentsdb enabled mismatch: expected: %v, got %v", true, c.OpenTSDB.Enabled) + case c.OpenTSDB.ListenAddress(c.BindAddress) != "192.168.0.3:4242": + t.Errorf("opentsdb listen address mismatch: expected %v, got %v", "192.168.0.3:4242", c.OpenTSDB.ListenAddress(c.BindAddress)) + case c.OpenTSDB.DatabaseString() != "opentsdb_database": + t.Errorf("opentsdb database mismatch: expected %v, got %v", "opentsdb_database", c.OpenTSDB.DatabaseString()) + case c.OpenTSDB.RetentionPolicy != "raw": + t.Errorf("collectd retention-policy mismatch: expected %v, got %v", "foo-db-type", c.OpenTSDB.RetentionPolicy) + } + if c.Broker.Port != 8086 { t.Fatalf("broker port mismatch: %v", c.Broker.Port) } else if c.Broker.Dir != "/tmp/influxdb/development/broker" { @@ -241,6 +252,14 @@ port = 25827 database = "collectd_database" typesdb = "foo-db-type" +# Configure OpenTSDB server +[opentsdb] +enabled = true +address = "192.168.0.3" +port = 4242 +database = "opentsdb_database" +retention-policy = "raw" + # Broker configuration [broker] # The broker port should be open between all servers in a cluster. diff --git a/cmd/influxd/run.go b/cmd/influxd/run.go index 9ffdc54b056..2c2d08044d4 100644 --- a/cmd/influxd/run.go +++ b/cmd/influxd/run.go @@ -19,6 +19,7 @@ import ( "github.com/influxdb/influxdb/graphite" "github.com/influxdb/influxdb/httpd" "github.com/influxdb/influxdb/messaging" + "github.com/influxdb/influxdb/opentsdb" "github.com/influxdb/influxdb/raft" "github.com/influxdb/influxdb/udp" ) @@ -169,6 +170,31 @@ func Run(config *Config, join, version string) (*messaging.Broker, *influxdb.Ser } } + // Spin up any OpenTSDB servers + if config.OpenTSDB.Enabled { + o := config.OpenTSDB + db := o.DatabaseString() + laddr := o.ListenAddress(config.BindAddress) + policy := o.RetentionPolicy + + if err := s.CreateDatabaseIfNotExists(db); err != nil { + log.Fatalf("failed to create database for OpenTSDB server: %s", err.Error()) + } + + if policy != "" { + // Ensure retention policy exists. + rp := influxdb.NewRetentionPolicy(policy) + if err := s.CreateRetentionPolicyIfNotExists(db, rp); err != nil { + log.Fatalf("failed to create retention policy for OpenTSDB: %s", err.Error()) + } + } + + os := opentsdb.NewServer(s, policy, db) + + log.Println("Starting OpenTSDB service on ", laddr) + go os.ListenAndServe(laddr) + } + // Start up self-monitoring if enabled. if config.Monitoring.Enabled { database := monitoringDatabase diff --git a/cmd/influxd/server_integration_test.go b/cmd/influxd/server_integration_test.go index 046056de543..9dd7227b928 100644 --- a/cmd/influxd/server_integration_test.go +++ b/cmd/influxd/server_integration_test.go @@ -1672,6 +1672,163 @@ func Test_ServerSingleGraphiteIntegration_NoDatabase(t *testing.T) { } } +func Test_ServerOpenTSDBIntegration(t *testing.T) { + if testing.Short() { + t.Skip() + } + nNodes := 1 + basePort := 8790 + testName := "opentsdb integration" + dir := tempfile() + now := time.Now().UTC().Round(time.Second) + c, _ := main.NewConfig() + o := main.OpenTSDB{ + Port: 4242, + Enabled: true, + Database: "opentsdb", + RetentionPolicy: "raw", + } + c.OpenTSDB = o + + t.Logf("OpenTSDB Connection String: %s\n", o.ListenAddress(c.BindAddress)) + nodes := createCombinedNodeCluster(t, testName, dir, nNodes, basePort, c) + + createDatabase(t, testName, nodes, "opentsdb") + createRetentionPolicy(t, testName, nodes, "opentsdb", "raw") + + // Connect to the graphite endpoint we just spun up + conn, err := net.Dial("tcp", o.ListenAddress(c.BindAddress)) + if err != nil { + t.Fatal(err) + return + } + + t.Log("Writing data") + data := []byte(`put cpu `) + data = append(data, []byte(fmt.Sprintf("%d", now.Unix()))...) + data = append(data, []byte(" 10\n")...) + _, err = conn.Write(data) + conn.Close() + if err != nil { + t.Fatal(err) + return + } + + expected := fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","cpu"],"values":[["%s",10]]}]}]}`, now.Format(time.RFC3339Nano)) + + // query and wait for results + got, ok := queryAndWait(t, nodes, "opentsdb", `select * from "opentsdb"."raw".cpu`, expected, 2*time.Second) + if !ok { + t.Errorf(`Test "%s" failed, expected: %s, got: %s`, testName, expected, got) + } +} + +func Test_ServerOpenTSDBIntegration_WithTags(t *testing.T) { + if testing.Short() { + t.Skip() + } + nNodes := 1 + basePort := 8791 + testName := "opentsdb integration" + dir := tempfile() + now := time.Now().UTC().Round(time.Second) + c, _ := main.NewConfig() + o := main.OpenTSDB{ + Port: 4243, + Enabled: true, + Database: "opentsdb", + RetentionPolicy: "raw", + } + c.OpenTSDB = o + + t.Logf("OpenTSDB Connection String: %s\n", o.ListenAddress(c.BindAddress)) + nodes := createCombinedNodeCluster(t, testName, dir, nNodes, basePort, c) + + createDatabase(t, testName, nodes, "opentsdb") + createRetentionPolicy(t, testName, nodes, "opentsdb", "raw") + + // Connect to the graphite endpoint we just spun up + conn, err := net.Dial("tcp", o.ListenAddress(c.BindAddress)) + if err != nil { + t.Fatal(err) + return + } + + t.Log("Writing data") + data := []byte(`put cpu `) + data = append(data, []byte(fmt.Sprintf("%d", now.Unix()))...) + data = append(data, []byte(" 10 tag1=val1 tag2=val2\n")...) + data = append(data, `put cpu `...) + data = append(data, []byte(fmt.Sprintf("%d", now.Unix()))...) + data = append(data, []byte(" 20 tag1=val3 tag2=val4\n")...) + _, err = conn.Write(data) + conn.Close() + if err != nil { + t.Fatal(err) + return + } + + expected := fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","cpu"],"values":[["%s",20]]}]}]}`, now.Format(time.RFC3339Nano)) + + // query and wait for results + got, ok := queryAndWait(t, nodes, "opentsdb", `select * from "opentsdb"."raw".cpu where tag1='val3'`, expected, 2*time.Second) + if !ok { + t.Errorf(`Test "%s" failed, expected: %s, got: %s`, testName, expected, got) + } +} + +func Test_ServerOpenTSDBIntegration_BadData(t *testing.T) { + if testing.Short() { + t.Skip() + } + nNodes := 1 + basePort := 8792 + testName := "opentsdb integration" + dir := tempfile() + now := time.Now().UTC().Round(time.Second) + c, _ := main.NewConfig() + o := main.OpenTSDB{ + Port: 4244, + Enabled: true, + Database: "opentsdb", + RetentionPolicy: "raw", + } + c.OpenTSDB = o + + t.Logf("OpenTSDB Connection String: %s\n", o.ListenAddress(c.BindAddress)) + nodes := createCombinedNodeCluster(t, testName, dir, nNodes, basePort, c) + + createDatabase(t, testName, nodes, "opentsdb") + createRetentionPolicy(t, testName, nodes, "opentsdb", "raw") + + // Connect to the graphite endpoint we just spun up + conn, err := net.Dial("tcp", o.ListenAddress(c.BindAddress)) + if err != nil { + t.Fatal(err) + return + } + + t.Log("Writing data") + data := []byte("This is bad and invalid input\n") + data = append(data, `put cpu `...) + data = append(data, []byte(fmt.Sprintf("%d", now.Unix()))...) + data = append(data, []byte(" 10 tag1=val1 tag2=val2\n")...) + _, err = conn.Write(data) + conn.Close() + if err != nil { + t.Fatal(err) + return + } + + expected := fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","cpu"],"values":[["%s",10]]}]}]}`, now.Format(time.RFC3339Nano)) + + // query and wait for results + got, ok := queryAndWait(t, nodes, "opentsdb", `select * from "opentsdb"."raw".cpu`, expected, 2*time.Second) + if !ok { + t.Errorf(`Test "%s" failed, expected: %s, got: %s`, testName, expected, got) + } +} + // helper funcs func errToString(err error) string { diff --git a/etc/config.sample.toml b/etc/config.sample.toml index bf31c0cfeaa..aec37fec2d2 100644 --- a/etc/config.sample.toml +++ b/etc/config.sample.toml @@ -53,6 +53,13 @@ enabled = false #database = "collectd_database" #typesdb = "types.db" +# Configure the OpenTSDB input. +[opentsdb] +enabled = false +#address = "0.0.0.0" # If not set, is actually set to bind-address. +#port = 4242 +#database = "opentsdb_database" + # Configure UDP listener for series data. [udp] enabled = false diff --git a/opentsdb/opentsdb.go b/opentsdb/opentsdb.go new file mode 100644 index 00000000000..cd4bbac9203 --- /dev/null +++ b/opentsdb/opentsdb.go @@ -0,0 +1,160 @@ +package opentsdb + +import ( + "bufio" + "log" + "net" + "net/textproto" + "strconv" + "strings" + "time" + + "github.com/influxdb/influxdb" +) + +const ( + // DefaultPort represents the default OpenTSDB port. + DefaultPort = 4242 + + // DefaultDatabaseName is the default OpenTSDB database if none is specified + DefaultDatabaseName = "opentsdb" +) + +// SeriesWriter defines the interface for the destination of the data. +type SeriesWriter interface { + WriteSeries(database, retentionPolicy string, points []influxdb.Point) (uint64, error) +} + +// An InfluxDB input class to accept OpenTSDB's telnet protocol +// Each telnet command consists of a line of the form: +// put sys.cpu.user 1356998400 42.5 host=webserver01 cpu=0 +type Server struct { + writer SeriesWriter + + database string + retentionpolicy string + + listener *net.TCPListener +} + +func NewServer(w SeriesWriter, retpol string, db string) *Server { + s := &Server{} + + s.writer = w + s.retentionpolicy = retpol + s.database = db + + return s +} + +func (s *Server) ListenAndServe(listenAddress string) { + var err error + + addr, err := net.ResolveTCPAddr("tcp4", listenAddress) + if err != nil { + log.Println("TSDBServer: ResolveTCPAddr: ", err) + return + } + + s.listener, err = net.ListenTCP("tcp", addr) + if err != nil { + log.Println("TSDBServer: Listen: ", err) + return + } + + defer s.listener.Close() + s.HandleListener(s.listener) +} + +func (s *Server) HandleListener(socket *net.TCPListener) { + for { + // Listen for an incoming connection. + conn, err := socket.Accept() + if err != nil { + log.Println("Error accepting: ", err.Error()) + } + // Handle connections in a new goroutine. + go s.HandleConnection(conn) + } +} + +func (s *Server) HandleConnection(conn net.Conn) { + reader := bufio.NewReader(conn) + tp := textproto.NewReader(reader) + + defer conn.Close() + + for { + line, err := tp.ReadLine() + if err != nil { + return + } + + inputStrs := strings.Fields(line) + + if len(inputStrs) == 1 && inputStrs[0] == "version" { + conn.Write([]byte("InfluxDB TSDB proxy")) + continue + } + + if len(inputStrs) < 4 || inputStrs[0] != "put" { + log.Println("TSDBServer: malformed line, skipping: ", line) + continue + } + + name := inputStrs[1] + tsStr := inputStrs[2] + valueStr := inputStrs[3] + tagStrs := inputStrs[4:] + + var t time.Time + ts, err := strconv.ParseInt(tsStr, 10, 64) + if err != nil { + log.Println("TSDBServer: malformed timestamp, skipping: ", tsStr) + } + + switch len(tsStr) { + case 10: + t = time.Unix(ts, 0) + break + case 13: + t = time.Unix(ts/1000, (ts%1000)*1000) + break + default: + log.Println("TSDBServer: timestamp must be 10 or 13 chars, skipping: ", tsStr) + continue + } + + tags := make(map[string]string) + for t := range tagStrs { + parts := strings.SplitN(tagStrs[t], "=", 2) + if len(parts) != 2 { + log.Println("TSDBServer: malformed tag data", tagStrs[t]) + continue + } + k := parts[0] + + tags[k] = parts[1] + } + + fields := make(map[string]interface{}) + fields[name], err = strconv.ParseFloat(valueStr, 64) + if err != nil { + log.Println("TSDBServer: could not parse value as float: ", valueStr) + continue + } + + p := influxdb.Point{ + Name: name, + Tags: tags, + Timestamp: t, + Fields: fields, + } + + _, err = s.writer.WriteSeries(s.database, s.retentionpolicy, []influxdb.Point{p}) + if err != nil { + log.Println("TSDB cannot write data: ", err) + continue + } + } +}