Skip to content

Commit

Permalink
Merge pull request #870 from tcolgate/feature/opentsdb-input
Browse files Browse the repository at this point in the history
Add a OpenTSDB input protocol
  • Loading branch information
otoolep committed Apr 6, 2015
2 parents 475cdef + 32d6433 commit 7e58f98
Show file tree
Hide file tree
Showing 6 changed files with 405 additions and 0 deletions.
36 changes: 36 additions & 0 deletions cmd/influxd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/BurntSushi/toml"
"github.com/influxdb/influxdb/collectd"
"github.com/influxdb/influxdb/graphite"
"github.com/influxdb/influxdb/opentsdb"
)

const (
Expand Down Expand Up @@ -53,6 +54,9 @@ const (

// DefaultGraphiteDatabaseName is the default Graphite database if none is specified
DefaultGraphiteDatabaseName = "graphite"

// DefaultOpenTSDBDatabaseName is the default OpenTSDB database if none is specified
DefaultOpenTSDBDatabaseName = "opentsdb"
)

var DefaultSnapshotURL = url.URL{
Expand Down Expand Up @@ -90,6 +94,7 @@ type Config struct {

Graphites []Graphite `toml:"graphite"`
Collectd Collectd `toml:"collectd"`
OpenTSDB OpenTSDB `toml:"opentsdb"`

UDP struct {
Enabled bool `toml:"enabled"`
Expand Down Expand Up @@ -451,3 +456,34 @@ func (g *Graphite) LastEnabled() bool {

// maxInt is the largest integer representable by a word (architeture dependent).
const maxInt = int64(^uint(0) >> 1)

type OpenTSDB struct {
Addr string `toml:"address"`
Port int `toml:"port"`

Enabled bool `toml:"enabled"`
Database string `toml:"database"`
RetentionPolicy string `toml:"retention-policy"`
}

func (o OpenTSDB) DatabaseString() string {
if o.Database == "" {
return DefaultOpenTSDBDatabaseName
}
return o.Database
}

func (o OpenTSDB) ListenAddress(defaultBindAddr string) string {
addr := o.Addr
// If no address specified, use default.
if addr == "" {
addr = defaultBindAddr
}

port := o.Port
// If no port specified, use default.
if port == 0 {
port = opentsdb.DefaultPort
}
return net.JoinHostPort(addr, strconv.Itoa(port))
}
19 changes: 19 additions & 0 deletions cmd/influxd/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,17 @@ func TestParseConfig(t *testing.T) {
t.Errorf("collectd typesdb mismatch: expected %v, got %v", "foo-db-type", c.Collectd.TypesDB)
}

switch {
case c.OpenTSDB.Enabled != true:
t.Errorf("opentsdb enabled mismatch: expected: %v, got %v", true, c.OpenTSDB.Enabled)
case c.OpenTSDB.ListenAddress(c.BindAddress) != "192.168.0.3:4242":
t.Errorf("opentsdb listen address mismatch: expected %v, got %v", "192.168.0.3:4242", c.OpenTSDB.ListenAddress(c.BindAddress))
case c.OpenTSDB.DatabaseString() != "opentsdb_database":
t.Errorf("opentsdb database mismatch: expected %v, got %v", "opentsdb_database", c.OpenTSDB.DatabaseString())
case c.OpenTSDB.RetentionPolicy != "raw":
t.Errorf("collectd retention-policy mismatch: expected %v, got %v", "foo-db-type", c.OpenTSDB.RetentionPolicy)
}

if c.Broker.Port != 8086 {
t.Fatalf("broker port mismatch: %v", c.Broker.Port)
} else if c.Broker.Dir != "/tmp/influxdb/development/broker" {
Expand Down Expand Up @@ -241,6 +252,14 @@ port = 25827
database = "collectd_database"
typesdb = "foo-db-type"
# Configure OpenTSDB server
[opentsdb]
enabled = true
address = "192.168.0.3"
port = 4242
database = "opentsdb_database"
retention-policy = "raw"
# Broker configuration
[broker]
# The broker port should be open between all servers in a cluster.
Expand Down
26 changes: 26 additions & 0 deletions cmd/influxd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/influxdb/influxdb/graphite"
"github.com/influxdb/influxdb/httpd"
"github.com/influxdb/influxdb/messaging"
"github.com/influxdb/influxdb/opentsdb"
"github.com/influxdb/influxdb/raft"
"github.com/influxdb/influxdb/udp"
)
Expand Down Expand Up @@ -169,6 +170,31 @@ func Run(config *Config, join, version string) (*messaging.Broker, *influxdb.Ser
}
}

// Spin up any OpenTSDB servers
if config.OpenTSDB.Enabled {
o := config.OpenTSDB
db := o.DatabaseString()
laddr := o.ListenAddress(config.BindAddress)
policy := o.RetentionPolicy

if err := s.CreateDatabaseIfNotExists(db); err != nil {
log.Fatalf("failed to create database for OpenTSDB server: %s", err.Error())
}

if policy != "" {
// Ensure retention policy exists.
rp := influxdb.NewRetentionPolicy(policy)
if err := s.CreateRetentionPolicyIfNotExists(db, rp); err != nil {
log.Fatalf("failed to create retention policy for OpenTSDB: %s", err.Error())
}
}

os := opentsdb.NewServer(s, policy, db)

log.Println("Starting OpenTSDB service on ", laddr)
go os.ListenAndServe(laddr)
}

// Start up self-monitoring if enabled.
if config.Monitoring.Enabled {
database := monitoringDatabase
Expand Down
157 changes: 157 additions & 0 deletions cmd/influxd/server_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1672,6 +1672,163 @@ func Test_ServerSingleGraphiteIntegration_NoDatabase(t *testing.T) {
}
}

func Test_ServerOpenTSDBIntegration(t *testing.T) {
if testing.Short() {
t.Skip()
}
nNodes := 1
basePort := 8790
testName := "opentsdb integration"
dir := tempfile()
now := time.Now().UTC().Round(time.Second)
c, _ := main.NewConfig()
o := main.OpenTSDB{
Port: 4242,
Enabled: true,
Database: "opentsdb",
RetentionPolicy: "raw",
}
c.OpenTSDB = o

t.Logf("OpenTSDB Connection String: %s\n", o.ListenAddress(c.BindAddress))
nodes := createCombinedNodeCluster(t, testName, dir, nNodes, basePort, c)

createDatabase(t, testName, nodes, "opentsdb")
createRetentionPolicy(t, testName, nodes, "opentsdb", "raw")

// Connect to the graphite endpoint we just spun up
conn, err := net.Dial("tcp", o.ListenAddress(c.BindAddress))
if err != nil {
t.Fatal(err)
return
}

t.Log("Writing data")
data := []byte(`put cpu `)
data = append(data, []byte(fmt.Sprintf("%d", now.Unix()))...)
data = append(data, []byte(" 10\n")...)
_, err = conn.Write(data)
conn.Close()
if err != nil {
t.Fatal(err)
return
}

expected := fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","cpu"],"values":[["%s",10]]}]}]}`, now.Format(time.RFC3339Nano))

// query and wait for results
got, ok := queryAndWait(t, nodes, "opentsdb", `select * from "opentsdb"."raw".cpu`, expected, 2*time.Second)
if !ok {
t.Errorf(`Test "%s" failed, expected: %s, got: %s`, testName, expected, got)
}
}

func Test_ServerOpenTSDBIntegration_WithTags(t *testing.T) {
if testing.Short() {
t.Skip()
}
nNodes := 1
basePort := 8791
testName := "opentsdb integration"
dir := tempfile()
now := time.Now().UTC().Round(time.Second)
c, _ := main.NewConfig()
o := main.OpenTSDB{
Port: 4243,
Enabled: true,
Database: "opentsdb",
RetentionPolicy: "raw",
}
c.OpenTSDB = o

t.Logf("OpenTSDB Connection String: %s\n", o.ListenAddress(c.BindAddress))
nodes := createCombinedNodeCluster(t, testName, dir, nNodes, basePort, c)

createDatabase(t, testName, nodes, "opentsdb")
createRetentionPolicy(t, testName, nodes, "opentsdb", "raw")

// Connect to the graphite endpoint we just spun up
conn, err := net.Dial("tcp", o.ListenAddress(c.BindAddress))
if err != nil {
t.Fatal(err)
return
}

t.Log("Writing data")
data := []byte(`put cpu `)
data = append(data, []byte(fmt.Sprintf("%d", now.Unix()))...)
data = append(data, []byte(" 10 tag1=val1 tag2=val2\n")...)
data = append(data, `put cpu `...)
data = append(data, []byte(fmt.Sprintf("%d", now.Unix()))...)
data = append(data, []byte(" 20 tag1=val3 tag2=val4\n")...)
_, err = conn.Write(data)
conn.Close()
if err != nil {
t.Fatal(err)
return
}

expected := fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","cpu"],"values":[["%s",20]]}]}]}`, now.Format(time.RFC3339Nano))

// query and wait for results
got, ok := queryAndWait(t, nodes, "opentsdb", `select * from "opentsdb"."raw".cpu where tag1='val3'`, expected, 2*time.Second)
if !ok {
t.Errorf(`Test "%s" failed, expected: %s, got: %s`, testName, expected, got)
}
}

func Test_ServerOpenTSDBIntegration_BadData(t *testing.T) {
if testing.Short() {
t.Skip()
}
nNodes := 1
basePort := 8792
testName := "opentsdb integration"
dir := tempfile()
now := time.Now().UTC().Round(time.Second)
c, _ := main.NewConfig()
o := main.OpenTSDB{
Port: 4244,
Enabled: true,
Database: "opentsdb",
RetentionPolicy: "raw",
}
c.OpenTSDB = o

t.Logf("OpenTSDB Connection String: %s\n", o.ListenAddress(c.BindAddress))
nodes := createCombinedNodeCluster(t, testName, dir, nNodes, basePort, c)

createDatabase(t, testName, nodes, "opentsdb")
createRetentionPolicy(t, testName, nodes, "opentsdb", "raw")

// Connect to the graphite endpoint we just spun up
conn, err := net.Dial("tcp", o.ListenAddress(c.BindAddress))
if err != nil {
t.Fatal(err)
return
}

t.Log("Writing data")
data := []byte("This is bad and invalid input\n")
data = append(data, `put cpu `...)
data = append(data, []byte(fmt.Sprintf("%d", now.Unix()))...)
data = append(data, []byte(" 10 tag1=val1 tag2=val2\n")...)
_, err = conn.Write(data)
conn.Close()
if err != nil {
t.Fatal(err)
return
}

expected := fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","cpu"],"values":[["%s",10]]}]}]}`, now.Format(time.RFC3339Nano))

// query and wait for results
got, ok := queryAndWait(t, nodes, "opentsdb", `select * from "opentsdb"."raw".cpu`, expected, 2*time.Second)
if !ok {
t.Errorf(`Test "%s" failed, expected: %s, got: %s`, testName, expected, got)
}
}

// helper funcs

func errToString(err error) string {
Expand Down
7 changes: 7 additions & 0 deletions etc/config.sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ enabled = false
#database = "collectd_database"
#typesdb = "types.db"

# Configure the OpenTSDB input.
[opentsdb]
enabled = false
#address = "0.0.0.0" # If not set, is actually set to bind-address.
#port = 4242
#database = "opentsdb_database"

# Configure UDP listener for series data.
[udp]
enabled = false
Expand Down
Loading

0 comments on commit 7e58f98

Please sign in to comment.