Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TSDB retention period estimation #270

Merged
merged 1 commit into from
Jan 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 27 additions & 14 deletions pkg/lb/backend/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,30 +190,29 @@ func (b *tsdbServer) fetchRetentionPeriod() (time.Duration, error) {
return time.Duration(period), ErrTypeAssertion
}

// If storageRetention is set to duration ONLY, we can consider it as
// retention period
if period, err = model.ParseDuration(strings.TrimSpace(vString)); err == nil {
return time.Duration(period), nil
}

// If storageRetention is set to size or time and size, we need to get
// "actual" retention period
for _, retentionString := range strings.Split(vString, "or") {
period, err = model.ParseDuration(strings.TrimSpace(retentionString))
if err != nil {
continue
}

goto outside
break
}

return time.Duration(
period,
), fmt.Errorf(
"failed to find retention period in runtime config: %s",
runtimeData,
)
}
}

outside:

// If both retention storage and retention period are set,
// depending on whichever hit first, TSDB uses the data based
// on that retention.
// So just becase retention period is, say 30d, we might not
// So just because retention period is, say 30d, we might not
// have data spanning 30d if retention size cannot accommodate
// 30 days of data.
//
Expand All @@ -227,6 +226,12 @@ outside:
// the first one.
//
// Make query parameters
// When period is zero (during very first update) use a sufficiently
// long range of 10 years to get retention period
if period == 0 {
period = model.Duration(10 * 365 * 24 * time.Hour)
}

step := time.Duration(period) / 5000

urlValues := url.Values{
Expand Down Expand Up @@ -271,11 +276,19 @@ outside:
if values, ok = val.([]interface{}); ok && len(values) > 0 {
if v, ok := values[0].([]interface{}); ok && len(v) > 0 {
if t, ok := v[0].(float64); ok {
// We are updating retention period only at updateInterval
// so we need to reduce the actual retention period.
// We are updating retention period only at a frequency set by
// updateInterval. This means there is no guarantee that the
// data until next update is present in the current TSDB.
// So we need to reduce the actual retention period by the update
// interval.
// Here we subtract twice the update interval just to be
// on the safe side
return (time.Since(time.Unix(int64(t), 0)) - 2*b.updateInterval).Truncate(time.Hour), nil
actualRetentionPeriod := (time.Since(time.Unix(int64(t), 0)) - 2*b.updateInterval).Truncate(time.Hour)
if actualRetentionPeriod < 0 {
actualRetentionPeriod = time.Since(time.Unix(int64(t), 0))
}

return actualRetentionPeriod, nil
}
}
}
Expand Down
52 changes: 50 additions & 2 deletions pkg/lb/backend/tsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,13 @@ func TestTSDBConfigSuccess(t *testing.T) {
url, _ := url.Parse(server.URL)
b := NewTSDB(url, httputil.NewSingleHostReverseProxy(url), slog.New(slog.NewTextHandler(io.Discard, nil)))
require.Equal(t, server.URL, b.URL().String())
require.Equal(t, 354*time.Hour, b.RetentionPeriod())
require.Equal(t, 720*time.Hour, b.RetentionPeriod())
require.True(t, b.IsAlive())
require.Equal(t, 0, b.ActiveConnections())

// Stop dummy server and query for retention period, we should get last updated value
server.Close()
require.Equal(t, 354*time.Hour, b.RetentionPeriod())
require.Equal(t, 720*time.Hour, b.RetentionPeriod())
}

func TestTSDBConfigSuccessWithTwoRetentions(t *testing.T) {
Expand Down Expand Up @@ -121,6 +121,54 @@ func TestTSDBConfigSuccessWithTwoRetentions(t *testing.T) {
require.True(t, b.IsAlive())
}

// TestTSDBConfigSuccessWithRetentionSize checks the backend's retention
// period when the runtime config reports only a size based retention
// ("10GiB"), with the range query returning an oldest sample that is
// 30 days old.
func TestTSDBConfigSuccessWithRetentionSize(t *testing.T) {
	// Runtime info payload advertising a size-only retention.
	runtimeResp := tsdb.Response{
		Status: "success",
		Data: map[string]string{
			"storageRetention": "10GiB",
		},
	}

	// Timestamps of the two samples served by the range query. The first
	// one is 30 days (720h) in the past.
	firstSample := time.Now().Add(-30 * 24 * time.Hour).Unix()
	secondSample := time.Now().Add(-30 * 23 * time.Hour).Unix()

	// Range query payload with a single "up" series.
	series := map[string]any{
		"metric": map[string]string{
			"__name__": "up",
			"instance": "localhost:9090",
		},
		"values": []any{
			[]any{firstSample, "1"},
			[]any{secondSample, "1"},
		},
	}
	rangeResp := tsdb.Response{
		Status: "success",
		Data: map[string]any{
			"resultType": "matrix",
			"result":     []any{series},
		},
	}

	// Dummy TSDB: runtime info on paths containing "runtimeinfo", the
	// range query payload on everything else.
	handler := func(w http.ResponseWriter, r *http.Request) {
		payload := &rangeResp
		if strings.Contains(r.URL.Path, "runtimeinfo") {
			payload = &runtimeResp
		}

		if err := json.NewEncoder(w).Encode(payload); err != nil {
			w.Write([]byte("KO"))
		}
	}

	server := httptest.NewServer(http.HandlerFunc(handler))
	defer server.Close()

	backendURL, _ := url.Parse(server.URL)
	logger := slog.New(slog.NewTextHandler(io.Discard, nil))
	b := NewTSDB(backendURL, httputil.NewSingleHostReverseProxy(backendURL), logger)

	require.Equal(t, server.URL, b.URL().String())
	// 720h of data minus the safety margin applied by the backend.
	// NOTE(review): 714h presumably corresponds to twice a 3h update
	// interval - confirm against the backend's default.
	require.Equal(t, 714*time.Hour, b.RetentionPeriod())
	require.True(t, b.IsAlive())
}

func TestTSDBConfigFail(t *testing.T) {
// Start test server
expected := "dummy"
Expand Down
70 changes: 3 additions & 67 deletions pkg/lb/frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,12 @@ package frontend

import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"io"
"log/slog"
"net/http"
"net/url"
"os"
"path/filepath"
"slices"
"strings"
"time"
Expand All @@ -27,7 +23,6 @@ import (
"github.com/mahendrapaipuri/ceems/pkg/lb/base"
"github.com/mahendrapaipuri/ceems/pkg/lb/serverpool"
_ "github.com/mattn/go-sqlite3"
"github.com/prometheus/common/config"
"github.com/prometheus/exporter-toolkit/web"
)

Expand Down Expand Up @@ -81,69 +76,10 @@ type loadBalancer struct {

// New returns a new instance of load balancer.
func New(c *Config) (LoadBalancer, error) {
var db *sql.DB

var ceemsClient *http.Client

var ceemsWebURL *url.URL

var err error

// Check if DB path exists and get pointer to DB connection
if c.APIServer.Data.Path != "" {
dbAbsPath, err := filepath.Abs(
filepath.Join(c.APIServer.Data.Path, ceems_api_base.CEEMSDBName),
)
if err != nil {
return nil, err
}

// Set DB pointer only if file exists. Else sql.Open will create an empty
// file as if exists already
if _, err := os.Stat(dbAbsPath); err == nil {
dsn := fmt.Sprintf("file:%s?%s", dbAbsPath, "_mutex=no&mode=ro&_busy_timeout=5000")

if db, err = sql.Open("sqlite3", dsn); err != nil {
return nil, err
}
}
}

// Check if URL for CEEMS API exists
if c.APIServer.Web.URL == "" {
goto outside
}

// Unwrap original error to avoid leaking sensitive passwords in output
ceemsWebURL, err = url.Parse(c.APIServer.Web.URL)
// Setup new auth middleware
amw, err := newAuthMiddleware(c)
if err != nil {
return nil, errors.Unwrap(err)
}

// Make a CEEMS API server client from client config
if ceemsClient, err = config.NewClientFromConfig(c.APIServer.Web.HTTPClientConfig, "ceems_api_server"); err != nil {
return nil, err
}

outside:
// Setup middleware
amw := &authenticationMiddleware{
logger: c.Logger,
ceems: ceems{
db: db,
webURL: ceemsWebURL,
client: ceemsClient,
},
}

// Setup parsing functions based on LB type
switch c.LBType {
case base.PromLB:
amw.parseRequest = parseTSDBRequest
amw.pathsACLRegex = regexpTSDBRestrictedPath
case base.PyroLB:
amw.parseRequest = parsePyroRequest
amw.pathsACLRegex = regexpPyroRestrictedPath
return nil, fmt.Errorf("failed to setup auth middleware: %w", err)
}

return &loadBalancer{
Expand Down
73 changes: 73 additions & 0 deletions pkg/lb/frontend/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,22 @@ import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"log/slog"
"net/http"
"net/url"
"os"
"path/filepath"
"regexp"
"slices"
"strconv"
"strings"

ceems_api_base "github.com/mahendrapaipuri/ceems/pkg/api/base"
ceems_api "github.com/mahendrapaipuri/ceems/pkg/api/http"
"github.com/mahendrapaipuri/ceems/pkg/lb/base"
"github.com/prometheus/common/config"
)

// Headers.
Expand Down Expand Up @@ -103,6 +109,73 @@ type authenticationMiddleware struct {
parseRequest func(*ReqParams, *http.Request) error
}

// newAuthMiddleware sets up a new authentication middleware from the load
// balancer config.
//
// When c.APIServer.Data.Path is set and the CEEMS DB file exists under it, a
// read-only SQLite handle is opened. When c.APIServer.Web.URL is set, the URL
// is parsed and a HTTP client is created from
// c.APIServer.Web.HTTPClientConfig. Whatever is not configured stays nil in
// the returned middleware.
//
// On failure a nil middleware is returned along with the error, and a DB
// handle opened earlier in this function is closed so that it does not leak.
func newAuthMiddleware(c *Config) (*authenticationMiddleware, error) {
	var (
		db          *sql.DB
		ceemsClient *http.Client
		ceemsWebURL *url.URL
		err         error
	)

	// releaseDB closes the DB handle, if one has been opened. It is used on
	// the error paths that come after the DB setup.
	releaseDB := func() {
		if db != nil {
			_ = db.Close()
		}
	}

	// Check if DB path exists and get pointer to DB connection
	if c.APIServer.Data.Path != "" {
		dbAbsPath, err := filepath.Abs(
			filepath.Join(c.APIServer.Data.Path, ceems_api_base.CEEMSDBName),
		)
		if err != nil {
			return nil, err
		}

		// Set DB pointer only if the file exists. Otherwise opening the DB
		// would create an empty file as if it existed already
		if _, err := os.Stat(dbAbsPath); err == nil {
			dsn := fmt.Sprintf("file:%s?%s", dbAbsPath, "_mutex=no&mode=ro&_busy_timeout=5000")

			if db, err = sql.Open("sqlite3", dsn); err != nil {
				return nil, err
			}
		}
	}

	// Check if URL for CEEMS API exists
	if c.APIServer.Web.URL != "" {
		// Unwrap original error to avoid leaking sensitive passwords in output
		ceemsWebURL, err = url.Parse(c.APIServer.Web.URL)
		if err != nil {
			releaseDB()

			return nil, errors.Unwrap(err)
		}

		// Make a CEEMS API server client from client config
		if ceemsClient, err = config.NewClientFromConfig(c.APIServer.Web.HTTPClientConfig, "ceems_api_server"); err != nil {
			releaseDB()

			return nil, err
		}
	}

	// Setup middleware
	amw := &authenticationMiddleware{
		logger: c.Logger,
		ceems: ceems{
			db:     db,
			webURL: ceemsWebURL,
			client: ceemsClient,
		},
	}

	// Setup parsing functions based on LB type. For any other LB type the
	// parsing function and ACL regex are left unset.
	switch c.LBType {
	case base.PromLB:
		amw.parseRequest = parseTSDBRequest
		amw.pathsACLRegex = regexpTSDBRestrictedPath
	case base.PyroLB:
		amw.parseRequest = parsePyroRequest
		amw.pathsACLRegex = regexpPyroRestrictedPath
	}

	return amw, nil
}

// Check UUIDs in query belong to user or not.
func (amw *authenticationMiddleware) isUserUnit(
ctx context.Context,
Expand Down
Loading