Skip to content

Commit

Permalink
Run create database query once per database (influxdata#7333)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnelson authored and idohalevi committed Sep 23, 2020
1 parent f42af67 commit 796b920
Show file tree
Hide file tree
Showing 2 changed files with 196 additions and 10 deletions.
30 changes: 20 additions & 10 deletions plugins/outputs/influxdb/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,13 @@ type HTTPConfig struct {
}

type httpClient struct {
client *http.Client
config HTTPConfig
createdDatabases map[string]bool
client *http.Client
config HTTPConfig
// Tracks that the 'create database` statement was executed for the
// database. An attempt to create the database is made each time a new
// database is encountered in the database_tag and after a "database not
// found" error occurs.
createDatabaseExecuted map[string]bool

log telegraf.Logger
}
Expand Down Expand Up @@ -177,9 +181,9 @@ func NewHTTPClient(config HTTPConfig) (*httpClient, error) {
Timeout: config.Timeout,
Transport: transport,
},
createdDatabases: make(map[string]bool),
config: config,
log: config.Log,
createDatabaseExecuted: make(map[string]bool),
config: config,
log: config.Log,
}
return client, nil
}
Expand Down Expand Up @@ -215,7 +219,6 @@ func (c *httpClient) CreateDatabase(ctx context.Context, database string) error

if err != nil {
if resp.StatusCode == 200 {
c.createdDatabases[database] = true
return nil
}

Expand All @@ -225,12 +228,19 @@ func (c *httpClient) CreateDatabase(ctx context.Context, database string) error
}
}

// Even with a 200 response there can be an error
// Even with a 200 status code there can be an error in the response body.
// If there is also no error string then the operation was successful.
if resp.StatusCode == http.StatusOK && queryResp.Error() == "" {
c.createdDatabases[database] = true
c.createDatabaseExecuted[database] = true
return nil
}

// Don't attempt to recreate the database after a 403 Forbidden error.
// This behavior exists only to maintain backwards compatiblity.
if resp.StatusCode == http.StatusForbidden {
c.createDatabaseExecuted[database] = true
}

return &APIError{
StatusCode: resp.StatusCode,
Title: resp.Status,
Expand Down Expand Up @@ -284,7 +294,7 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error
}

for dbrp, batch := range batches {
if !c.config.SkipDatabaseCreation && !c.createdDatabases[dbrp.Database] {
if !c.config.SkipDatabaseCreation && !c.createDatabaseExecuted[dbrp.Database] {
err := c.CreateDatabase(ctx, dbrp.Database)
if err != nil {
c.log.Warnf("When writing to [%s]: database %q creation failed: %v",
Expand Down
176 changes: 176 additions & 0 deletions plugins/outputs/influxdb/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -959,3 +959,179 @@ func TestDBRPTags(t *testing.T) {
})
}
}

type MockHandlerChain struct {
handlers []http.HandlerFunc
}

func (h *MockHandlerChain) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if len(h.handlers) == 0 {
w.WriteHeader(http.StatusInternalServerError)
return
}
next, rest := h.handlers[0], h.handlers[1:]
h.handlers = rest
next(w, r)
}

func (h *MockHandlerChain) Done() bool {
return len(h.handlers) == 0
}

func TestDBRPTagsCreateDatabaseNotCalledOnRetryAfterForbidden(t *testing.T) {
ts := httptest.NewServer(http.NotFoundHandler())
defer ts.Close()

u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
require.NoError(t, err)

handlers := &MockHandlerChain{
handlers: []http.HandlerFunc{
func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/query":
if r.FormValue("q") != `CREATE DATABASE "telegraf"` {
w.WriteHeader(http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusForbidden)
w.Write([]byte(`{"results": [{"error": "error authorizing query"}]}`))
default:
w.WriteHeader(http.StatusInternalServerError)
}
},
func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/write":
w.WriteHeader(http.StatusNoContent)
default:
w.WriteHeader(http.StatusInternalServerError)
}
},
func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/write":
w.WriteHeader(http.StatusNoContent)
default:
w.WriteHeader(http.StatusInternalServerError)
}
},
},
}
ts.Config.Handler = handlers

metrics := []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{},
map[string]interface{}{
"time_idle": 42.0,
},
time.Unix(0, 0),
),
}

output := influxdb.InfluxDB{
URL: u.String(),
Database: "telegraf",
DatabaseTag: "database",
Log: testutil.Logger{},
CreateHTTPClientF: func(config *influxdb.HTTPConfig) (influxdb.Client, error) {
return influxdb.NewHTTPClient(*config)
},
}
err = output.Connect()
require.NoError(t, err)
err = output.Write(metrics)
require.NoError(t, err)
err = output.Write(metrics)
require.NoError(t, err)

require.True(t, handlers.Done(), "all handlers not called")
}

func TestDBRPTagsCreateDatabaseCalledOnDatabaseNotFound(t *testing.T) {
ts := httptest.NewServer(http.NotFoundHandler())
defer ts.Close()

u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
require.NoError(t, err)

handlers := &MockHandlerChain{
handlers: []http.HandlerFunc{
func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/query":
if r.FormValue("q") != `CREATE DATABASE "telegraf"` {
w.WriteHeader(http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusForbidden)
w.Write([]byte(`{"results": [{"error": "error authorizing query"}]}`))
default:
w.WriteHeader(http.StatusInternalServerError)
}
},
func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/write":
w.WriteHeader(http.StatusNotFound)
w.Write([]byte(`{"error": "database not found: \"telegraf\""}`))
default:
w.WriteHeader(http.StatusInternalServerError)
}
},
func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/query":
if r.FormValue("q") != `CREATE DATABASE "telegraf"` {
w.WriteHeader(http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusForbidden)
default:
w.WriteHeader(http.StatusInternalServerError)
}
},
func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/write":
w.WriteHeader(http.StatusNoContent)
default:
w.WriteHeader(http.StatusInternalServerError)
}
},
},
}
ts.Config.Handler = handlers

metrics := []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{},
map[string]interface{}{
"time_idle": 42.0,
},
time.Unix(0, 0),
),
}

output := influxdb.InfluxDB{
URL: u.String(),
Database: "telegraf",
DatabaseTag: "database",
Log: testutil.Logger{},
CreateHTTPClientF: func(config *influxdb.HTTPConfig) (influxdb.Client, error) {
return influxdb.NewHTTPClient(*config)
},
}

err = output.Connect()
require.NoError(t, err)
err = output.Write(metrics)
require.Error(t, err)
err = output.Write(metrics)
require.NoError(t, err)

require.True(t, handlers.Done(), "all handlers not called")
}

0 comments on commit 796b920

Please sign in to comment.