diff --git a/pkg/lb/backend/tsdb.go b/pkg/lb/backend/tsdb.go
index e274e8a..4f9adcd 100644
--- a/pkg/lb/backend/tsdb.go
+++ b/pkg/lb/backend/tsdb.go
@@ -190,30 +190,29 @@ func (b *tsdbServer) fetchRetentionPeriod() (time.Duration, error) {
 				return time.Duration(period), ErrTypeAssertion
 			}
 
+			// If storageRetention is set to duration ONLY, we can consider it as
+			// retention period
+			if period, err = model.ParseDuration(strings.TrimSpace(vString)); err == nil {
+				return time.Duration(period), nil
+			}
+
+			// If storageRetention is set to size or time and size, we need to get
+			// "actual" retention period
 			for _, retentionString := range strings.Split(vString, "or") {
 				period, err = model.ParseDuration(strings.TrimSpace(retentionString))
 				if err != nil {
 					continue
 				}
 
-				goto outside
+				break
 			}
-
-			return time.Duration(
-				period,
-			), fmt.Errorf(
-				"failed to find retention period in runtime config: %s",
-				runtimeData,
-			)
 		}
 	}
 
-outside:
-
 	// If both retention storage and retention period are set,
 	// depending on whichever hit first, TSDB uses the data based
 	// on that retention.
-	// So just becase retention period is, say 30d, we might not
+	// So just because retention period is, say 30d, we might not
 	// have data spanning 30d if retention size cannot accommodate
 	// 30 days of data.
 	//
@@ -227,6 +226,12 @@ outside:
 	// the first one.
 	//
 	// Make query parameters
+	// When period is zero (during very first update) use a sufficiently
+	// long range of 10 years to get retention period
+	if period == 0 {
+		period = model.Duration(10 * 365 * 24 * time.Hour)
+	}
+
 	step := time.Duration(period) / 5000
 
 	urlValues := url.Values{
@@ -271,11 +276,19 @@ outside:
 	if values, ok = val.([]interface{}); ok && len(values) > 0 {
 		if v, ok := values[0].([]interface{}); ok && len(v) > 0 {
 			if t, ok := v[0].(float64); ok {
-				// We are updating retention period only at updateInterval
-				// so we need to reduce the actual retention period.
+				// We are updating retention period only at a frequency set by
+				// updateInterval. This means there is no guarantee that the
+				// data until next update is present in the current TSDB.
+				// So we need to reduce the actual retention period by the update
+				// interval.
 				// Here we reduce twice the update interval just to be
 				// in a safe land
-				return (time.Since(time.Unix(int64(t), 0)) - 2*b.updateInterval).Truncate(time.Hour), nil
+				actualRetentionPeriod := (time.Since(time.Unix(int64(t), 0)) - 2*b.updateInterval).Truncate(time.Hour)
+				if actualRetentionPeriod < 0 {
+					actualRetentionPeriod = time.Since(time.Unix(int64(t), 0))
+				}
+
+				return actualRetentionPeriod, nil
 			}
 		}
 	}
diff --git a/pkg/lb/backend/tsdb_test.go b/pkg/lb/backend/tsdb_test.go
index 49360ef..054b283 100644
--- a/pkg/lb/backend/tsdb_test.go
+++ b/pkg/lb/backend/tsdb_test.go
@@ -64,13 +64,13 @@ func TestTSDBConfigSuccess(t *testing.T) {
 	url, _ := url.Parse(server.URL)
 	b := NewTSDB(url, httputil.NewSingleHostReverseProxy(url), slog.New(slog.NewTextHandler(io.Discard, nil)))
 	require.Equal(t, server.URL, b.URL().String())
-	require.Equal(t, 354*time.Hour, b.RetentionPeriod())
+	require.Equal(t, 720*time.Hour, b.RetentionPeriod())
 	require.True(t, b.IsAlive())
 	require.Equal(t, 0, b.ActiveConnections())
 
 	// Stop dummy server and query for retention period, we should get last updated value
 	server.Close()
-	require.Equal(t, 354*time.Hour, b.RetentionPeriod())
+	require.Equal(t, 720*time.Hour, b.RetentionPeriod())
 }
 
 func TestTSDBConfigSuccessWithTwoRetentions(t *testing.T) {
@@ -121,6 +121,54 @@ func TestTSDBConfigSuccessWithTwoRetentions(t *testing.T) {
 	require.True(t, b.IsAlive())
 }
 
+func TestTSDBConfigSuccessWithRetentionSize(t *testing.T) {
+	// Start test server
+	expectedRuntime := tsdb.Response{
+		Status: "success",
+		Data: map[string]string{
+			"storageRetention": "10GiB",
+		},
+	}
+
+	expectedRange := tsdb.Response{
+		Status: "success",
+		Data: map[string]interface{}{
+			"resultType": "matrix",
+			"result": []interface{}{
+				map[string]interface{}{
+					"metric": map[string]string{
+						"__name__": "up",
+						"instance": "localhost:9090",
+					},
+					"values": []interface{}{
+						[]interface{}{time.Now().Add(-30 * 24 * time.Hour).Unix(), "1"},
+						[]interface{}{time.Now().Add(-30 * 23 * time.Hour).Unix(), "1"},
+					},
+				},
+			},
+		},
+	}
+
+	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		if strings.Contains(r.URL.Path, "runtimeinfo") {
+			if err := json.NewEncoder(w).Encode(&expectedRuntime); err != nil {
+				w.Write([]byte("KO"))
+			}
+		} else {
+			if err := json.NewEncoder(w).Encode(&expectedRange); err != nil {
+				w.Write([]byte("KO"))
+			}
+		}
+	}))
+	defer server.Close()
+
+	url, _ := url.Parse(server.URL)
+	b := NewTSDB(url, httputil.NewSingleHostReverseProxy(url), slog.New(slog.NewTextHandler(io.Discard, nil)))
+	require.Equal(t, server.URL, b.URL().String())
+	require.Equal(t, 714*time.Hour, b.RetentionPeriod())
+	require.True(t, b.IsAlive())
+}
+
 func TestTSDBConfigFail(t *testing.T) {
 	// Start test server
 	expected := "dummy"
diff --git a/pkg/lb/frontend/frontend.go b/pkg/lb/frontend/frontend.go
index f5f0ce0..1965015 100644
--- a/pkg/lb/frontend/frontend.go
+++ b/pkg/lb/frontend/frontend.go
@@ -6,16 +6,12 @@ package frontend
 
 import (
 	"context"
-	"database/sql"
 	"encoding/json"
 	"errors"
 	"fmt"
 	"io"
 	"log/slog"
 	"net/http"
-	"net/url"
-	"os"
-	"path/filepath"
 	"slices"
 	"strings"
 	"time"
@@ -27,7 +23,6 @@ import (
 	"github.com/mahendrapaipuri/ceems/pkg/lb/base"
 	"github.com/mahendrapaipuri/ceems/pkg/lb/serverpool"
 	_ "github.com/mattn/go-sqlite3"
-	"github.com/prometheus/common/config"
 	"github.com/prometheus/exporter-toolkit/web"
 )
 
@@ -81,69 +76,10 @@ type loadBalancer struct {
 
 // New returns a new instance of load balancer.
 func New(c *Config) (LoadBalancer, error) {
-	var db *sql.DB
-
-	var ceemsClient *http.Client
-
-	var ceemsWebURL *url.URL
-
-	var err error
-
-	// Check if DB path exists and get pointer to DB connection
-	if c.APIServer.Data.Path != "" {
-		dbAbsPath, err := filepath.Abs(
-			filepath.Join(c.APIServer.Data.Path, ceems_api_base.CEEMSDBName),
-		)
-		if err != nil {
-			return nil, err
-		}
-
-		// Set DB pointer only if file exists. Else sql.Open will create an empty
-		// file as if exists already
-		if _, err := os.Stat(dbAbsPath); err == nil {
-			dsn := fmt.Sprintf("file:%s?%s", dbAbsPath, "_mutex=no&mode=ro&_busy_timeout=5000")
-
-			if db, err = sql.Open("sqlite3", dsn); err != nil {
-				return nil, err
-			}
-		}
-	}
-
-	// Check if URL for CEEMS API exists
-	if c.APIServer.Web.URL == "" {
-		goto outside
-	}
-
-	// Unwrap original error to avoid leaking sensitive passwords in output
-	ceemsWebURL, err = url.Parse(c.APIServer.Web.URL)
+	// Setup new auth middleware
+	amw, err := newAuthMiddleware(c)
 	if err != nil {
-		return nil, errors.Unwrap(err)
-	}
-
-	// Make a CEEMS API server client from client config
-	if ceemsClient, err = config.NewClientFromConfig(c.APIServer.Web.HTTPClientConfig, "ceems_api_server"); err != nil {
-		return nil, err
-	}
-
-outside:
-	// Setup middleware
-	amw := &authenticationMiddleware{
-		logger: c.Logger,
-		ceems: ceems{
-			db:     db,
-			webURL: ceemsWebURL,
-			client: ceemsClient,
-		},
-	}
-
-	// Setup parsing functions based on LB type
-	switch c.LBType {
-	case base.PromLB:
-		amw.parseRequest = parseTSDBRequest
-		amw.pathsACLRegex = regexpTSDBRestrictedPath
-	case base.PyroLB:
-		amw.parseRequest = parsePyroRequest
-		amw.pathsACLRegex = regexpPyroRestrictedPath
+		return nil, fmt.Errorf("failed to setup auth middleware: %w", err)
 	}
 
 	return &loadBalancer{
diff --git a/pkg/lb/frontend/middleware.go b/pkg/lb/frontend/middleware.go
index a21b60f..90d4e28 100644
--- a/pkg/lb/frontend/middleware.go
+++ b/pkg/lb/frontend/middleware.go
@@ -7,16 +7,22 @@ import (
 	"context"
 	"database/sql"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"log/slog"
 	"net/http"
 	"net/url"
+	"os"
+	"path/filepath"
 	"regexp"
 	"slices"
 	"strconv"
 	"strings"
 
+	ceems_api_base "github.com/mahendrapaipuri/ceems/pkg/api/base"
 	ceems_api "github.com/mahendrapaipuri/ceems/pkg/api/http"
+	"github.com/mahendrapaipuri/ceems/pkg/lb/base"
+	"github.com/prometheus/common/config"
 )
 
 // Headers.
@@ -103,6 +109,73 @@ type authenticationMiddleware struct {
 	parseRequest func(*ReqParams, *http.Request) error
 }
 
+// newAuthMiddleware sets up a new auth middleware.
+func newAuthMiddleware(c *Config) (*authenticationMiddleware, error) {
+	var db *sql.DB
+
+	var ceemsClient *http.Client
+
+	var ceemsWebURL *url.URL
+
+	var err error
+
+	// Check if DB path exists and get pointer to DB connection
+	if c.APIServer.Data.Path != "" {
+		dbAbsPath, err := filepath.Abs(
+			filepath.Join(c.APIServer.Data.Path, ceems_api_base.CEEMSDBName),
+		)
+		if err != nil {
+			return nil, err
+		}
+
+		// Set DB pointer only if file exists. Else sql.Open will create an empty
+		// file as if exists already
+		if _, err := os.Stat(dbAbsPath); err == nil {
+			dsn := fmt.Sprintf("file:%s?%s", dbAbsPath, "_mutex=no&mode=ro&_busy_timeout=5000")
+
+			if db, err = sql.Open("sqlite3", dsn); err != nil {
+				return nil, err
+			}
+		}
+	}
+
+	// Check if URL for CEEMS API exists
+	if c.APIServer.Web.URL != "" {
+		// Unwrap original error to avoid leaking sensitive passwords in output
+		ceemsWebURL, err = url.Parse(c.APIServer.Web.URL)
+		if err != nil {
+			return nil, errors.Unwrap(err)
+		}
+
+		// Make a CEEMS API server client from client config
+		if ceemsClient, err = config.NewClientFromConfig(c.APIServer.Web.HTTPClientConfig, "ceems_api_server"); err != nil {
+			return nil, err
+		}
+	}
+
+	// Setup middleware
+	amw := &authenticationMiddleware{
+		logger: c.Logger,
+		ceems: ceems{
+			db:     db,
+			webURL: ceemsWebURL,
+			client: ceemsClient,
+		},
+	}
+
+	// Setup parsing functions based on LB type
+	switch c.LBType {
+	case base.PromLB:
+		amw.parseRequest = parseTSDBRequest
+		amw.pathsACLRegex = regexpTSDBRestrictedPath
+	case base.PyroLB:
+		amw.parseRequest = parsePyroRequest
+		amw.pathsACLRegex = regexpPyroRestrictedPath
+	}
+
+	return amw, nil
+}
+
 // Check UUIDs in query belong to user or not.
 func (amw *authenticationMiddleware) isUserUnit(
 	ctx context.Context,
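
Note (reviewer sketch, not part of the diff): in the tsdb.go hunk above, storageRetention is first parsed as a plain duration; only when that fails is the value split on "or" to handle combined time-and-size settings, and a size-only value such as "10GiB" parses as nothing, leaving period at zero, which is what the new 10-year fallback range covers before the range query determines the actual retention. The standalone snippet below illustrates that parsing order, assuming only github.com/prometheus/common/model; parseRetention is a hypothetical helper written for this note and does not exist in the codebase.

package main

import (
	"fmt"
	"strings"
	"time"

	"github.com/prometheus/common/model"
)

// parseRetention mirrors the parsing order introduced in fetchRetentionPeriod:
// try the whole value as a duration first, then fall back to splitting on "or"
// for mixed values. A size-only value yields zero, which the caller treats as
// "unknown" and resolves by querying the TSDB data itself.
func parseRetention(v string) time.Duration {
	// Duration-only value, e.g. "30d"
	if period, err := model.ParseDuration(strings.TrimSpace(v)); err == nil {
		return time.Duration(period)
	}

	// Mixed value, e.g. "30d or 10GiB": keep the component that parses as a duration
	for _, part := range strings.Split(v, "or") {
		if period, err := model.ParseDuration(strings.TrimSpace(part)); err == nil {
			return time.Duration(period)
		}
	}

	// Size-only value, e.g. "10GiB": nothing parses as a duration
	return 0
}

func main() {
	for _, v := range []string{"30d", "30d or 10GiB", "10GiB"} {
		fmt.Printf("%q -> %v\n", v, parseRetention(v))
	}
}

Running this prints 720h0m0s for "30d" and "30d or 10GiB" and 0s for "10GiB"; the size-only case is the one exercised by TestTSDBConfigSuccessWithRetentionSize, where the retention ultimately comes from the range query against the data in the test server.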