Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(outputs.cratedb): Allow configuration of startup error handling #15065

Merged
merged 5 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 15 additions & 10 deletions plugins/outputs/cratedb/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,20 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
```toml @sample.conf
# Configuration for CrateDB to send metrics to.
[[outputs.cratedb]]
# A github.com/jackc/pgx/v4 connection string.
# See https://pkg.go.dev/github.com/jackc/pgx/v4#ParseConfig
## Connection parameters for accessing the database see
## https://pkg.go.dev/github.com/jackc/pgx/v4#ParseConfig
## for available options
url = "postgres://user:password@localhost/schema?sslmode=disable"
# Timeout for all CrateDB queries.
timeout = "5s"
# Name of the table to store metrics in.
table = "metrics"
# If true, and the metrics table does not exist, create it automatically.
table_create = true
# The character(s) to replace any '.' in an object key with
key_separator = "_"

## Timeout for all CrateDB queries.
# timeout = "5s"

## Name of the table to store metrics in.
# table = "metrics"

## If true, and the metrics table does not exist, create it automatically.
# table_create = false

## The character(s) to replace any '.' in an object key with
# key_separator = "_"
```
75 changes: 50 additions & 25 deletions plugins/outputs/cratedb/cratedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/outputs"
)

Expand All @@ -25,26 +26,8 @@ var sampleConfig string

const MaxInt64 = int64(^uint64(0) >> 1)

type CrateDB struct {
URL string
Timeout config.Duration
Table string
TableCreate bool `toml:"table_create"`
KeySeparator string `toml:"key_separator"`
DB *sql.DB
}

func (*CrateDB) SampleConfig() string {
return sampleConfig
}

func (c *CrateDB) Connect() error {
db, err := sql.Open("pgx", c.URL)
if err != nil {
return err
} else if c.TableCreate {
query := `
CREATE TABLE IF NOT EXISTS ` + c.Table + ` (
const tableCreationQuery = `
CREATE TABLE IF NOT EXISTS %s (
"hash_id" LONG INDEX OFF,
"timestamp" TIMESTAMP,
"name" STRING,
Expand All @@ -54,13 +37,52 @@ CREATE TABLE IF NOT EXISTS ` + c.Table + ` (
PRIMARY KEY ("timestamp", "hash_id","day")
) PARTITIONED BY("day");
`

type CrateDB struct {
URL string `toml:"url"`
Timeout config.Duration `toml:"timeout"`
Table string `toml:"table"`
TableCreate bool `toml:"table_create"`
KeySeparator string `toml:"key_separator"`

db *sql.DB
}

func (*CrateDB) SampleConfig() string {
return sampleConfig
}

func (c *CrateDB) Init() error {
// Set defaults
if c.KeySeparator == "" {
c.KeySeparator = "_"
}
if c.Table == "" {
c.Table = "metrics"
}

return nil
}

func (c *CrateDB) Connect() error {
if c.db == nil {
db, err := sql.Open("pgx", c.URL)
if err != nil {
return err
}
c.db = db
}

if c.TableCreate {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(c.Timeout))
defer cancel()
if _, err := db.ExecContext(ctx, query); err != nil {
return err

query := fmt.Sprintf(tableCreationQuery, c.Table)
if _, err := c.db.ExecContext(ctx, query); err != nil {
return &internal.StartupError{Err: err, Retry: true}
}
}
c.DB = db

return nil
}

Expand All @@ -73,7 +95,7 @@ func (c *CrateDB) Write(metrics []telegraf.Metric) error {
return err
}

_, err = c.DB.ExecContext(ctx, generatedSQL)
_, err = c.db.ExecContext(ctx, generatedSQL)
if err != nil {
return err
}
Expand Down Expand Up @@ -225,7 +247,10 @@ func hashID(m telegraf.Metric) int64 {
}

func (c *CrateDB) Close() error {
return c.DB.Close()
if c.db == nil {
return nil
}
return c.db.Close()
}

func init() {
Expand Down
73 changes: 72 additions & 1 deletion plugins/outputs/cratedb/cratedb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ import (

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/testutil"
)

Expand Down Expand Up @@ -46,7 +48,6 @@ func TestConnectAndWriteIntegration(t *testing.T) {
defer container.Terminate()
url := fmt.Sprintf("postgres://crate@%s:%s/test", container.Address, container.Ports[servicePort])

fmt.Println(url)
db, err := sql.Open("pgx", url)
require.NoError(t, err)
defer db.Close()
Expand Down Expand Up @@ -77,6 +78,76 @@ func TestConnectAndWriteIntegration(t *testing.T) {
require.NoError(t, c.Close())
}

func TestConnectionIssueAtStartup(t *testing.T) {
// Test case for https://github.com/influxdata/telegraf/issues/13278
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}

container := testutil.Container{
Image: "crate",
ExposedPorts: []string{servicePort},
Entrypoint: []string{
"/docker-entrypoint.sh",
"-Cdiscovery.type=single-node",
},
WaitingFor: wait.ForAll(
wait.ForListeningPort(servicePort),
wait.ForLog("recovered [0] indices into cluster_state"),
),
}
require.NoError(t, container.Start(), "failed to start container")
defer container.Terminate()
url := fmt.Sprintf("postgres://crate@%s:%s/test", container.Address, container.Ports[servicePort])

// Pause the container for connectivity issues
require.NoError(t, container.Pause())

// Create a model to be able to use the startup retry strategy
plugin := &CrateDB{
URL: url,
Table: "testing",
Timeout: config.Duration(time.Second * 5),
TableCreate: true,
}
model := models.NewRunningOutput(
plugin,
&models.OutputConfig{
Name: "cratedb",
StartupErrorBehavior: "retry",
},
1000, 1000,
)
require.NoError(t, model.Init())

// The connect call should succeed even though the table creation was not
// successful due to the "retry" strategy
require.NoError(t, model.Connect())

// Writing the metrics in this state should fail because we are not fully
// started up
metrics := testutil.MockMetrics()
for _, m := range metrics {
model.AddMetric(m)
}
require.ErrorIs(t, model.WriteBatch(), internal.ErrNotConnected)

// Unpause the container, now writes should succeed
require.NoError(t, container.Resume())
require.NoError(t, model.WriteBatch())
defer model.Close()

// Verify that the metrics were actually written
for _, m := range metrics {
mid := hashID(m)
row := plugin.db.QueryRow("SELECT hash_id FROM testing WHERE hash_id = ? AND timestamp = ?", mid, m.Time())

var id int64
require.NoError(t, row.Scan(&id))
require.Equal(t, id, mid)
}
}

func TestInsertSQL(t *testing.T) {
tests := []struct {
Metrics []telegraf.Metric
Expand Down
25 changes: 15 additions & 10 deletions plugins/outputs/cratedb/sample.conf
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
# Configuration for CrateDB to send metrics to.
[[outputs.cratedb]]
# A github.com/jackc/pgx/v4 connection string.
# See https://pkg.go.dev/github.com/jackc/pgx/v4#ParseConfig
## Connection parameters for accessing the database see
## https://pkg.go.dev/github.com/jackc/pgx/v4#ParseConfig
## for available options
url = "postgres://user:password@localhost/schema?sslmode=disable"
# Timeout for all CrateDB queries.
timeout = "5s"
# Name of the table to store metrics in.
table = "metrics"
# If true, and the metrics table does not exist, create it automatically.
table_create = true
# The character(s) to replace any '.' in an object key with
key_separator = "_"

## Timeout for all CrateDB queries.
# timeout = "5s"

## Name of the table to store metrics in.
# table = "metrics"

## If true, and the metrics table does not exist, create it automatically.
# table_create = false

## The character(s) to replace any '.' in an object key with
# key_separator = "_"
18 changes: 18 additions & 0 deletions testutil/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,3 +156,21 @@ func (c *Container) Terminate() {

c.PrintLogs()
}

func (c *Container) Pause() error {
provider, err := testcontainers.NewDockerProvider()
if err != nil {
return fmt.Errorf("getting provider failed: %w", err)
}

return provider.Client().ContainerPause(c.ctx, c.container.GetContainerID())
}

func (c *Container) Resume() error {
provider, err := testcontainers.NewDockerProvider()
if err != nil {
return fmt.Errorf("getting provider failed: %w", err)
}

return provider.Client().ContainerUnpause(c.ctx, c.container.GetContainerID())
}
Loading