Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run create database query once per database #7333

Merged
merged 2 commits into from
Apr 28, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 14 additions & 9 deletions plugins/outputs/influxdb/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,9 @@ type HTTPConfig struct {
}

type httpClient struct {
client *http.Client
config HTTPConfig
createdDatabases map[string]bool
client *http.Client
config HTTPConfig
createDatabaseExecuted map[string]bool

log telegraf.Logger
}
Expand Down Expand Up @@ -177,9 +177,9 @@ func NewHTTPClient(config HTTPConfig) (*httpClient, error) {
Timeout: config.Timeout,
Transport: transport,
},
createdDatabases: make(map[string]bool),
config: config,
log: config.Log,
createDatabaseExecuted: make(map[string]bool),
config: config,
log: config.Log,
}
return client, nil
}
Expand Down Expand Up @@ -209,13 +209,19 @@ func (c *httpClient) CreateDatabase(ctx context.Context, database string) error
}
defer resp.Body.Close()

// Track that a 'create database` statement was executed for the database
// even if the statement fails. One initial attempt to create the database
// is made each time a new database is encountered in the database_tag.
// After this initial attempt it will only be executed if a "database not
// found" error occurs.
c.createDatabaseExecuted[database] = true
ssoroka marked this conversation as resolved.
Show resolved Hide resolved

queryResp := &QueryResponse{}
dec := json.NewDecoder(resp.Body)
err = dec.Decode(queryResp)

if err != nil {
if resp.StatusCode == 200 {
c.createdDatabases[database] = true
return nil
}

Expand All @@ -227,7 +233,6 @@ func (c *httpClient) CreateDatabase(ctx context.Context, database string) error

// Even with a 200 response there can be an error
if resp.StatusCode == http.StatusOK && queryResp.Error() == "" {
c.createdDatabases[database] = true
return nil
}

Expand Down Expand Up @@ -280,7 +285,7 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error
}

for dbrp, batch := range batches {
if !c.config.SkipDatabaseCreation && !c.createdDatabases[dbrp.Database] {
if !c.config.SkipDatabaseCreation && !c.createDatabaseExecuted[dbrp.Database] {
err := c.CreateDatabase(ctx, dbrp.Database)
if err != nil {
c.log.Warnf("When writing to [%s]: database %q creation failed: %v",
Expand Down
170 changes: 170 additions & 0 deletions plugins/outputs/influxdb/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -930,3 +930,173 @@ func TestDBRPTags(t *testing.T) {
})
}
}

type MockHandlerChain struct {
handlers []http.HandlerFunc
}

func (h *MockHandlerChain) ServeHTTP(w http.ResponseWriter, r *http.Request) {
next, rest := h.handlers[0], h.handlers[1:]
h.handlers = rest
next(w, r)
}

func (h *MockHandlerChain) Done() bool {
return len(h.handlers) == 0
}

func TestDBRPTagsCreateDatabaseNotCalledOnRetry(t *testing.T) {
ts := httptest.NewServer(http.NotFoundHandler())
defer ts.Close()

u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
require.NoError(t, err)

handlers := &MockHandlerChain{
handlers: []http.HandlerFunc{
func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/query":
if r.FormValue("q") != `CREATE DATABASE "telegraf"` {
w.WriteHeader(http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusForbidden)
default:
w.WriteHeader(http.StatusInternalServerError)
}
},
func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/write":
w.WriteHeader(http.StatusNoContent)
default:
w.WriteHeader(http.StatusInternalServerError)
}
},
func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/write":
w.WriteHeader(http.StatusNoContent)
default:
w.WriteHeader(http.StatusInternalServerError)
}
},
},
}
ts.Config.Handler = handlers

metrics := []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{},
map[string]interface{}{
"time_idle": 42.0,
},
time.Unix(0, 0),
),
}

output := influxdb.InfluxDB{
URL: u.String(),
Database: "telegraf",
DatabaseTag: "database",
Log: testutil.Logger{},
CreateHTTPClientF: func(config *influxdb.HTTPConfig) (influxdb.Client, error) {
return influxdb.NewHTTPClient(*config)
},
}
err = output.Connect()
require.NoError(t, err)
err = output.Write(metrics)
require.NoError(t, err)
err = output.Write(metrics)
require.NoError(t, err)

require.True(t, handlers.Done(), "all handlers not called")
}

func TestDBRPTagsCreateDatabaseCalledOnDatabaseNotFound(t *testing.T) {
ts := httptest.NewServer(http.NotFoundHandler())
defer ts.Close()

u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
require.NoError(t, err)

handlers := &MockHandlerChain{
handlers: []http.HandlerFunc{
func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/query":
if r.FormValue("q") != `CREATE DATABASE "telegraf"` {
w.WriteHeader(http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusForbidden)
default:
w.WriteHeader(http.StatusInternalServerError)
}
},
func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/write":
w.WriteHeader(http.StatusNotFound)
w.Write([]byte(`{"error": "database not found: \"telegraf\""}`))
default:
w.WriteHeader(http.StatusInternalServerError)
}
},
func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/query":
if r.FormValue("q") != `CREATE DATABASE "telegraf"` {
w.WriteHeader(http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusForbidden)
default:
w.WriteHeader(http.StatusInternalServerError)
}
},
func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/write":
w.WriteHeader(http.StatusNoContent)
default:
w.WriteHeader(http.StatusInternalServerError)
}
},
},
}
ts.Config.Handler = handlers

metrics := []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{},
map[string]interface{}{
"time_idle": 42.0,
},
time.Unix(0, 0),
),
}

output := influxdb.InfluxDB{
URL: u.String(),
Database: "telegraf",
DatabaseTag: "database",
Log: testutil.Logger{},
CreateHTTPClientF: func(config *influxdb.HTTPConfig) (influxdb.Client, error) {
return influxdb.NewHTTPClient(*config)
},
}

err = output.Connect()
require.NoError(t, err)
err = output.Write(metrics)
require.Error(t, err)
err = output.Write(metrics)
require.NoError(t, err)

require.True(t, handlers.Done(), "all handlers not called")
}