CrateDB output plugin #3210

Merged on Nov 9, 2017 (42 commits)
Changes from 30 commits
Commits
ea6d517
cratedb: basic scaffolding
felixge Aug 5, 2017
bbb45eb
WIP: mostly worked on escaping stuff
felixge Aug 8, 2017
4f46833
Get basic INSERT to work! :)
felixge Aug 10, 2017
4eaa158
Add hash_id field and PRIMARY KEY
felixge Aug 22, 2017
6321c16
smoke test for making sure row was written
felixge Aug 22, 2017
e1a97d0
fix: HashID is uint64 but CrateDB wants int64
felixge Aug 22, 2017
91e10af
remove comment
felixge Aug 22, 2017
4f72f48
no need to cancel our contexts
felixge Aug 22, 2017
f4d159f
Finish sample config
felixge Aug 22, 2017
1393f7e
Whitespace
felixge Aug 22, 2017
50a7665
Initial docs
felixge Aug 22, 2017
3e146c6
use testutil.GetLocalHost()
felixge Aug 22, 2017
ab1a638
docs: wording
felixge Aug 22, 2017
83d0981
add link from main README
felixge Aug 22, 2017
5ccca8b
hash_id INDEX OFF and remove NOT NULL
felixge Sep 2, 2017
6dd8f59
Godeps: add lib/pq
felixge Sep 8, 2017
11b0e39
govet: call cancel() func
felixge Sep 8, 2017
dc71427
make: add cratedb for docker tests
felixge Sep 8, 2017
71516e9
make: disable crate enterprise edition
felixge Sep 8, 2017
86f7002
remove debugging statement
felixge Sep 8, 2017
6b4f4f8
fix: test needs fixed time.Location
felixge Sep 8, 2017
90c909d
add partitioning by day for crate.io output plugin
Sep 14, 2017
a2a651b
fix local dev/test setup
felixge Sep 27, 2017
7fd0308
Whitespace
felixge Sep 27, 2017
a559867
use jackc/pgx
felixge Oct 16, 2017
dc6d3fd
Merge remote-tracking branch 'origin/master' into pgx
felixge Oct 16, 2017
1c6f0e7
Makefile: cleanup cratedb integration
felixge Oct 16, 2017
7d8ed8b
Revert "Godeps: add lib/pq"
felixge Oct 16, 2017
0615712
Godeps: upgrade jackc/pgx
felixge Oct 16, 2017
34e53d9
fix postgres test after upgrading pgx
felixge Oct 23, 2017
3b22f2e
make: remove backslash typo
felixge Nov 2, 2017
6c2df9e
README: remove typo
felixge Nov 2, 2017
a54054f
remove docker-compose.yaml
felixge Nov 2, 2017
e246032
Send timestamps in UTC instead of local time
felixge Nov 2, 2017
c434322
escaping add additional test case
felixge Nov 2, 2017
8b85f0b
escaping: add smoke test
felixge Nov 2, 2017
626f653
Implement new hashID func
felixge Nov 2, 2017
221165d
remove references to lib/pq
felixge Nov 2, 2017
efcbbb2
Merge remote-tracking branch 'origin/master'
felixge Nov 2, 2017
8379cb2
empty commit (trigger ci)
felixge Nov 9, 2017
7a9bfc6
Merge remote-tracking branch 'origin/master'
felixge Nov 9, 2017
b0d6723
trigger ci
felixge Nov 9, 2017
2 changes: 1 addition & 1 deletion Godeps
@@ -32,7 +32,7 @@ github.com/hashicorp/consul 63d2fc68239b996096a1c55a0d4b400ea4c2583f
github.com/influxdata/tail a395bf99fe07c233f41fba0735fa2b13b58588ea
github.com/influxdata/toml 5d1d907f22ead1cd47adde17ceec5bda9cacaf8f
github.com/influxdata/wlog 7c63b0a71ef8300adc255344d275e10e5c3a71ec
github.com/jackc/pgx b84338d7d62598f75859b2b146d830b22f1b9ec8
github.com/jackc/pgx 63f58fd32edb5684b9e9f4cfaac847c6b42b3917
github.com/jmespath/go-jmespath bd40a432e4c76585ef6b72d3fd96fb9b6dc7b68d
github.com/kardianos/osext c2c54e542fb797ad986b31721e1baedf214ca413
github.com/kardianos/service 6d3a0ee7d3425d9d835debc51a0ca1ffa28f4893
16 changes: 14 additions & 2 deletions Makefile
@@ -86,6 +86,12 @@ docker-run:
-e SLAPD_CONFIG_ROOTPW="secret" \
-p "389:389" -p "636:636" \
-d cobaugh/openldap-alpine
docker run \--name cratedb \
Contributor

Do we need to remove the backslash here? \--name

-p "6543:5432" \
-d crate crate \
-Cnetwork.host=0.0.0.0 \
-Ctransport.host=localhost \
-Clicense.enterprise=false

# Run docker containers necessary for integration tests; skipping services provided
# by CircleCI
@@ -110,12 +116,18 @@ docker-run-circle:
-e SLAPD_CONFIG_ROOTPW="secret" \
-p "389:389" -p "636:636" \
-d cobaugh/openldap-alpine
docker run \--name cratedb \
-p "6543:5432" \
-d crate crate \
-Cnetwork.host=0.0.0.0 \
-Ctransport.host=localhost \
-Clicense.enterprise=false

docker-kill:
-docker kill aerospike elasticsearch kafka memcached mqtt mysql nats nsq \
openldap postgres rabbitmq redis riemann zookeeper
openldap postgres rabbitmq redis riemann zookeeper cratedb
-docker rm aerospike elasticsearch kafka memcached mqtt mysql nats nsq \
openldap postgres rabbitmq redis riemann zookeeper
openldap postgres rabbitmq redis riemann zookeeper cratedb

.PHONY: deps telegraf telegraf.exe install test test-windows lint test-all \
package clean docker-run docker-run-circle docker-kill docker-image
1 change: 1 addition & 0 deletions README.md
@@ -265,6 +265,7 @@ formats may be used with input plugins supporting the `data_format` option:
* [amqp](./plugins/outputs/amqp) (rabbitmq)
* [aws kinesis](./plugins/outputs/kinesis)
* [aws cloudwatch](./plugins/outputs/cloudwatch)
* [cratedb](./plugins/outputs/cratedb)
* [datadog](./plugins/outputs/datadog)
* [discard](./plugins/outputs/discard)
* [elasticsearch](./plugins/outputs/elasticsearch)
13 changes: 13 additions & 0 deletions etc/telegraf.conf
@@ -199,6 +199,19 @@
# namespace = "InfluxData/Telegraf"


# # Configuration for CrateDB to send metrics to.
# [[outputs.cratedb]]
# # A lib/pq connection string.
# # See http://godoc.org/github.com/lib/pq#hdr-Connection_String_Parameters
# url = "postgres://user:password@localhost/schema?sslmode=disable"
# # Timeout for all CrateDB queries.
# timeout = "5s"
# # Name of the table to store metrics in.
# table = "metrics"
# # If true, and the metrics table does not exist, create it automatically.
# table_create = true


# # Configuration for DataDog API to send metrics to.
# [[outputs.datadog]]
# ## Datadog API key
7 changes: 3 additions & 4 deletions plugins/inputs/postgresql/postgresql_test.go
@@ -51,12 +51,12 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) {
"checkpoints_req",
"checkpoints_timed",
"maxwritten_clean",
}

int32Metrics := []string{
"datid",
"numbackends",
}

int32Metrics := []string{}

floatMetrics := []string{
"blk_read_time",
"blk_write_time",
@@ -66,7 +66,6 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) {

stringMetrics := []string{
"datname",
"datid",
}

metricsCounted := 0
Original file line number Diff line number Diff line change
@@ -53,20 +53,19 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) {
"temp_files",
"temp_bytes",
"deadlocks",
}

int32Metrics := []string{
"numbackends",
"datid",
}

int32Metrics := []string{}

floatMetrics := []string{
"blk_read_time",
"blk_write_time",
}

stringMetrics := []string{
"datname",
"datid",
}

metricsCounted := 0
@@ -175,20 +174,19 @@ func TestPostgresqlFieldOutput(t *testing.T) {
"temp_files",
"temp_bytes",
"deadlocks",
}

int32Metrics := []string{
"numbackends",
"datid",
}

int32Metrics := []string{}

floatMetrics := []string{
"blk_read_time",
"blk_write_time",
}

stringMetrics := []string{
"datname",
"datid",
}

for _, field := range intMetrics {
1 change: 1 addition & 0 deletions plugins/outputs/all/all.go
@@ -4,6 +4,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/outputs/amon"
_ "github.com/influxdata/telegraf/plugins/outputs/amqp"
_ "github.com/influxdata/telegraf/plugins/outputs/cloudwatch"
_ "github.com/influxdata/telegraf/plugins/outputs/cratedb"
_ "github.com/influxdata/telegraf/plugins/outputs/datadog"
_ "github.com/influxdata/telegraf/plugins/outputs/discard"
_ "github.com/influxdata/telegraf/plugins/outputs/elasticsearch"
38 changes: 38 additions & 0 deletions plugins/outputs/cratedb/README.md
@@ -0,0 +1,38 @@
# CrateDB Output Plugin for Telegraf

This plugin writes to [CrateDB](https://crate.io/) via its [PostgreSQL protocol](https://crate.io/docs/crate/reference/protocols/postgres.html).

## Table Schema

The plugin requires a table with the following schema.
Contributor

Extra a in this sentence


```sql
CREATE TABLE my_metrics (
"hash_id" LONG INDEX OFF,
"timestamp" TIMESTAMP,
"name" STRING,
"tags" OBJECT(DYNAMIC),
"fields" OBJECT(DYNAMIC),
"day" TIMESTAMP GENERATED ALWAYS AS date_trunc('day', "timestamp"),
PRIMARY KEY ("timestamp", "hash_id","day")
) PARTITIONED BY("day");
```

The plugin can create this table for you automatically via the `table_create`
config option, see below.

## Configuration

```toml
# Configuration for CrateDB to send metrics to.
[[outputs.cratedb]]
# A lib/pq connection string.
# See http://godoc.org/github.com/lib/pq#hdr-Connection_String_Parameters
url = "postgres://user:password@localhost/schema?sslmode=disable"
# Timeout for all CrateDB queries.
timeout = "5s"
# Name of the table to store metrics in.
table = "metrics"
# If true, and the metrics table does not exist, create it automatically.
table_create = true
```
202 changes: 202 additions & 0 deletions plugins/outputs/cratedb/cratedb.go
@@ -0,0 +1,202 @@
package cratedb

import (
"context"
"database/sql"
"fmt"
"sort"
"strings"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/outputs"
_ "github.com/jackc/pgx/stdlib"
)

type CrateDB struct {
URL string
Timeout internal.Duration
Table string
TableCreate bool `toml:"table_create"`
DB *sql.DB
}

var sampleConfig = `
# A lib/pq connection string.
# See http://godoc.org/github.com/lib/pq#hdr-Connection_String_Parameters
url = "postgres://user:password@localhost/schema?sslmode=disable"
# Timeout for all CrateDB queries.
timeout = "5s"
# Name of the table to store metrics in.
table = "metrics"
# If true, and the metrics table does not exist, create it automatically.
table_create = true
`

func (c *CrateDB) Connect() error {
db, err := sql.Open("pgx", c.URL)
if err != nil {
return err
} else if c.TableCreate {
sql := `
CREATE TABLE IF NOT EXISTS ` + c.Table + ` (
"hash_id" LONG INDEX OFF,
"timestamp" TIMESTAMP,
"name" STRING,
"tags" OBJECT(DYNAMIC),
"fields" OBJECT(DYNAMIC),
"day" TIMESTAMP GENERATED ALWAYS AS date_trunc('day', "timestamp"),
PRIMARY KEY ("timestamp", "hash_id","day")
) PARTITIONED BY("day");
`
ctx, cancel := context.WithTimeout(context.Background(), c.Timeout.Duration)
defer cancel()
if _, err := db.ExecContext(ctx, sql); err != nil {
return err
}
}
c.DB = db
return nil
}

func (c *CrateDB) Write(metrics []telegraf.Metric) error {
ctx, cancel := context.WithTimeout(context.Background(), c.Timeout.Duration)
defer cancel()
if sql, err := insertSQL(c.Table, metrics, time.Local); err != nil {
return err
} else if _, err := c.DB.ExecContext(ctx, sql); err != nil {
return err
}
return nil
}

func insertSQL(table string, metrics []telegraf.Metric, loc *time.Location) (string, error) {
rows := make([]string, len(metrics))
for i, m := range metrics {
// Note: We have to convert HashID from uint64 to int64 below because
// CrateDB only supports a signed 64 bit LONG type which would give us
// problems, e.g.:
//
// CREATE TABLE my_long (val LONG);
// INSERT INTO my_long(val) VALUES (14305102049502225714);
// -> ERROR: SQLParseException: For input string: "14305102049502225714"

cols := []interface{}{
int64(m.HashID()),
Contributor

I hadn't considered the values of this hash to be stable across versions. Would it be a problem if this hash changed even if the series key (measurement name + tagset) did not?

Contributor Author

Np. I think a cryptographic hash function makes more sense for the use case (deduplication) anyway. Let me know if you think 626f653 works for you.
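
For readers following this thread, here is a minimal sketch of what a version-stable, cryptographic series ID could look like. It is not necessarily what 626f653 does: `stableHashID`, the choice of SHA-512, the NUL separators, and the big-endian truncation to 8 bytes are all assumptions made for illustration. The final conversion to `int64` reflects the signed `LONG` constraint described in the code comment above.

```go
package main

import (
	"crypto/sha512"
	"encoding/binary"
	"fmt"
	"sort"
)

// stableHashID is a hypothetical helper: it hashes the series key
// (measurement name + sorted tag set) with SHA-512 and keeps the first
// 8 bytes as a signed 64 bit integer, since CrateDB's LONG is signed.
func stableHashID(name string, tags map[string]string) int64 {
	// Sort tag keys so that map iteration order cannot affect the result.
	keys := make([]string, 0, len(tags))
	for k := range tags {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	h := sha512.New()
	h.Write([]byte(name))
	for _, k := range keys {
		// NUL separators keep ("ab", "c") distinct from ("a", "bc").
		h.Write([]byte{0})
		h.Write([]byte(k))
		h.Write([]byte{0})
		h.Write([]byte(tags[k]))
	}
	sum := h.Sum(nil)
	return int64(binary.BigEndian.Uint64(sum[:8]))
}

func main() {
	a := stableHashID("cpu", map[string]string{"host": "web01", "dc": "eu"})
	b := stableHashID("cpu", map[string]string{"dc": "eu", "host": "web01"})
	fmt.Println(a == b) // same series key, same ID
}
```

Because the ID depends only on the name and tags, it stays the same as long as the series key does, which is the property the deduplication use case needs.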

m.Time().In(loc),
Contributor

Why in local time? What about just sending all times in UTC?

Contributor Author

CrateDB converts to UTC anyway, but you're right, this actually makes things a bit simpler. See e246032.

m.Name(),
m.Tags(),
m.Fields(),
}

escapedCols := make([]string, len(cols))
for i, col := range cols {
escaped, err := escapeValue(col)
if err != nil {
return "", err
}
escapedCols[i] = escaped
}
rows[i] = `(` + strings.Join(escapedCols, ", ") + `)`
}
sql := `INSERT INTO ` + table + ` ("hash_id", "timestamp", "name", "tags", "fields")
VALUES
` + strings.Join(rows, " ,\n") + `;`
return sql, nil
}

// escapeValue returns a string version of val that is suitable for being used
// inside of a VALUES expression or similar. Unsupported types return an error.
//
// Warning: This is not ideal from a security perspective, but unfortunately
// CrateDB does not support enough of the PostgreSQL wire protocol to allow
// using lib/pq with $1, $2 placeholders. Security conscious users of this
// plugin should probably refrain from using it in combination with untrusted
// inputs.
func escapeValue(val interface{}) (string, error) {
switch t := val.(type) {
case string:
return escapeString(t, `'`), nil
// We don't handle uint, uint32 and uint64 here because CrateDB doesn't
// seem to support unsigned types. But it seems like input plugins don't
// produce those types, so it's hopefully ok.
case int, int32, int64, float32, float64:
return fmt.Sprint(t), nil
case time.Time:
// see https://crate.io/docs/crate/reference/sql/data_types.html#timestamp
return escapeValue(t.Format("2006-01-02T15:04:05.999-0700"))
case map[string]string:
return escapeObject(convertMap(t))
case map[string]interface{}:
return escapeObject(t)
default:
// This might be panic worthy under normal circumstances, but it's probably
// better to not shut down the entire telegraf process because of one
// misbehaving plugin.
return "", fmt.Errorf("unexpected type: %T: %#v", t, t)
}
}

// convertMap converts m from map[string]string to map[string]interface{} by
// copying it. Generics, oh generics where art thou?
func convertMap(m map[string]string) map[string]interface{} {
c := make(map[string]interface{}, len(m))
for k, v := range m {
c[k] = v
}
return c
}

func escapeObject(m map[string]interface{}) (string, error) {
// There is a decent chance that the implementation below doesn't catch all
// edge cases, but it's hard to tell since the format seems to be a bit
// underspecified.
// See https://crate.io/docs/crate/reference/sql/data_types.html#object

// We find all keys and sort them first because iterating a map in go is
// randomized and we need consistent output for our unit tests.
keys := make([]string, 0, len(m))
for k := range m {
keys = append(keys, k)
}
sort.Strings(keys)

// Now we build our key = val pairs
pairs := make([]string, 0, len(m))
for _, k := range keys {
// escape the value of our key k (potentially recursive)
val, err := escapeValue(m[k])
if err != nil {
return "", err
}
pairs = append(pairs, escapeString(k, `"`)+" = "+val)
}
return `{` + strings.Join(pairs, ", ") + `}`, nil
}

// escapeString wraps s in the given quote string and replaces every occurrence
// of quote inside of s with two consecutive copies of it (e.g. ' becomes '').
func escapeString(s string, quote string) string {
return quote + strings.Replace(s, quote, quote+quote, -1) + quote
}

func (c *CrateDB) SampleConfig() string {
return sampleConfig
}

func (c *CrateDB) Description() string {
return "Configuration for CrateDB to send metrics to."
}

func (c *CrateDB) Close() error {
return c.DB.Close()
}

func init() {
outputs.Add("cratedb", func() telegraf.Output {
return &CrateDB{
Timeout: internal.Duration{Duration: time.Second * 5},
}
})
}
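
The quoting rule used by `escapeString` in the file above can be tried in isolation. The sketch below is a standalone copy of that one-liner under the hypothetical name `quoteWith`, plus a tiny hand-built object literal in the `{"key" = 'value'}` shape that `escapeObject` produces; it is an illustration only and does not cover the edge cases the source comments warn about.

```go
package main

import (
	"fmt"
	"strings"
)

// quoteWith is a standalone copy of the escapeString logic from the diff,
// renamed here (hypothetical name) to keep the example self-contained.
// It wraps s in quote and doubles every quote character found inside s.
func quoteWith(s, quote string) string {
	return quote + strings.Replace(s, quote, quote+quote, -1) + quote
}

func main() {
	// String values are wrapped in single quotes.
	fmt.Println(quoteWith("it's", `'`)) // 'it''s'

	// Object keys are wrapped in double quotes.
	fmt.Println(quoteWith(`my"key`, `"`)) // "my""key"

	// A single key = value pair in the object literal syntax.
	fmt.Println("{" + quoteWith("host", `"`) + " = " + quoteWith("web01", `'`) + "}")
}
```

Doubling the quote character is the standard SQL way to embed it in a literal, but as the warning on `escapeValue` notes, hand-rolled escaping is weaker than real query placeholders.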