Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a OpenTSDB input protocol #870

Merged
merged 1 commit into from
Apr 6, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions cmd/influxd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/BurntSushi/toml"
"github.com/influxdb/influxdb/collectd"
"github.com/influxdb/influxdb/graphite"
"github.com/influxdb/influxdb/opentsdb"
)

const (
Expand Down Expand Up @@ -53,6 +54,9 @@ const (

// DefaultGraphiteDatabaseName is the default Graphite database if none is specified
DefaultGraphiteDatabaseName = "graphite"

// DefaultOpenTSDBDatabaseName is the default OpenTSDB database if none is specified
DefaultOpenTSDBDatabaseName = "opentsdb"
)

var DefaultSnapshotURL = url.URL{
Expand Down Expand Up @@ -90,6 +94,7 @@ type Config struct {

Graphites []Graphite `toml:"graphite"`
Collectd Collectd `toml:"collectd"`
OpenTSDB OpenTSDB `toml:"opentsdb"`

UDP struct {
Enabled bool `toml:"enabled"`
Expand Down Expand Up @@ -451,3 +456,34 @@ func (g *Graphite) LastEnabled() bool {

// maxInt is the largest integer representable by a word (architeture dependent).
const maxInt = int64(^uint(0) >> 1)

type OpenTSDB struct {
Addr string `toml:"address"`
Port int `toml:"port"`

Enabled bool `toml:"enabled"`
Database string `toml:"database"`
RetentionPolicy string `toml:"retention-policy"`
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should add a toml directive here, just so it's clear what it is.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, not sure what you mean here. Each line already has the toml struct tag, and the entry in the Config struct does too (line 97)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is what I meant, I see you've got the toml directive in the right place after all -- thanks.


func (o OpenTSDB) DatabaseString() string {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should add a unit test for this, just to be sure. Check out config_test.go for examples on how to do this.

if o.Database == "" {
return DefaultOpenTSDBDatabaseName
}
return o.Database
}

func (o OpenTSDB) ListenAddress(defaultBindAddr string) string {
addr := o.Addr
// If no address specified, use default.
if addr == "" {
addr = defaultBindAddr
}

port := o.Port
// If no port specified, use default.
if port == 0 {
port = opentsdb.DefaultPort
}
return net.JoinHostPort(addr, strconv.Itoa(port))
}
19 changes: 19 additions & 0 deletions cmd/influxd/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,17 @@ func TestParseConfig(t *testing.T) {
t.Errorf("collectd typesdb mismatch: expected %v, got %v", "foo-db-type", c.Collectd.TypesDB)
}

switch {
case c.OpenTSDB.Enabled != true:
t.Errorf("opentsdb enabled mismatch: expected: %v, got %v", true, c.OpenTSDB.Enabled)
case c.OpenTSDB.ListenAddress(c.BindAddress) != "192.168.0.3:4242":
t.Errorf("opentsdb listen address mismatch: expected %v, got %v", "192.168.0.3:4242", c.OpenTSDB.ListenAddress(c.BindAddress))
case c.OpenTSDB.DatabaseString() != "opentsdb_database":
t.Errorf("opentsdb database mismatch: expected %v, got %v", "opentsdb_database", c.OpenTSDB.DatabaseString())
case c.OpenTSDB.RetentionPolicy != "raw":
t.Errorf("collectd retention-policy mismatch: expected %v, got %v", "foo-db-type", c.OpenTSDB.RetentionPolicy)
}

if c.Broker.Port != 8086 {
t.Fatalf("broker port mismatch: %v", c.Broker.Port)
} else if c.Broker.Dir != "/tmp/influxdb/development/broker" {
Expand Down Expand Up @@ -241,6 +252,14 @@ port = 25827
database = "collectd_database"
typesdb = "foo-db-type"

# Configure OpenTSDB server
[opentsdb]
enabled = true
address = "192.168.0.3"
port = 4242
database = "opentsdb_database"
retention-policy = "raw"

# Broker configuration
[broker]
# The broker port should be open between all servers in a cluster.
Expand Down
26 changes: 26 additions & 0 deletions cmd/influxd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/influxdb/influxdb/graphite"
"github.com/influxdb/influxdb/httpd"
"github.com/influxdb/influxdb/messaging"
"github.com/influxdb/influxdb/opentsdb"
"github.com/influxdb/influxdb/raft"
"github.com/influxdb/influxdb/udp"
)
Expand Down Expand Up @@ -169,6 +170,31 @@ func Run(config *Config, join, version string) (*messaging.Broker, *influxdb.Ser
}
}

// Spin up any OpenTSDB servers
if config.OpenTSDB.Enabled {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You must do this inside the block above, since data ingestion only takes place on nodes which have a "server" object.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@otoolep sorry, not sollowing. This is at the same level as all the other inputs,and inside the if s != nil. If it needs moving, you'll need to be more explicit.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My mistake @tcolgate -- I must have misinterpreted the indentation. Looks good -- thanks.

o := config.OpenTSDB
db := o.DatabaseString()
laddr := o.ListenAddress(config.BindAddress)
policy := o.RetentionPolicy

if err := s.CreateDatabaseIfNotExists(db); err != nil {
log.Fatalf("failed to create database for OpenTSDB server: %s", err.Error())
}

if policy != "" {
// Ensure retention policy exists.
rp := influxdb.NewRetentionPolicy(policy)
if err := s.CreateRetentionPolicyIfNotExists(db, rp); err != nil {
log.Fatalf("failed to create retention policy for OpenTSDB: %s", err.Error())
}
}

os := opentsdb.NewServer(s, policy, db)

log.Println("Starting OpenTSDB service on ", laddr)
go os.ListenAndServe(laddr)
}

// Start up self-monitoring if enabled.
if config.Monitoring.Enabled {
database := monitoringDatabase
Expand Down
157 changes: 157 additions & 0 deletions cmd/influxd/server_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1672,6 +1672,163 @@ func Test_ServerSingleGraphiteIntegration_NoDatabase(t *testing.T) {
}
}

func Test_ServerOpenTSDBIntegration(t *testing.T) {
if testing.Short() {
t.Skip()
}
nNodes := 1
basePort := 8790
testName := "opentsdb integration"
dir := tempfile()
now := time.Now().UTC().Round(time.Second)
c, _ := main.NewConfig()
o := main.OpenTSDB{
Port: 4242,
Enabled: true,
Database: "opentsdb",
RetentionPolicy: "raw",
}
c.OpenTSDB = o

t.Logf("OpenTSDB Connection String: %s\n", o.ListenAddress(c.BindAddress))
nodes := createCombinedNodeCluster(t, testName, dir, nNodes, basePort, c)

createDatabase(t, testName, nodes, "opentsdb")
createRetentionPolicy(t, testName, nodes, "opentsdb", "raw")

// Connect to the graphite endpoint we just spun up
conn, err := net.Dial("tcp", o.ListenAddress(c.BindAddress))
if err != nil {
t.Fatal(err)
return
}

t.Log("Writing data")
data := []byte(`put cpu `)
data = append(data, []byte(fmt.Sprintf("%d", now.Unix()))...)
data = append(data, []byte(" 10\n")...)
_, err = conn.Write(data)
conn.Close()
if err != nil {
t.Fatal(err)
return
}

expected := fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","cpu"],"values":[["%s",10]]}]}]}`, now.Format(time.RFC3339Nano))

// query and wait for results
got, ok := queryAndWait(t, nodes, "opentsdb", `select * from "opentsdb"."raw".cpu`, expected, 2*time.Second)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should add a test to query by tag value, and ensure you get the expected data back. Make sense?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add some more test data? To try some other cases? Write a line without any tags, and write some malformed lines, to make sure the system doesn't go down.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a test for malformed data, and a tests that adds data without tags. Mad the test with tags query the tag value of one of two data points.

if !ok {
t.Errorf(`Test "%s" failed, expected: %s, got: %s`, testName, expected, got)
}
}

func Test_ServerOpenTSDBIntegration_WithTags(t *testing.T) {
if testing.Short() {
t.Skip()
}
nNodes := 1
basePort := 8791
testName := "opentsdb integration"
dir := tempfile()
now := time.Now().UTC().Round(time.Second)
c, _ := main.NewConfig()
o := main.OpenTSDB{
Port: 4243,
Enabled: true,
Database: "opentsdb",
RetentionPolicy: "raw",
}
c.OpenTSDB = o

t.Logf("OpenTSDB Connection String: %s\n", o.ListenAddress(c.BindAddress))
nodes := createCombinedNodeCluster(t, testName, dir, nNodes, basePort, c)

createDatabase(t, testName, nodes, "opentsdb")
createRetentionPolicy(t, testName, nodes, "opentsdb", "raw")

// Connect to the graphite endpoint we just spun up
conn, err := net.Dial("tcp", o.ListenAddress(c.BindAddress))
if err != nil {
t.Fatal(err)
return
}

t.Log("Writing data")
data := []byte(`put cpu `)
data = append(data, []byte(fmt.Sprintf("%d", now.Unix()))...)
data = append(data, []byte(" 10 tag1=val1 tag2=val2\n")...)
data = append(data, `put cpu `...)
data = append(data, []byte(fmt.Sprintf("%d", now.Unix()))...)
data = append(data, []byte(" 20 tag1=val3 tag2=val4\n")...)
_, err = conn.Write(data)
conn.Close()
if err != nil {
t.Fatal(err)
return
}

expected := fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","cpu"],"values":[["%s",20]]}]}]}`, now.Format(time.RFC3339Nano))

// query and wait for results
got, ok := queryAndWait(t, nodes, "opentsdb", `select * from "opentsdb"."raw".cpu where tag1='val3'`, expected, 2*time.Second)
if !ok {
t.Errorf(`Test "%s" failed, expected: %s, got: %s`, testName, expected, got)
}
}

func Test_ServerOpenTSDBIntegration_BadData(t *testing.T) {
if testing.Short() {
t.Skip()
}
nNodes := 1
basePort := 8792
testName := "opentsdb integration"
dir := tempfile()
now := time.Now().UTC().Round(time.Second)
c, _ := main.NewConfig()
o := main.OpenTSDB{
Port: 4244,
Enabled: true,
Database: "opentsdb",
RetentionPolicy: "raw",
}
c.OpenTSDB = o

t.Logf("OpenTSDB Connection String: %s\n", o.ListenAddress(c.BindAddress))
nodes := createCombinedNodeCluster(t, testName, dir, nNodes, basePort, c)

createDatabase(t, testName, nodes, "opentsdb")
createRetentionPolicy(t, testName, nodes, "opentsdb", "raw")

// Connect to the graphite endpoint we just spun up
conn, err := net.Dial("tcp", o.ListenAddress(c.BindAddress))
if err != nil {
t.Fatal(err)
return
}

t.Log("Writing data")
data := []byte("This is bad and invalid input\n")
data = append(data, `put cpu `...)
data = append(data, []byte(fmt.Sprintf("%d", now.Unix()))...)
data = append(data, []byte(" 10 tag1=val1 tag2=val2\n")...)
_, err = conn.Write(data)
conn.Close()
if err != nil {
t.Fatal(err)
return
}

expected := fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","cpu"],"values":[["%s",10]]}]}]}`, now.Format(time.RFC3339Nano))

// query and wait for results
got, ok := queryAndWait(t, nodes, "opentsdb", `select * from "opentsdb"."raw".cpu`, expected, 2*time.Second)
if !ok {
t.Errorf(`Test "%s" failed, expected: %s, got: %s`, testName, expected, got)
}
}

// helper funcs

func errToString(err error) string {
Expand Down
7 changes: 7 additions & 0 deletions etc/config.sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ enabled = false
#database = "collectd_database"
#typesdb = "types.db"

# Configure the OpenTSDB input.
[opentsdb]
enabled = false
#address = "0.0.0.0" # If not set, is actually set to bind-address.
#port = 4242
#database = "opentsdb_database"

# Configure UDP listener for series data.
[udp]
enabled = false
Expand Down
Loading