diff --git a/plugins/outputs/cratedb/README.md b/plugins/outputs/cratedb/README.md index 18b3bbb495293..03ad297b4330c 100644 --- a/plugins/outputs/cratedb/README.md +++ b/plugins/outputs/cratedb/README.md @@ -36,15 +36,20 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ```toml @sample.conf # Configuration for CrateDB to send metrics to. [[outputs.cratedb]] - # A github.com/jackc/pgx/v4 connection string. - # See https://pkg.go.dev/github.com/jackc/pgx/v4#ParseConfig + ## Connection parameters for accessing the database see + ## https://pkg.go.dev/github.com/jackc/pgx/v4#ParseConfig + ## for available options url = "postgres://user:password@localhost/schema?sslmode=disable" - # Timeout for all CrateDB queries. - timeout = "5s" - # Name of the table to store metrics in. - table = "metrics" - # If true, and the metrics table does not exist, create it automatically. - table_create = true - # The character(s) to replace any '.' in an object key with - key_separator = "_" + + ## Timeout for all CrateDB queries. + # timeout = "5s" + + ## Name of the table to store metrics in. + # table = "metrics" + + ## If true, and the metrics table does not exist, create it automatically. + # table_create = false + + ## The character(s) to replace any '.' in an object key with + # key_separator = "_" ``` diff --git a/plugins/outputs/cratedb/cratedb.go b/plugins/outputs/cratedb/cratedb.go index 9cd9a593fedd8..284feb4289f72 100644 --- a/plugins/outputs/cratedb/cratedb.go +++ b/plugins/outputs/cratedb/cratedb.go @@ -17,6 +17,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/outputs" ) @@ -25,26 +26,8 @@ var sampleConfig string const MaxInt64 = int64(^uint64(0) >> 1) -type CrateDB struct { - URL string - Timeout config.Duration - Table string - TableCreate bool `toml:"table_create"` - KeySeparator string `toml:"key_separator"` - DB *sql.DB -} - -func (*CrateDB) SampleConfig() string { - return sampleConfig -} - -func (c *CrateDB) Connect() error { - db, err := sql.Open("pgx", c.URL) - if err != nil { - return err - } else if c.TableCreate { - query := ` -CREATE TABLE IF NOT EXISTS ` + c.Table + ` ( +const tableCreationQuery = ` +CREATE TABLE IF NOT EXISTS %s ( "hash_id" LONG INDEX OFF, "timestamp" TIMESTAMP, "name" STRING, @@ -54,13 +37,52 @@ CREATE TABLE IF NOT EXISTS ` + c.Table + ` ( PRIMARY KEY ("timestamp", "hash_id","day") ) PARTITIONED BY("day"); ` + +type CrateDB struct { + URL string `toml:"url"` + Timeout config.Duration `toml:"timeout"` + Table string `toml:"table"` + TableCreate bool `toml:"table_create"` + KeySeparator string `toml:"key_separator"` + + db *sql.DB +} + +func (*CrateDB) SampleConfig() string { + return sampleConfig +} + +func (c *CrateDB) Init() error { + // Set defaults + if c.KeySeparator == "" { + c.KeySeparator = "_" + } + if c.Table == "" { + c.Table = "metrics" + } + + return nil +} + +func (c *CrateDB) Connect() error { + if c.db == nil { + db, err := sql.Open("pgx", c.URL) + if err != nil { + return err + } + c.db = db + } + + if c.TableCreate { ctx, cancel := context.WithTimeout(context.Background(), time.Duration(c.Timeout)) defer cancel() - if _, err := db.ExecContext(ctx, query); err != nil { - return err + + query := fmt.Sprintf(tableCreationQuery, c.Table) + if _, err := c.db.ExecContext(ctx, query); err != nil { + return &internal.StartupError{Err: err, Retry: true} } } - c.DB = db + return nil } @@ -73,7 +95,7 @@ func (c *CrateDB) Write(metrics []telegraf.Metric) error { return err } - _, err = c.DB.ExecContext(ctx, generatedSQL) + _, err = c.db.ExecContext(ctx, generatedSQL) if err != nil { return err } @@ -225,7 +247,10 @@ func hashID(m telegraf.Metric) int64 { } func (c *CrateDB) Close() error { - return c.DB.Close() + if c.db == nil { + return nil + } + return c.db.Close() } func init() { diff --git a/plugins/outputs/cratedb/cratedb_test.go b/plugins/outputs/cratedb/cratedb_test.go index d1fdf1688308d..bba8ed72f349e 100644 --- a/plugins/outputs/cratedb/cratedb_test.go +++ b/plugins/outputs/cratedb/cratedb_test.go @@ -12,7 +12,9 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/models" "github.com/influxdata/telegraf/testutil" ) @@ -46,7 +48,6 @@ func TestConnectAndWriteIntegration(t *testing.T) { defer container.Terminate() url := fmt.Sprintf("postgres://crate@%s:%s/test", container.Address, container.Ports[servicePort]) - fmt.Println(url) db, err := sql.Open("pgx", url) require.NoError(t, err) defer db.Close() @@ -77,6 +78,76 @@ func TestConnectAndWriteIntegration(t *testing.T) { require.NoError(t, c.Close()) } +func TestConnectionIssueAtStartup(t *testing.T) { + // Test case for https://github.com/influxdata/telegraf/issues/13278 + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + container := testutil.Container{ + Image: "crate", + ExposedPorts: []string{servicePort}, + Entrypoint: []string{ + "/docker-entrypoint.sh", + "-Cdiscovery.type=single-node", + }, + WaitingFor: wait.ForAll( + wait.ForListeningPort(servicePort), + wait.ForLog("recovered [0] indices into cluster_state"), + ), + } + require.NoError(t, container.Start(), "failed to start container") + defer container.Terminate() + url := fmt.Sprintf("postgres://crate@%s:%s/test", container.Address, container.Ports[servicePort]) + + // Pause the container for connectivity issues + require.NoError(t, container.Pause()) + + // Create a model to be able to use the startup retry strategy + plugin := &CrateDB{ + URL: url, + Table: "testing", + Timeout: config.Duration(time.Second * 5), + TableCreate: true, + } + model := models.NewRunningOutput( + plugin, + &models.OutputConfig{ + Name: "cratedb", + StartupErrorBehavior: "retry", + }, + 1000, 1000, + ) + require.NoError(t, model.Init()) + + // The connect call should succeed even though the table creation was not + // successful due to the "retry" strategy + require.NoError(t, model.Connect()) + + // Writing the metrics in this state should fail because we are not fully + // started up + metrics := testutil.MockMetrics() + for _, m := range metrics { + model.AddMetric(m) + } + require.ErrorIs(t, model.WriteBatch(), internal.ErrNotConnected) + + // Unpause the container, now writes should succeed + require.NoError(t, container.Resume()) + require.NoError(t, model.WriteBatch()) + defer model.Close() + + // Verify that the metrics were actually written + for _, m := range metrics { + mid := hashID(m) + row := plugin.db.QueryRow("SELECT hash_id FROM testing WHERE hash_id = ? AND timestamp = ?", mid, m.Time()) + + var id int64 + require.NoError(t, row.Scan(&id)) + require.Equal(t, id, mid) + } +} + func TestInsertSQL(t *testing.T) { tests := []struct { Metrics []telegraf.Metric diff --git a/plugins/outputs/cratedb/sample.conf b/plugins/outputs/cratedb/sample.conf index f547ec897d5c5..ef30ab1c9f790 100644 --- a/plugins/outputs/cratedb/sample.conf +++ b/plugins/outputs/cratedb/sample.conf @@ -1,13 +1,18 @@ # Configuration for CrateDB to send metrics to. [[outputs.cratedb]] - # A github.com/jackc/pgx/v4 connection string. - # See https://pkg.go.dev/github.com/jackc/pgx/v4#ParseConfig + ## Connection parameters for accessing the database see + ## https://pkg.go.dev/github.com/jackc/pgx/v4#ParseConfig + ## for available options url = "postgres://user:password@localhost/schema?sslmode=disable" - # Timeout for all CrateDB queries. - timeout = "5s" - # Name of the table to store metrics in. - table = "metrics" - # If true, and the metrics table does not exist, create it automatically. - table_create = true - # The character(s) to replace any '.' in an object key with - key_separator = "_" + + ## Timeout for all CrateDB queries. + # timeout = "5s" + + ## Name of the table to store metrics in. + # table = "metrics" + + ## If true, and the metrics table does not exist, create it automatically. + # table_create = false + + ## The character(s) to replace any '.' in an object key with + # key_separator = "_" diff --git a/testutil/container.go b/testutil/container.go index a4283ed07dc5a..7c51cbde1af89 100644 --- a/testutil/container.go +++ b/testutil/container.go @@ -156,3 +156,21 @@ func (c *Container) Terminate() { c.PrintLogs() } + +func (c *Container) Pause() error { + provider, err := testcontainers.NewDockerProvider() + if err != nil { + return fmt.Errorf("getting provider failed: %w", err) + } + + return provider.Client().ContainerPause(c.ctx, c.container.GetContainerID()) +} + +func (c *Container) Resume() error { + provider, err := testcontainers.NewDockerProvider() + if err != nil { + return fmt.Errorf("getting provider failed: %w", err) + } + + return provider.Client().ContainerUnpause(c.ctx, c.container.GetContainerID()) +}