From 40b50b683f03f0ecb569fb99d63ebf0e95b5dc1e Mon Sep 17 00:00:00 2001 From: Mahendra Paipuri Date: Sat, 18 Jan 2025 12:18:33 +0100 Subject: [PATCH] fix: TSDB retention period estimation * When TSDB is configured with only retention period, we can consider it as "actual" retention period. When it is configured with either retention size or retention period and size, we need to verify that actual retention period by making an API request to `up` query. * Minor refactoring on how we estimate the retention period. * Refactor the frontend loadBalancer initialization and move all middleware related logic into middleware file. * Add unit tests to test all different cases Signed-off-by: Mahendra Paipuri --- pkg/lb/backend/tsdb.go | 41 +++++++++++++------- pkg/lb/backend/tsdb_test.go | 52 ++++++++++++++++++++++++- pkg/lb/frontend/frontend.go | 70 ++------------------------------- pkg/lb/frontend/middleware.go | 73 +++++++++++++++++++++++++++++++++++ 4 files changed, 153 insertions(+), 83 deletions(-) diff --git a/pkg/lb/backend/tsdb.go b/pkg/lb/backend/tsdb.go index e274e8a..4f9adcd 100644 --- a/pkg/lb/backend/tsdb.go +++ b/pkg/lb/backend/tsdb.go @@ -190,30 +190,29 @@ func (b *tsdbServer) fetchRetentionPeriod() (time.Duration, error) { return time.Duration(period), ErrTypeAssertion } + // If storageRetention is set to duration ONLY, we can consider it as + // retention period + if period, err = model.ParseDuration(strings.TrimSpace(vString)); err == nil { + return time.Duration(period), nil + } + + // If storageRetention is set to size or time and size, we need to get + // "actual" retention period for _, retentionString := range strings.Split(vString, "or") { period, err = model.ParseDuration(strings.TrimSpace(retentionString)) if err != nil { continue } - goto outside + break } - - return time.Duration( - period, - ), fmt.Errorf( - "failed to find retention period in runtime config: %s", - runtimeData, - ) } } -outside: - // If both retention storage and retention period are set, // depending on whichever hit first, TSDB uses the data based // on that retention. - // So just becase retention period is, say 30d, we might not + // So just because retention period is, say 30d, we might not // have data spanning 30d if retention size cannot accommodate // 30 days of data. // @@ -227,6 +226,12 @@ outside: // the first one. // // Make query parameters + // When period is zero (during very first update) use a sufficiently + // long range of 10 years to get retention period + if period == 0 { + period = model.Duration(10 * 365 * 24 * time.Hour) + } + step := time.Duration(period) / 5000 urlValues := url.Values{ @@ -271,11 +276,19 @@ outside: if values, ok = val.([]interface{}); ok && len(values) > 0 { if v, ok := values[0].([]interface{}); ok && len(v) > 0 { if t, ok := v[0].(float64); ok { - // We are updating retention period only at updateInterval - // so we need to reduce the actual retention period. + // We are updating retention period only at a frequency set by + // updateInterval. This means there is no guarantee that the + // data until next update is present in the current TSDB. + // So we need to reduce the actual retention period by the update + // interval. // Here we reduce twice the update interval just to be // in a safe land - return (time.Since(time.Unix(int64(t), 0)) - 2*b.updateInterval).Truncate(time.Hour), nil + actualRetentionPeriod := (time.Since(time.Unix(int64(t), 0)) - 2*b.updateInterval).Truncate(time.Hour) + if actualRetentionPeriod < 0 { + actualRetentionPeriod = time.Since(time.Unix(int64(t), 0)) + } + + return actualRetentionPeriod, nil } } } diff --git a/pkg/lb/backend/tsdb_test.go b/pkg/lb/backend/tsdb_test.go index 49360ef..054b283 100644 --- a/pkg/lb/backend/tsdb_test.go +++ b/pkg/lb/backend/tsdb_test.go @@ -64,13 +64,13 @@ func TestTSDBConfigSuccess(t *testing.T) { url, _ := url.Parse(server.URL) b := NewTSDB(url, httputil.NewSingleHostReverseProxy(url), slog.New(slog.NewTextHandler(io.Discard, nil))) require.Equal(t, server.URL, b.URL().String()) - require.Equal(t, 354*time.Hour, b.RetentionPeriod()) + require.Equal(t, 720*time.Hour, b.RetentionPeriod()) require.True(t, b.IsAlive()) require.Equal(t, 0, b.ActiveConnections()) // Stop dummy server and query for retention period, we should get last updated value server.Close() - require.Equal(t, 354*time.Hour, b.RetentionPeriod()) + require.Equal(t, 720*time.Hour, b.RetentionPeriod()) } func TestTSDBConfigSuccessWithTwoRetentions(t *testing.T) { @@ -121,6 +121,54 @@ func TestTSDBConfigSuccessWithTwoRetentions(t *testing.T) { require.True(t, b.IsAlive()) } +func TestTSDBConfigSuccessWithRetentionSize(t *testing.T) { + // Start test server + expectedRuntime := tsdb.Response{ + Status: "success", + Data: map[string]string{ + "storageRetention": "10GiB", + }, + } + + expectedRange := tsdb.Response{ + Status: "success", + Data: map[string]interface{}{ + "resultType": "matrix", + "result": []interface{}{ + map[string]interface{}{ + "metric": map[string]string{ + "__name__": "up", + "instance": "localhost:9090", + }, + "values": []interface{}{ + []interface{}{time.Now().Add(-30 * 24 * time.Hour).Unix(), "1"}, + []interface{}{time.Now().Add(-30 * 23 * time.Hour).Unix(), "1"}, + }, + }, + }, + }, + } + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if strings.Contains(r.URL.Path, "runtimeinfo") { + if err := json.NewEncoder(w).Encode(&expectedRuntime); err != nil { + w.Write([]byte("KO")) + } + } else { + if err := json.NewEncoder(w).Encode(&expectedRange); err != nil { + w.Write([]byte("KO")) + } + } + })) + defer server.Close() + + url, _ := url.Parse(server.URL) + b := NewTSDB(url, httputil.NewSingleHostReverseProxy(url), slog.New(slog.NewTextHandler(io.Discard, nil))) + require.Equal(t, server.URL, b.URL().String()) + require.Equal(t, 714*time.Hour, b.RetentionPeriod()) + require.True(t, b.IsAlive()) +} + func TestTSDBConfigFail(t *testing.T) { // Start test server expected := "dummy" diff --git a/pkg/lb/frontend/frontend.go b/pkg/lb/frontend/frontend.go index f5f0ce0..1965015 100644 --- a/pkg/lb/frontend/frontend.go +++ b/pkg/lb/frontend/frontend.go @@ -6,16 +6,12 @@ package frontend import ( "context" - "database/sql" "encoding/json" "errors" "fmt" "io" "log/slog" "net/http" - "net/url" - "os" - "path/filepath" "slices" "strings" "time" @@ -27,7 +23,6 @@ import ( "github.com/mahendrapaipuri/ceems/pkg/lb/base" "github.com/mahendrapaipuri/ceems/pkg/lb/serverpool" _ "github.com/mattn/go-sqlite3" - "github.com/prometheus/common/config" "github.com/prometheus/exporter-toolkit/web" ) @@ -81,69 +76,10 @@ type loadBalancer struct { // New returns a new instance of load balancer. func New(c *Config) (LoadBalancer, error) { - var db *sql.DB - - var ceemsClient *http.Client - - var ceemsWebURL *url.URL - - var err error - - // Check if DB path exists and get pointer to DB connection - if c.APIServer.Data.Path != "" { - dbAbsPath, err := filepath.Abs( - filepath.Join(c.APIServer.Data.Path, ceems_api_base.CEEMSDBName), - ) - if err != nil { - return nil, err - } - - // Set DB pointer only if file exists. Else sql.Open will create an empty - // file as if exists already - if _, err := os.Stat(dbAbsPath); err == nil { - dsn := fmt.Sprintf("file:%s?%s", dbAbsPath, "_mutex=no&mode=ro&_busy_timeout=5000") - - if db, err = sql.Open("sqlite3", dsn); err != nil { - return nil, err - } - } - } - - // Check if URL for CEEMS API exists - if c.APIServer.Web.URL == "" { - goto outside - } - - // Unwrap original error to avoid leaking sensitive passwords in output - ceemsWebURL, err = url.Parse(c.APIServer.Web.URL) + // Setup new auth middleware + amw, err := newAuthMiddleware(c) if err != nil { - return nil, errors.Unwrap(err) - } - - // Make a CEEMS API server client from client config - if ceemsClient, err = config.NewClientFromConfig(c.APIServer.Web.HTTPClientConfig, "ceems_api_server"); err != nil { - return nil, err - } - -outside: - // Setup middleware - amw := &authenticationMiddleware{ - logger: c.Logger, - ceems: ceems{ - db: db, - webURL: ceemsWebURL, - client: ceemsClient, - }, - } - - // Setup parsing functions based on LB type - switch c.LBType { - case base.PromLB: - amw.parseRequest = parseTSDBRequest - amw.pathsACLRegex = regexpTSDBRestrictedPath - case base.PyroLB: - amw.parseRequest = parsePyroRequest - amw.pathsACLRegex = regexpPyroRestrictedPath + return nil, fmt.Errorf("failed to setup auth middleware: %w", err) } return &loadBalancer{ diff --git a/pkg/lb/frontend/middleware.go b/pkg/lb/frontend/middleware.go index a21b60f..90d4e28 100644 --- a/pkg/lb/frontend/middleware.go +++ b/pkg/lb/frontend/middleware.go @@ -7,16 +7,22 @@ import ( "context" "database/sql" "encoding/json" + "errors" "fmt" "log/slog" "net/http" "net/url" + "os" + "path/filepath" "regexp" "slices" "strconv" "strings" + ceems_api_base "github.com/mahendrapaipuri/ceems/pkg/api/base" ceems_api "github.com/mahendrapaipuri/ceems/pkg/api/http" + "github.com/mahendrapaipuri/ceems/pkg/lb/base" + "github.com/prometheus/common/config" ) // Headers. @@ -103,6 +109,73 @@ type authenticationMiddleware struct { parseRequest func(*ReqParams, *http.Request) error } +// newAuthMiddleware setups new auth middleware. +func newAuthMiddleware(c *Config) (*authenticationMiddleware, error) { + var db *sql.DB + + var ceemsClient *http.Client + + var ceemsWebURL *url.URL + + var err error + + // Check if DB path exists and get pointer to DB connection + if c.APIServer.Data.Path != "" { + dbAbsPath, err := filepath.Abs( + filepath.Join(c.APIServer.Data.Path, ceems_api_base.CEEMSDBName), + ) + if err != nil { + return nil, err + } + + // Set DB pointer only if file exists. Else sql.Open will create an empty + // file as if exists already + if _, err := os.Stat(dbAbsPath); err == nil { + dsn := fmt.Sprintf("file:%s?%s", dbAbsPath, "_mutex=no&mode=ro&_busy_timeout=5000") + + if db, err = sql.Open("sqlite3", dsn); err != nil { + return nil, err + } + } + } + + // Check if URL for CEEMS API exists + if c.APIServer.Web.URL != "" { + // Unwrap original error to avoid leaking sensitive passwords in output + ceemsWebURL, err = url.Parse(c.APIServer.Web.URL) + if err != nil { + return nil, errors.Unwrap(err) + } + + // Make a CEEMS API server client from client config + if ceemsClient, err = config.NewClientFromConfig(c.APIServer.Web.HTTPClientConfig, "ceems_api_server"); err != nil { + return nil, err + } + } + + // Setup middleware + amw := &authenticationMiddleware{ + logger: c.Logger, + ceems: ceems{ + db: db, + webURL: ceemsWebURL, + client: ceemsClient, + }, + } + + // Setup parsing functions based on LB type + switch c.LBType { + case base.PromLB: + amw.parseRequest = parseTSDBRequest + amw.pathsACLRegex = regexpTSDBRestrictedPath + case base.PyroLB: + amw.parseRequest = parsePyroRequest + amw.pathsACLRegex = regexpPyroRestrictedPath + } + + return amw, nil +} + // Check UUIDs in query belong to user or not. func (amw *authenticationMiddleware) isUserUnit( ctx context.Context,