From 1e6896f15ad8c9cd6f63c06600a375f0ad9c24ef Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Tue, 14 Apr 2020 09:41:25 -0700 Subject: [PATCH 1/2] Run create database query once per database Additional create database queries only ran if a database not found error is received. --- plugins/outputs/influxdb/http.go | 23 ++-- plugins/outputs/influxdb/http_test.go | 170 ++++++++++++++++++++++++++ 2 files changed, 184 insertions(+), 9 deletions(-) diff --git a/plugins/outputs/influxdb/http.go b/plugins/outputs/influxdb/http.go index b663d9198f42d..a12932c2a8c53 100644 --- a/plugins/outputs/influxdb/http.go +++ b/plugins/outputs/influxdb/http.go @@ -107,9 +107,9 @@ type HTTPConfig struct { } type httpClient struct { - client *http.Client - config HTTPConfig - createdDatabases map[string]bool + client *http.Client + config HTTPConfig + createDatabaseExecuted map[string]bool log telegraf.Logger } @@ -177,9 +177,9 @@ func NewHTTPClient(config HTTPConfig) (*httpClient, error) { Timeout: config.Timeout, Transport: transport, }, - createdDatabases: make(map[string]bool), - config: config, - log: config.Log, + createDatabaseExecuted: make(map[string]bool), + config: config, + log: config.Log, } return client, nil } @@ -209,13 +209,19 @@ func (c *httpClient) CreateDatabase(ctx context.Context, database string) error } defer resp.Body.Close() + // Track that a 'create database` statement was executed for the database + // even if the statement fails. One initial attempt to create the database + // is made each time a new database is encountered in the database_tag. + // After this initial attempt it will only be executed if a "database not + // found" error occurs. + c.createDatabaseExecuted[database] = true + queryResp := &QueryResponse{} dec := json.NewDecoder(resp.Body) err = dec.Decode(queryResp) if err != nil { if resp.StatusCode == 200 { - c.createdDatabases[database] = true return nil } @@ -227,7 +233,6 @@ func (c *httpClient) CreateDatabase(ctx context.Context, database string) error // Even with a 200 response there can be an error if resp.StatusCode == http.StatusOK && queryResp.Error() == "" { - c.createdDatabases[database] = true return nil } @@ -280,7 +285,7 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error } for dbrp, batch := range batches { - if !c.config.SkipDatabaseCreation && !c.createdDatabases[dbrp.Database] { + if !c.config.SkipDatabaseCreation && !c.createDatabaseExecuted[dbrp.Database] { err := c.CreateDatabase(ctx, dbrp.Database) if err != nil { c.log.Warnf("When writing to [%s]: database %q creation failed: %v", diff --git a/plugins/outputs/influxdb/http_test.go b/plugins/outputs/influxdb/http_test.go index 3f5ef0bc61830..503d04f307c04 100644 --- a/plugins/outputs/influxdb/http_test.go +++ b/plugins/outputs/influxdb/http_test.go @@ -930,3 +930,173 @@ func TestDBRPTags(t *testing.T) { }) } } + +type MockHandlerChain struct { + handlers []http.HandlerFunc +} + +func (h *MockHandlerChain) ServeHTTP(w http.ResponseWriter, r *http.Request) { + next, rest := h.handlers[0], h.handlers[1:] + h.handlers = rest + next(w, r) +} + +func (h *MockHandlerChain) Done() bool { + return len(h.handlers) == 0 +} + +func TestDBRPTagsCreateDatabaseNotCalledOnRetry(t *testing.T) { + ts := httptest.NewServer(http.NotFoundHandler()) + defer ts.Close() + + u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String())) + require.NoError(t, err) + + handlers := &MockHandlerChain{ + handlers: []http.HandlerFunc{ + func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/query": + if r.FormValue("q") != `CREATE DATABASE "telegraf"` { + w.WriteHeader(http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusForbidden) + default: + w.WriteHeader(http.StatusInternalServerError) + } + }, + func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/write": + w.WriteHeader(http.StatusNoContent) + default: + w.WriteHeader(http.StatusInternalServerError) + } + }, + func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/write": + w.WriteHeader(http.StatusNoContent) + default: + w.WriteHeader(http.StatusInternalServerError) + } + }, + }, + } + ts.Config.Handler = handlers + + metrics := []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": 42.0, + }, + time.Unix(0, 0), + ), + } + + output := influxdb.InfluxDB{ + URL: u.String(), + Database: "telegraf", + DatabaseTag: "database", + Log: testutil.Logger{}, + CreateHTTPClientF: func(config *influxdb.HTTPConfig) (influxdb.Client, error) { + return influxdb.NewHTTPClient(*config) + }, + } + err = output.Connect() + require.NoError(t, err) + err = output.Write(metrics) + require.NoError(t, err) + err = output.Write(metrics) + require.NoError(t, err) + + require.True(t, handlers.Done(), "all handlers not called") +} + +func TestDBRPTagsCreateDatabaseCalledOnDatabaseNotFound(t *testing.T) { + ts := httptest.NewServer(http.NotFoundHandler()) + defer ts.Close() + + u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String())) + require.NoError(t, err) + + handlers := &MockHandlerChain{ + handlers: []http.HandlerFunc{ + func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/query": + if r.FormValue("q") != `CREATE DATABASE "telegraf"` { + w.WriteHeader(http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusForbidden) + default: + w.WriteHeader(http.StatusInternalServerError) + } + }, + func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/write": + w.WriteHeader(http.StatusNotFound) + w.Write([]byte(`{"error": "database not found: \"telegraf\""}`)) + default: + w.WriteHeader(http.StatusInternalServerError) + } + }, + func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/query": + if r.FormValue("q") != `CREATE DATABASE "telegraf"` { + w.WriteHeader(http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusForbidden) + default: + w.WriteHeader(http.StatusInternalServerError) + } + }, + func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/write": + w.WriteHeader(http.StatusNoContent) + default: + w.WriteHeader(http.StatusInternalServerError) + } + }, + }, + } + ts.Config.Handler = handlers + + metrics := []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": 42.0, + }, + time.Unix(0, 0), + ), + } + + output := influxdb.InfluxDB{ + URL: u.String(), + Database: "telegraf", + DatabaseTag: "database", + Log: testutil.Logger{}, + CreateHTTPClientF: func(config *influxdb.HTTPConfig) (influxdb.Client, error) { + return influxdb.NewHTTPClient(*config) + }, + } + + err = output.Connect() + require.NoError(t, err) + err = output.Write(metrics) + require.Error(t, err) + err = output.Write(metrics) + require.NoError(t, err) + + require.True(t, handlers.Done(), "all handlers not called") +} From 306faf42dcb5c63b70ca329679d96a246d64a36d Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Tue, 28 Apr 2020 12:03:14 -0700 Subject: [PATCH 2/2] Retry create database only on success and forbidden --- plugins/outputs/influxdb/http.go | 25 +++++++++++++++---------- plugins/outputs/influxdb/http_test.go | 8 +++++++- 2 files changed, 22 insertions(+), 11 deletions(-) diff --git a/plugins/outputs/influxdb/http.go b/plugins/outputs/influxdb/http.go index a12932c2a8c53..bad1ae065c4ac 100644 --- a/plugins/outputs/influxdb/http.go +++ b/plugins/outputs/influxdb/http.go @@ -107,8 +107,12 @@ type HTTPConfig struct { } type httpClient struct { - client *http.Client - config HTTPConfig + client *http.Client + config HTTPConfig + // Tracks that the 'create database` statement was executed for the + // database. An attempt to create the database is made each time a new + // database is encountered in the database_tag and after a "database not + // found" error occurs. createDatabaseExecuted map[string]bool log telegraf.Logger @@ -209,13 +213,6 @@ func (c *httpClient) CreateDatabase(ctx context.Context, database string) error } defer resp.Body.Close() - // Track that a 'create database` statement was executed for the database - // even if the statement fails. One initial attempt to create the database - // is made each time a new database is encountered in the database_tag. - // After this initial attempt it will only be executed if a "database not - // found" error occurs. - c.createDatabaseExecuted[database] = true - queryResp := &QueryResponse{} dec := json.NewDecoder(resp.Body) err = dec.Decode(queryResp) @@ -231,11 +228,19 @@ func (c *httpClient) CreateDatabase(ctx context.Context, database string) error } } - // Even with a 200 response there can be an error + // Even with a 200 status code there can be an error in the response body. + // If there is also no error string then the operation was successful. if resp.StatusCode == http.StatusOK && queryResp.Error() == "" { + c.createDatabaseExecuted[database] = true return nil } + // Don't attempt to recreate the database after a 403 Forbidden error. + // This behavior exists only to maintain backwards compatiblity. + if resp.StatusCode == http.StatusForbidden { + c.createDatabaseExecuted[database] = true + } + return &APIError{ StatusCode: resp.StatusCode, Title: resp.Status, diff --git a/plugins/outputs/influxdb/http_test.go b/plugins/outputs/influxdb/http_test.go index 503d04f307c04..25d23e60f5ed4 100644 --- a/plugins/outputs/influxdb/http_test.go +++ b/plugins/outputs/influxdb/http_test.go @@ -936,6 +936,10 @@ type MockHandlerChain struct { } func (h *MockHandlerChain) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if len(h.handlers) == 0 { + w.WriteHeader(http.StatusInternalServerError) + return + } next, rest := h.handlers[0], h.handlers[1:] h.handlers = rest next(w, r) @@ -945,7 +949,7 @@ func (h *MockHandlerChain) Done() bool { return len(h.handlers) == 0 } -func TestDBRPTagsCreateDatabaseNotCalledOnRetry(t *testing.T) { +func TestDBRPTagsCreateDatabaseNotCalledOnRetryAfterForbidden(t *testing.T) { ts := httptest.NewServer(http.NotFoundHandler()) defer ts.Close() @@ -962,6 +966,7 @@ func TestDBRPTagsCreateDatabaseNotCalledOnRetry(t *testing.T) { return } w.WriteHeader(http.StatusForbidden) + w.Write([]byte(`{"results": [{"error": "error authorizing query"}]}`)) default: w.WriteHeader(http.StatusInternalServerError) } @@ -1033,6 +1038,7 @@ func TestDBRPTagsCreateDatabaseCalledOnDatabaseNotFound(t *testing.T) { return } w.WriteHeader(http.StatusForbidden) + w.Write([]byte(`{"results": [{"error": "error authorizing query"}]}`)) default: w.WriteHeader(http.StatusInternalServerError) }