diff --git a/CHANGELOG.md b/CHANGELOG.md index ba6311d6cf..1dac1b018a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ * [FEATURE] Chunk Cache: Support multi level cache and add metrics. #6249 * [FEATURE] Distributor: Accept multiple HA Tracker pairs in the same request. #6256 * [FEATURE] Ruler: Add support for per-user external labels #6340 +* [ENHANCEMENT] Store Gateway: Add hedged requests to reduce tail latency. #6388 * [ENHANCEMENT] Ingester: Add metrics to track succeed/failed native histograms. #6370 * [ENHANCEMENT] Query Frontend/Querier: Add an experimental flag `-querier.enable-promql-experimental-functions` to enable experimental promQL functions. #6355 * [ENHANCEMENT] OTLP: Add `-distributor.otlp-max-recv-msg-size` flag to limit OTLP request size in bytes. #6333 diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md index 95f90d6ed2..3a58e3539a 100644 --- a/docs/blocks-storage/store-gateway.md +++ b/docs/blocks-storage/store-gateway.md @@ -344,6 +344,23 @@ store_gateway: # tenant(s) for processing will ignore them instead. # CLI flag: -store-gateway.disabled-tenants [disabled_tenants: <string> | default = ""] + + hedged_request: + # If true, hedged requests are applied to object store calls. It can help + # with reducing tail latency. + # CLI flag: -store-gateway.hedged-request.enabled + [enabled: <boolean> | default = false] + + # Maximum number of hedged requests allowed for each initial request. A high + # number can reduce latency but increase internal calls. + # CLI flag: -store-gateway.hedged-request.max-requests + [max_requests: <int> | default = 3] + + # It is used to calculate a latency threshold to trigger hedged requests. + # For example, additional requests are triggered when the initial request + # response time exceeds the 90th percentile. + # CLI flag: -store-gateway.hedged-request.quantile + [quantile: <float> | default = 0.9] ``` ### `blocks_storage_config` diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 8e4285ad3b..f610a24a72 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -5664,6 +5664,23 @@ sharding_ring: # tenant(s) for processing will ignore them instead. # CLI flag: -store-gateway.disabled-tenants [disabled_tenants: <string> | default = ""] + +hedged_request: + # If true, hedged requests are applied to object store calls. It can help with + # reducing tail latency. + # CLI flag: -store-gateway.hedged-request.enabled + [enabled: <boolean> | default = false] + + # Maximum number of hedged requests allowed for each initial request. A high + # number can reduce latency but increase internal calls. + # CLI flag: -store-gateway.hedged-request.max-requests + [max_requests: <int> | default = 3] + + # It is used to calculate a latency threshold to trigger hedged requests. For + # example, additional requests are triggered when the initial request response + # time exceeds the 90th percentile.
+ # CLI flag: -store-gateway.hedged-request.quantile + [quantile: <float> | default = 0.9] ``` ### `tracing_config` diff --git a/go.mod b/go.mod index cf458baea0..f4f6c3cfcb 100644 --- a/go.mod +++ b/go.mod @@ -116,9 +116,11 @@ require ( github.com/benbjohnson/clock v1.3.5 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/blang/semver/v4 v4.0.0 // indirect + github.com/caio/go-tdigest v3.1.0+incompatible // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/coreos/go-semver v0.3.0 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect + github.com/cristalhq/hedgedhttp v0.9.1 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/dennwc/varint v1.0.0 // indirect github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165 // indirect diff --git a/go.sum b/go.sum index d525f73237..a43497bd2c 100644 --- a/go.sum +++ b/go.sum @@ -897,6 +897,8 @@ github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl github.com/boombuler/barcode v1.0.1/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8= github.com/bradfitz/gomemcache v0.0.0-20230905024940-24af94b03874 h1:N7oVaKyGp8bttX0bfZGmcGkjz7DLQXhAn3DNd3T0ous= github.com/bradfitz/gomemcache v0.0.0-20230905024940-24af94b03874/go.mod h1:r5xuitiExdLAJ09PR7vBVENGvp4ZuTBeWTGtxuX3K+c= +github.com/caio/go-tdigest v3.1.0+incompatible h1:uoVMJ3Q5lXmVLCCqaMGHLBWnbGoN6Lpu7OAUPR60cds= +github.com/caio/go-tdigest v3.1.0+incompatible/go.mod h1:sHQM/ubZStBUmF1WbB8FAm8q9GjDajLC5T7ydxE3JHI= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= @@ -942,6 +944,8 @@ github.com/cortexproject/promqlsmith v0.0.0-20241121054008-8b48fe2471ef/go.mod h github.com/cortexproject/weaveworks-common v0.0.0-20241129212437-96019edf21f1 h1:UoSixdl0sBUhfEOMpIGxFnJjp3/y/+nkw6Du7su05FE= github.com/cortexproject/weaveworks-common v0.0.0-20241129212437-96019edf21f1/go.mod h1:7cl8fS/nivXe2DmBUUmr/3UGTJG2jVU2NRaIayR2Zjs= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/cristalhq/hedgedhttp v0.9.1 h1:g68L9cf8uUyQKQJwciD0A1Vgbsz+QgCjuB1I8FAsCDs= +github.com/cristalhq/hedgedhttp v0.9.1/go.mod h1:XkqWU6qVMutbhW68NnzjWrGtH8NUx1UfYqGYtHVKIsI= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= @@ -1390,6 +1394,8 @@ github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0/go.mod h1:vmVJ0l/dxyfGW6Fm github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c= github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8= github.com/ledongthuc/pdf v0.0.0-20220302134840-0c2507a12d80/go.mod h1:imJHygn/1yfhB7XSJJKlFZKl/J+dCPAknuiaGOshXAs= +github.com/leesper/go_rng v0.0.0-20190531154944-a612b043e353 h1:X/79QL0b4YJVO5+OsPH9rF2u428CIrGL/jLmPsoOQQ4=
+github.com/leesper/go_rng v0.0.0-20190531154944-a612b043e353/go.mod h1:N0SVk0uhy+E1PZ3C9ctsPRlvOPAFPkCNlcPBDkt0N3U= github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII= github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= diff --git a/integration/e2ecortex/storage.go b/integration/e2ecortex/storage.go index 6163a51025..7150154921 100644 --- a/integration/e2ecortex/storage.go +++ b/integration/e2ecortex/storage.go @@ -21,7 +21,7 @@ type S3Client struct { } func NewS3Client(cfg s3.Config) (*S3Client, error) { - writer, err := s3.NewBucketClient(cfg, "test", log.NewNopLogger()) + writer, err := s3.NewBucketClient(cfg, nil, "test", log.NewNopLogger()) if err != nil { return nil, err } diff --git a/pkg/alertmanager/alertstore/store.go b/pkg/alertmanager/alertstore/store.go index 11aeecf596..9b02d11cd1 100644 --- a/pkg/alertmanager/alertstore/store.go +++ b/pkg/alertmanager/alertstore/store.go @@ -62,7 +62,7 @@ func NewAlertStore(ctx context.Context, cfg Config, cfgProvider bucket.TenantCon return local.NewStore(cfg.Local) } - bucketClient, err := bucket.NewClient(ctx, cfg.Config, "alertmanager-storage", logger, reg) + bucketClient, err := bucket.NewClient(ctx, cfg.Config, nil, "alertmanager-storage", logger, reg) if err != nil { return nil, err } diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 5345bd9076..a0a7e65e68 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -384,7 +384,7 @@ type Compactor struct { // NewCompactor makes a new Compactor. func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, logger log.Logger, registerer prometheus.Registerer, limits *validation.Overrides) (*Compactor, error) { bucketClientFactory := func(ctx context.Context) (objstore.InstrumentedBucket, error) { - return bucket.NewClient(ctx, storageCfg.Bucket, "compactor", logger, registerer) + return bucket.NewClient(ctx, storageCfg.Bucket, nil, "compactor", logger, registerer) } blocksGrouperFactory := compactorCfg.BlocksGrouperFactory diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index e8d71321dc..8823922a3c 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -177,7 +177,7 @@ func (t *Cortex) initRuntimeConfig() (services.Service, error) { } } } - return bucket.NewClient(ctx, t.Cfg.RuntimeConfig.StorageConfig, "runtime-config", logger, registerer) + return bucket.NewClient(ctx, t.Cfg.RuntimeConfig.StorageConfig, nil, "runtime-config", logger, registerer) } serv, err := runtimeconfig.New(t.Cfg.RuntimeConfig, registerer, logger, bucketClientFactory) if err == nil { diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index e6fb3b9838..a16e71b269 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -693,7 +693,7 @@ func New(cfg Config, limits *validation.Overrides, registerer prometheus.Registe cfg.ingesterClientFactory = client.MakeIngesterClient } - bucketClient, err := bucket.NewClient(context.Background(), cfg.BlocksStorageConfig.Bucket, "ingester", logger, registerer) + bucketClient, err := bucket.NewClient(context.Background(), cfg.BlocksStorageConfig.Bucket, nil, "ingester", logger, registerer) if err != nil { return nil, errors.Wrap(err, "failed to create the bucket client") } @@ -769,7 +769,7 @@ func New(cfg Config, limits *validation.Overrides, registerer prometheus.Registe // this is a special version of 
ingester used by Flusher. This ingester is not ingesting anything, its only purpose is to react // on Flush method and flush all opened TSDBs when called. func NewForFlusher(cfg Config, limits *validation.Overrides, registerer prometheus.Registerer, logger log.Logger) (*Ingester, error) { - bucketClient, err := bucket.NewClient(context.Background(), cfg.BlocksStorageConfig.Bucket, "ingester", logger, registerer) + bucketClient, err := bucket.NewClient(context.Background(), cfg.BlocksStorageConfig.Bucket, nil, "ingester", logger, registerer) if err != nil { return nil, errors.Wrap(err, "failed to create the bucket client") } diff --git a/pkg/purger/tenant_deletion_api.go b/pkg/purger/tenant_deletion_api.go index 1b8310f826..5ac6169c62 100644 --- a/pkg/purger/tenant_deletion_api.go +++ b/pkg/purger/tenant_deletion_api.go @@ -119,7 +119,7 @@ func (api *TenantDeletionAPI) isBlocksForUserDeleted(ctx context.Context, userID } func createBucketClient(cfg cortex_tsdb.BlocksStorageConfig, logger log.Logger, reg prometheus.Registerer) (objstore.InstrumentedBucket, error) { - bucketClient, err := bucket.NewClient(context.Background(), cfg.Bucket, "purger", logger, reg) + bucketClient, err := bucket.NewClient(context.Background(), cfg.Bucket, nil, "purger", logger, reg) if err != nil { return nil, errors.Wrap(err, "create bucket client") } diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index ffad6c5085..c61132c069 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -185,7 +185,7 @@ func NewBlocksStoreQueryable( func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegateway.Config, storageCfg cortex_tsdb.BlocksStorageConfig, limits BlocksStoreLimits, logger log.Logger, reg prometheus.Registerer) (*BlocksStoreQueryable, error) { var stores BlocksStoreSet - bucketClient, err := bucket.NewClient(context.Background(), storageCfg.Bucket, "querier", logger, reg) + bucketClient, err := bucket.NewClient(context.Background(), storageCfg.Bucket, gatewayCfg.HedgedRequest.GetHedgedRoundTripper(), "querier", logger, reg) if err != nil { return nil, errors.Wrap(err, "failed to create bucket client") } diff --git a/pkg/ruler/storage.go b/pkg/ruler/storage.go index 1ebb562d60..0050778aa5 100644 --- a/pkg/ruler/storage.go +++ b/pkg/ruler/storage.go @@ -31,7 +31,7 @@ func NewRuleStore(ctx context.Context, cfg rulestore.Config, cfgProvider bucket. 
return local.NewLocalRulesClient(cfg.Local, loader) } - bucketClient, err := bucket.NewClient(ctx, cfg.Config, "ruler-storage", logger, reg) + bucketClient, err := bucket.NewClient(ctx, cfg.Config, nil, "ruler-storage", logger, reg) if err != nil { return nil, err } diff --git a/pkg/storage/bucket/azure/bucket_client.go b/pkg/storage/bucket/azure/bucket_client.go index 5557e418be..60dd017b04 100644 --- a/pkg/storage/bucket/azure/bucket_client.go +++ b/pkg/storage/bucket/azure/bucket_client.go @@ -1,6 +1,8 @@ package azure import ( + "net/http" + "github.com/go-kit/log" "github.com/prometheus/common/model" "github.com/thanos-io/objstore" @@ -9,7 +11,7 @@ import ( yaml "gopkg.in/yaml.v2" ) -func NewBucketClient(cfg Config, name string, logger log.Logger) (objstore.Bucket, error) { +func NewBucketClient(cfg Config, hedgedRoundTripper func(rt http.RoundTripper) http.RoundTripper, name string, logger log.Logger) (objstore.Bucket, error) { bucketConfig := azure.Config{ StorageAccountName: cfg.StorageAccountName, StorageAccountKey: cfg.StorageAccountKey.Value, @@ -37,5 +39,5 @@ func NewBucketClient(cfg Config, name string, logger log.Logger) (objstore.Bucke return nil, err } - return azure.NewBucket(logger, serialized, name, nil) + return azure.NewBucket(logger, serialized, name, hedgedRoundTripper) } diff --git a/pkg/storage/bucket/client.go b/pkg/storage/bucket/client.go index 7d120cd6d7..b7dc57f9f2 100644 --- a/pkg/storage/bucket/client.go +++ b/pkg/storage/bucket/client.go @@ -5,6 +5,7 @@ import ( "errors" "flag" "fmt" + "net/http" "strings" "github.com/go-kit/log" @@ -103,17 +104,17 @@ func (cfg *Config) Validate() error { } // NewClient creates a new bucket client based on the configured backend -func NewClient(ctx context.Context, cfg Config, name string, logger log.Logger, reg prometheus.Registerer) (bucket objstore.InstrumentedBucket, err error) { +func NewClient(ctx context.Context, cfg Config, hedgedRoundTripper func(rt http.RoundTripper) http.RoundTripper, name string, logger log.Logger, reg prometheus.Registerer) (bucket objstore.InstrumentedBucket, err error) { var client objstore.Bucket switch cfg.Backend { case S3: - client, err = s3.NewBucketClient(cfg.S3, name, logger) + client, err = s3.NewBucketClient(cfg.S3, hedgedRoundTripper, name, logger) case GCS: - client, err = gcs.NewBucketClient(ctx, cfg.GCS, name, logger) + client, err = gcs.NewBucketClient(ctx, cfg.GCS, hedgedRoundTripper, name, logger) case Azure: - client, err = azure.NewBucketClient(cfg.Azure, name, logger) + client, err = azure.NewBucketClient(cfg.Azure, hedgedRoundTripper, name, logger) case Swift: - client, err = swift.NewBucketClient(cfg.Swift, name, logger) + client, err = swift.NewBucketClient(cfg.Swift, hedgedRoundTripper, name, logger) case Filesystem: client, err = filesystem.NewBucketClient(cfg.Filesystem) default: diff --git a/pkg/storage/bucket/client_test.go b/pkg/storage/bucket/client_test.go index 938d6ddcc2..78b2ea3db2 100644 --- a/pkg/storage/bucket/client_test.go +++ b/pkg/storage/bucket/client_test.go @@ -87,7 +87,7 @@ func TestNewClient(t *testing.T) { require.NoError(t, err) // Instance a new bucket client from the config - bucketClient, err := NewClient(context.Background(), cfg, "test", util_log.Logger, nil) + bucketClient, err := NewClient(context.Background(), cfg, nil, "test", util_log.Logger, nil) require.Equal(t, testData.expectedErr, err) if testData.expectedErr == nil { diff --git a/pkg/storage/bucket/gcs/bucket_client.go 
b/pkg/storage/bucket/gcs/bucket_client.go index fb4450a930..1ef0202804 100644 --- a/pkg/storage/bucket/gcs/bucket_client.go +++ b/pkg/storage/bucket/gcs/bucket_client.go @@ -2,6 +2,7 @@ package gcs import ( "context" + "net/http" "github.com/go-kit/log" "github.com/thanos-io/objstore" @@ -10,7 +11,7 @@ import ( ) // NewBucketClient creates a new GCS bucket client -func NewBucketClient(ctx context.Context, cfg Config, name string, logger log.Logger) (objstore.Bucket, error) { +func NewBucketClient(ctx context.Context, cfg Config, hedgedRoundTripper func(rt http.RoundTripper) http.RoundTripper, name string, logger log.Logger) (objstore.Bucket, error) { bucketConfig := gcs.Config{ Bucket: cfg.BucketName, ServiceAccount: cfg.ServiceAccount.Value, @@ -23,5 +24,5 @@ func NewBucketClient(ctx context.Context, cfg Config, name string, logger log.Lo return nil, err } - return gcs.NewBucket(ctx, logger, serialized, name, nil) + return gcs.NewBucket(ctx, logger, serialized, name, hedgedRoundTripper) } diff --git a/pkg/storage/bucket/hedged_request.go b/pkg/storage/bucket/hedged_request.go new file mode 100644 index 0000000000..e0e3ba77da --- /dev/null +++ b/pkg/storage/bucket/hedged_request.go @@ -0,0 +1,43 @@ +package bucket + +import ( + "errors" + "flag" + "net/http" + + "github.com/thanos-io/thanos/pkg/exthttp" +) + +var ( + errInvalidQuantile = errors.New("invalid hedged request quantile, it must be between 0 and 1") +) + +type HedgedRequestConfig struct { + Enabled bool `yaml:"enabled"` + MaxRequests uint `yaml:"max_requests"` + Quantile float64 `yaml:"quantile"` +} + +func (cfg *HedgedRequestConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) { + f.BoolVar(&cfg.Enabled, prefix+"hedged-request.enabled", false, "If true, hedged requests are applied to object store calls. It can help with reducing tail latency.") + f.UintVar(&cfg.MaxRequests, prefix+"hedged-request.max-requests", 3, "Maximum number of hedged requests allowed for each initial request. A high number can reduce latency but increase internal calls.") + f.Float64Var(&cfg.Quantile, prefix+"hedged-request.quantile", 0.9, "It is used to calculate a latency threshold to trigger hedged requests. 
For example, additional requests are triggered when the initial request response time exceeds the 90th percentile.") +} + +func (cfg *HedgedRequestConfig) GetHedgedRoundTripper() func(rt http.RoundTripper) http.RoundTripper { + return exthttp.CreateHedgedTransportWithConfig(exthttp.CustomBucketConfig{ + HedgingConfig: exthttp.HedgingConfig{ + Enabled: cfg.Enabled, + UpTo: cfg.MaxRequests, + Quantile: cfg.Quantile, + }, + }) +} + +func (cfg *HedgedRequestConfig) Validate() error { + if cfg.Quantile > 1 || cfg.Quantile < 0 { + return errInvalidQuantile + } + + return nil +} diff --git a/pkg/storage/bucket/hedged_request_test.go b/pkg/storage/bucket/hedged_request_test.go new file mode 100644 index 0000000000..7cc84e4d10 --- /dev/null +++ b/pkg/storage/bucket/hedged_request_test.go @@ -0,0 +1,36 @@ +package bucket + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestHedgedRequest_Validate(t *testing.T) { + t.Parallel() + tests := map[string]struct { + cfg *HedgedRequestConfig + expected error + }{ + "should fail if hedged request quantile is less than 0": { + cfg: &HedgedRequestConfig{ + Quantile: -0.1, + }, + expected: errInvalidQuantile, + }, + "should fail if hedged request quantile is more than 1": { + cfg: &HedgedRequestConfig{ + Quantile: 1.1, + }, + expected: errInvalidQuantile, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + err := testData.cfg.Validate() + require.Equal(t, testData.expected, err) + }) + } + +} diff --git a/pkg/storage/bucket/s3/bucket_client.go b/pkg/storage/bucket/s3/bucket_client.go index e83f5a1f79..53a0f4f588 100644 --- a/pkg/storage/bucket/s3/bucket_client.go +++ b/pkg/storage/bucket/s3/bucket_client.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "net/http" "time" "github.com/go-kit/log" @@ -21,13 +22,13 @@ var defaultRetryMinBackoff = 5 * time.Second var defaultRetryMaxBackoff = 1 * time.Minute // NewBucketClient creates a new S3 bucket client -func NewBucketClient(cfg Config, name string, logger log.Logger) (objstore.Bucket, error) { +func NewBucketClient(cfg Config, hedgedRoundTripper func(rt http.RoundTripper) http.RoundTripper, name string, logger log.Logger) (objstore.Bucket, error) { s3Cfg, err := newS3Config(cfg) if err != nil { return nil, err } - bucket, err := s3.NewBucketWithConfig(logger, s3Cfg, name, nil) + bucket, err := s3.NewBucketWithConfig(logger, s3Cfg, name, hedgedRoundTripper) if err != nil { return nil, err } diff --git a/pkg/storage/bucket/sse_bucket_client_test.go b/pkg/storage/bucket/sse_bucket_client_test.go index 1b2815982c..1bdb102dc0 100644 --- a/pkg/storage/bucket/sse_bucket_client_test.go +++ b/pkg/storage/bucket/sse_bucket_client_test.go @@ -57,7 +57,7 @@ func TestSSEBucketClient_Upload_ShouldInjectCustomSSEConfig(t *testing.T) { BucketLookupType: s3.BucketPathLookup, } - s3Client, err := s3.NewBucketClient(s3Cfg, "test", log.NewNopLogger()) + s3Client, err := s3.NewBucketClient(s3Cfg, nil, "test", log.NewNopLogger()) require.NoError(t, err) // Configure the config provider with NO KMS key ID. 
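As a note on how the new plumbing is meant to be used: below is a minimal sketch (not part of this diff) of how the `func(rt http.RoundTripper) http.RoundTripper` decorator returned by `GetHedgedRoundTripper` can wrap a plain HTTP transport. `HedgedRequestConfig` and `GetHedgedRoundTripper` come from `pkg/storage/bucket/hedged_request.go` above; the `main` scaffolding is illustrative only.

```go
package main

import (
	"net/http"

	"github.com/cortexproject/cortex/pkg/storage/bucket"
)

func main() {
	// Same knobs as the new -store-gateway.hedged-request.* flags.
	cfg := bucket.HedgedRequestConfig{
		Enabled:     true,
		MaxRequests: 3,   // up to 3 hedged attempts per initial request
		Quantile:    0.9, // hedge once latency exceeds the observed p90
	}

	// The decorator wraps any http.RoundTripper. Passing it (instead of
	// nil) into bucket.NewClient is what the store-gateway and querier
	// changes in this diff do.
	wrap := cfg.GetHedgedRoundTripper()
	client := &http.Client{Transport: wrap(http.DefaultTransport)}
	_ = client
}
```

Call sites that don't want hedging simply pass `nil`, which is why every other `bucket.NewClient` caller in this diff gains a `nil` argument.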
diff --git a/pkg/storage/bucket/swift/bucket_client.go b/pkg/storage/bucket/swift/bucket_client.go index 38c266f44c..2e4f8e818a 100644 --- a/pkg/storage/bucket/swift/bucket_client.go +++ b/pkg/storage/bucket/swift/bucket_client.go @@ -1,6 +1,8 @@ package swift import ( + "net/http" + "github.com/go-kit/log" "github.com/prometheus/common/model" "github.com/thanos-io/objstore" @@ -9,7 +11,7 @@ import ( ) // NewBucketClient creates a new Swift bucket client -func NewBucketClient(cfg Config, _ string, logger log.Logger) (objstore.Bucket, error) { +func NewBucketClient(cfg Config, hedgedRoundTripper func(rt http.RoundTripper) http.RoundTripper, _ string, logger log.Logger) (objstore.Bucket, error) { bucketConfig := swift.Config{ AuthVersion: cfg.AuthVersion, AuthUrl: cfg.AuthURL, @@ -45,5 +47,5 @@ func NewBucketClient(cfg Config, _ string, logger log.Logger) (objstore.Bucket, return nil, err } - return swift.NewContainer(logger, serialized, nil) + return swift.NewContainer(logger, serialized, hedgedRoundTripper) } diff --git a/pkg/storegateway/gateway.go b/pkg/storegateway/gateway.go index 9af80e31ef..c043adee18 100644 --- a/pkg/storegateway/gateway.go +++ b/pkg/storegateway/gateway.go @@ -4,6 +4,7 @@ import ( "context" "flag" "fmt" + "net/http" "strings" "time" @@ -62,6 +63,9 @@ type Config struct { EnabledTenants flagext.StringSliceCSV `yaml:"enabled_tenants"` DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants"` + + // Hedged Request + HedgedRequest bucket.HedgedRequestConfig `yaml:"hedged_request"` } // RegisterFlags registers the Config flags. @@ -72,6 +76,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.ShardingStrategy, "store-gateway.sharding-strategy", util.ShardingStrategyDefault, fmt.Sprintf("The sharding strategy to use. Supported values are: %s.", strings.Join(supportedShardingStrategies, ", "))) f.Var(&cfg.EnabledTenants, "store-gateway.enabled-tenants", "Comma separated list of tenants whose store metrics this storegateway can process. If specified, only these tenants will be handled by storegateway, otherwise this storegateway will be enabled for all the tenants in the store-gateway cluster.") f.Var(&cfg.DisabledTenants, "store-gateway.disabled-tenants", "Comma separated list of tenants whose store metrics this storegateway cannot process. If specified, a storegateway that would normally pick the specified tenant(s) for processing will ignore them instead.") + cfg.HedgedRequest.RegisterFlagsWithPrefix(f, "store-gateway.") } // Validate the Config. 
@@ -86,6 +91,10 @@ func (cfg *Config) Validate(limits validation.Limits) error { } } + if err := cfg.HedgedRequest.Validate(); err != nil { + return err + } + return nil } @@ -114,7 +123,7 @@ type StoreGateway struct { func NewStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*StoreGateway, error) { var ringStore kv.Client - bucketClient, err := createBucketClient(storageCfg, logger, reg) + bucketClient, err := createBucketClient(storageCfg, gatewayCfg.HedgedRequest.GetHedgedRoundTripper(), logger, reg) if err != nil { return nil, err } @@ -407,8 +416,8 @@ func (g *StoreGateway) OnRingInstanceStopping(_ *ring.BasicLifecycler) func (g *StoreGateway) OnRingInstanceHeartbeat(_ *ring.BasicLifecycler, _ *ring.Desc, _ *ring.InstanceDesc) { } -func createBucketClient(cfg cortex_tsdb.BlocksStorageConfig, logger log.Logger, reg prometheus.Registerer) (objstore.InstrumentedBucket, error) { - bucketClient, err := bucket.NewClient(context.Background(), cfg.Bucket, "store-gateway", logger, reg) +func createBucketClient(cfg cortex_tsdb.BlocksStorageConfig, hedgedRoundTripper func(rt http.RoundTripper) http.RoundTripper, logger log.Logger, reg prometheus.Registerer) (objstore.InstrumentedBucket, error) { + bucketClient, err := bucket.NewClient(context.Background(), cfg.Bucket, hedgedRoundTripper, "store-gateway", logger, reg) if err != nil { return nil, errors.Wrap(err, "create bucket client") } diff --git a/tools/thanosconvert/thanosconvert.go b/tools/thanosconvert/thanosconvert.go index f4b91b4009..840c4eadac 100644 --- a/tools/thanosconvert/thanosconvert.go +++ b/tools/thanosconvert/thanosconvert.go @@ -44,7 +44,7 @@ type Results map[string]PerUserResults // NewThanosBlockConverter creates a ThanosBlockConverter func NewThanosBlockConverter(ctx context.Context, cfg bucket.Config, dryRun bool, logger log.Logger) (*ThanosBlockConverter, error) { - bkt, err := bucket.NewClient(ctx, cfg, "thanosconvert", logger, nil) + bkt, err := bucket.NewClient(ctx, cfg, nil, "thanosconvert", logger, nil) if err != nil { return nil, err } diff --git a/vendor/github.com/caio/go-tdigest/.gitignore b/vendor/github.com/caio/go-tdigest/.gitignore new file mode 100644 index 0000000000..f9f915f62d --- /dev/null +++ b/vendor/github.com/caio/go-tdigest/.gitignore @@ -0,0 +1,2 @@ +vendor/ +go-tdigest.test diff --git a/vendor/github.com/caio/go-tdigest/CONTRIBUTING.md b/vendor/github.com/caio/go-tdigest/CONTRIBUTING.md new file mode 100644 index 0000000000..3baa1d1645 --- /dev/null +++ b/vendor/github.com/caio/go-tdigest/CONTRIBUTING.md @@ -0,0 +1,42 @@ +# Contributing + +First and foremost: **thank you very much** for your interest in this +project. Feel free to skip all this and open your issue / pull request +if reading contribution guidelines is too much for you at this point. +We value your contribution a lot more than we value your ability to +follow rules (and thankfully we can afford to take this approach given +this project's demand). + +Any kind of contribution is welcome. We can always use better docs and +tests (and code, of course). If you think you can improve this project +in any dimension _let's talk_ :-) + +## Guidelines + +Be kind and respectful in all your interactions with people inside +(outside too!) this community; There is no excuse for not showing +basic decency. Sarcasm and generally unconstructive remarks are **not +welcome**. 
+ +### Issues + +When opening and interacting with issues please: + +- Be as clear as possible +- Provide examples if you can + +### Pull Requests + +We expect that pull requests: + +- Have [good commit messages][commits] +- Contain tests for new features +- Target and can be cleanly merged with the `master` branch +- Pass the tests + +[commits]: https://www.git-scm.com/book/en/v2/Distributed-Git-Contributing-to-a-Project#_commit_guidelines + +### Project Management + +Don't bother with labels, milestones, assignments, etc. We don't make +use of those. diff --git a/vendor/github.com/caio/go-tdigest/Gopkg.lock b/vendor/github.com/caio/go-tdigest/Gopkg.lock new file mode 100644 index 0000000000..65bf9067a5 --- /dev/null +++ b/vendor/github.com/caio/go-tdigest/Gopkg.lock @@ -0,0 +1,41 @@ +# This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'. + + +[[projects]] + digest = "1:cf63454c1e81409484ded047413228de0f7a3031f0fcd36d4e1db7620c3c7d1b" + name = "github.com/leesper/go_rng" + packages = ["."] + pruneopts = "" + revision = "5344a9259b21627d94279721ab1f27eb029194e7" + +[[projects]] + branch = "master" + digest = "1:ad6d9b2cce40c7c44952d49a6a324a2110db43b4279d9e599db74e45de5ae80c" + name = "gonum.org/v1/gonum" + packages = [ + "blas", + "blas/blas64", + "blas/gonum", + "floats", + "internal/asm/c128", + "internal/asm/f32", + "internal/asm/f64", + "internal/math32", + "lapack", + "lapack/gonum", + "lapack/lapack64", + "mat", + "stat", + ] + pruneopts = "" + revision = "f0982070f509ee139841ca385c44dc22a77c8da8" + +[solve-meta] + analyzer-name = "dep" + analyzer-version = 1 + input-imports = [ + "github.com/leesper/go_rng", + "gonum.org/v1/gonum/stat", + ] + solver-name = "gps-cdcl" + solver-version = 1 diff --git a/vendor/github.com/caio/go-tdigest/Gopkg.toml b/vendor/github.com/caio/go-tdigest/Gopkg.toml new file mode 100644 index 0000000000..323002ca8a --- /dev/null +++ b/vendor/github.com/caio/go-tdigest/Gopkg.toml @@ -0,0 +1,21 @@ + +# Gopkg.toml example +# +# Refer to https://github.com/golang/dep/blob/master/docs/Gopkg.toml.md +# for detailed Gopkg.toml documentation. 
+# +# required = ["github.com/user/thing/cmd/thing"] +# ignored = ["github.com/user/project/pkgX", "bitbucket.org/user/project/pkgA/pkgY"] +# +# [[constraint]] +# name = "github.com/user/project" +# version = "1.0.0" +# +# [[constraint]] +# name = "github.com/user/project2" +# branch = "dev" +# source = "github.com/myfork/project2" +# +# [[override]] +# name = "github.com/x/y" +# version = "2.4.0" diff --git a/vendor/github.com/caio/go-tdigest/LICENSE b/vendor/github.com/caio/go-tdigest/LICENSE new file mode 100644 index 0000000000..f5f074401b --- /dev/null +++ b/vendor/github.com/caio/go-tdigest/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2015 Caio Romão Costa Nascimento + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/vendor/github.com/caio/go-tdigest/README.md b/vendor/github.com/caio/go-tdigest/README.md new file mode 100644 index 0000000000..b635870254 --- /dev/null +++ b/vendor/github.com/caio/go-tdigest/README.md @@ -0,0 +1,94 @@ +# T-Digest + +A fast map-reduce and parallel streaming friendly data-structure for accurate +quantile approximation. + +This package provides an implementation of Ted Dunning's t-digest data +structure in Go. + +[![GoDoc](https://godoc.org/github.com/caio/go-tdigest?status.svg)](http://godoc.org/github.com/caio/go-tdigest) +[![Go Report Card](https://goreportcard.com/badge/github.com/caio/go-tdigest)](https://goreportcard.com/report/github.com/caio/go-tdigest) + +## Project Status + +This project is actively maintained. We are happy to collaborate on features +and issues if/when they arrive. + +## Installation + +Our releases are tagged and signed following the [Semantic Versioning][semver] +scheme. 
If you are using a dependency manager such as [dep][], the recommended +way is to go about your business normally: + + go get github.com/caio/go-tdigest + +Otherwise we recommend using the following so that you don't risk breaking +your build because of an API change: + + go get gopkg.in/caio/go-tdigest.v2 + +[semver]: http://semver.org/ +[dep]: https://github.com/golang/dep + +## Example Usage + +```go +package main + +import ( + "fmt" + "math/rand" + + "github.com/caio/go-tdigest" +) + +func main() { + // Analogue to tdigest.New(tdigest.Compression(100)) + t, _ := tdigest.New() + + for i := 0; i < 10000; i++ { + // Analogue to t.AddWeighted(rand.Float64(), 1) + t.Add(rand.Float64()) + } + + fmt.Printf("p(.5) = %.6f\n", t.Quantile(0.5)) + fmt.Printf("CDF(Quantile(.5)) = %.6f\n", t.CDF(t.Quantile(0.5))) +} +``` + +## Configuration + +You can configure your digest upon creation with options documented +at [options.go](options.go). Example: + +```go +// Construct a digest with compression=200 and its own +// (thread-unsafe) RNG seeded with 0xCA10: +digest, _ := tdigest.New( + tdigest.Compression(200), + tdigest.LocalRandomNumberGenerator(0xCA10), +) +``` + +## Porting Existing Code to the v2 API + +It's very easy to migrate to the new API: + +- Replace `tdigest.New(100)` with `tdigest.New()` +- Replace `tdigest.New(number)` with `tdigest.New(tdigest.Compression(number))` +- Replace `Add(x,1)` with `Add(x)` +- Replace `Add(x, weight)` with `AddWeighted(x, weight)` +- Remove any use of `tdigest.Len()` (or [open an issue][issues]) + +[issues]: https://github.com/caio/go-tdigest/issues/new + +## References + +This is a port of the [reference][1] implementation with some ideas borrowed +from the [python version][2]. If you want to get a quick grasp of how it works +and why it's useful, [this video and companion article are pretty helpful][3]. + +[1]: https://github.com/tdunning/t-digest +[2]: https://github.com/CamDavidsonPilon/tdigest +[3]: https://www.mapr.com/blog/better-anomaly-detection-t-digest-whiteboard-walkthrough + diff --git a/vendor/github.com/caio/go-tdigest/options.go b/vendor/github.com/caio/go-tdigest/options.go new file mode 100644 index 0000000000..c30b459545 --- /dev/null +++ b/vendor/github.com/caio/go-tdigest/options.go @@ -0,0 +1,51 @@ +package tdigest + +import "errors" + +type tdigestOption func(*TDigest) error + +// Compression sets the digest compression +// +// The compression parameter rules the threshold in which samples are +// merged together - the more often distinct samples are merged the more +// precision is lost. Compression should be tuned according to your data +// distribution, but a value of 100 (the default) is often good enough. +// +// A higher compression value means holding more centroids in memory +// (thus: better precision), which means a bigger serialization payload, +// higher memory footprint and slower addition of new samples. +// +// Compression must be a value greater than or equal to 1, and will yield an +// error otherwise.
+func Compression(compression float64) tdigestOption { // nolint + return func(t *TDigest) error { + if compression < 1 { + return errors.New("Compression should be >= 1") + } + t.compression = compression + return nil + } +} + +// RandomNumberGenerator sets the RNG to be used internally +// +// This allows changing which random number source is used when using +// the TDigest structure (rngs are used when deciding which candidate +// centroid to merge with and when compressing or merging with +// another digest for it increases accuracy). This functionality is +// particularly useful for testing or when you want to disconnect +// your sample collection from the (default) shared random source +// to minimize lock contention. +func RandomNumberGenerator(rng RNG) tdigestOption { // nolint + return func(t *TDigest) error { + t.rng = rng + return nil + } +} + +// LocalRandomNumberGenerator makes the TDigest use the default +// `math/random` functions but with an unshared source that is +// seeded with the given `seed` parameter. +func LocalRandomNumberGenerator(seed int64) tdigestOption { // nolint + return RandomNumberGenerator(newLocalRNG(seed)) +} diff --git a/vendor/github.com/caio/go-tdigest/rng.go b/vendor/github.com/caio/go-tdigest/rng.go new file mode 100644 index 0000000000..856b6ad9f0 --- /dev/null +++ b/vendor/github.com/caio/go-tdigest/rng.go @@ -0,0 +1,40 @@ +package tdigest + +import ( + "math/rand" +) + +// RNG is an interface that wraps the needed random number +// generator calls that tdigest uses during its runtime +type RNG interface { + Float32() float32 + Intn(int) int +} + +type globalRNG struct{} + +func (r globalRNG) Float32() float32 { + return rand.Float32() +} + +func (r globalRNG) Intn(i int) int { + return rand.Intn(i) +} + +type localRNG struct { + localRand *rand.Rand +} + +func newLocalRNG(seed int64) *localRNG { + return &localRNG{ + localRand: rand.New(rand.NewSource(seed)), + } +} + +func (r *localRNG) Float32() float32 { + return r.localRand.Float32() +} + +func (r *localRNG) Intn(i int) int { + return r.localRand.Intn(i) +} diff --git a/vendor/github.com/caio/go-tdigest/serialization.go b/vendor/github.com/caio/go-tdigest/serialization.go new file mode 100644 index 0000000000..6acb658b1b --- /dev/null +++ b/vendor/github.com/caio/go-tdigest/serialization.go @@ -0,0 +1,202 @@ +package tdigest + +import ( + "bytes" + "encoding/binary" + "errors" + "fmt" + "math" +) + +const smallEncoding int32 = 2 + +var endianess = binary.BigEndian + +// AsBytes serializes the digest into a byte array so it can be +// saved to disk or sent over the wire. +func (t TDigest) AsBytes() ([]byte, error) { + // TODO get rid of the (now) useless error + return t.ToBytes(make([]byte, t.requiredSize())), nil +} + +func (t *TDigest) requiredSize() int { + return 16 + (4 * len(t.summary.means)) + (len(t.summary.counts) * binary.MaxVarintLen64) +} + +// ToBytes serializes into the supplied slice, avoiding allocation if the slice +// is large enough. The result slice is returned. +func (t *TDigest) ToBytes(b []byte) []byte { + requiredSize := t.requiredSize() + if cap(b) < requiredSize { + b = make([]byte, requiredSize) + } + + // The binary.Put* functions helpfully don't extend the slice for you, they + // just panic if it's not already long enough. So pre-set the slice length; + // we'll return it with the actual encoded length. 
+ b = b[:cap(b)] + + endianess.PutUint32(b[0:4], uint32(smallEncoding)) + endianess.PutUint64(b[4:12], math.Float64bits(t.compression)) + endianess.PutUint32(b[12:16], uint32(t.summary.Len())) + + var x float64 + idx := 16 + for _, mean := range t.summary.means { + delta := mean - x + x = mean + endianess.PutUint32(b[idx:], math.Float32bits(float32(delta))) + idx += 4 + } + + for _, count := range t.summary.counts { + idx += binary.PutUvarint(b[idx:], count) + } + return b[:idx] +} + +// FromBytes reads a byte buffer with a serialized digest (from AsBytes) +// and deserializes it. +// +// This function creates a new tdigest instance with the provided options, +// but ignores the compression setting since the correct value comes +// from the buffer. +func FromBytes(buf *bytes.Reader, options ...tdigestOption) (*TDigest, error) { + var encoding int32 + err := binary.Read(buf, endianess, &encoding) + if err != nil { + return nil, err + } + + if encoding != smallEncoding { + return nil, fmt.Errorf("Unsupported encoding version: %d", encoding) + } + + t, err := newWithoutSummary(options...) + + if err != nil { + return nil, err + } + + var compression float64 + err = binary.Read(buf, endianess, &compression) + if err != nil { + return nil, err + } + + t.compression = compression + + var numCentroids int32 + err = binary.Read(buf, endianess, &numCentroids) + if err != nil { + return nil, err + } + + if numCentroids < 0 || numCentroids > 1<<22 { + return nil, errors.New("bad number of centroids in serialization") + } + + t.summary = newSummary(int(numCentroids)) + t.summary.means = t.summary.means[:numCentroids] + t.summary.counts = t.summary.counts[:numCentroids] + + var x float64 + for i := 0; i < int(numCentroids); i++ { + var delta float32 + err = binary.Read(buf, endianess, &delta) + if err != nil { + return nil, err + } + x += float64(delta) + t.summary.means[i] = x + } + + for i := 0; i < int(numCentroids); i++ { + count, err := decodeUint(buf) + if err != nil { + return nil, err + } + t.summary.counts[i] = count + t.count += count + } + + return t, nil +} + +// FromBytes deserializes into the supplied TDigest struct, re-using +// and overwriting any existing buffers. +// +// This method reinitializes the digest from the provided buffer +// discarding any previously collected data. Notice that in case +// of errors this may leave the digest in an unusable state.
+func (t *TDigest) FromBytes(buf []byte) error { + if len(buf) < 16 { + return errors.New("buffer too small for deserialization") + } + + encoding := int32(endianess.Uint32(buf)) + if encoding != smallEncoding { + return fmt.Errorf("unsupported encoding version: %d", encoding) + } + + compression := math.Float64frombits(endianess.Uint64(buf[4:12])) + numCentroids := int(endianess.Uint32(buf[12:16])) + if numCentroids < 0 || numCentroids > 1<<22 { + return errors.New("bad number of centroids in serialization") + } + + if len(buf) < 16+(4*numCentroids) { + return errors.New("buffer too small for deserialization") + } + + t.count = 0 + t.compression = compression + if t.summary == nil || + cap(t.summary.means) < numCentroids || + cap(t.summary.counts) < numCentroids { + t.summary = newSummary(numCentroids) + } + t.summary.means = t.summary.means[:numCentroids] + t.summary.counts = t.summary.counts[:numCentroids] + + idx := 16 + var x float64 + for i := 0; i < numCentroids; i++ { + delta := math.Float32frombits(endianess.Uint32(buf[idx:])) + idx += 4 + x += float64(delta) + t.summary.means[i] = x + } + + for i := 0; i < numCentroids; i++ { + count, read := binary.Uvarint(buf[idx:]) + if read < 1 { + return errors.New("error decoding varint, this TDigest is now invalid") + } + + idx += read + + t.summary.counts[i] = count + t.count += count + } + + if idx != len(buf) { + return errors.New("buffer has unread data") + } + return nil +} + +func encodeUint(buf *bytes.Buffer, n uint64) error { + var b [binary.MaxVarintLen64]byte + + l := binary.PutUvarint(b[:], n) + + _, err := buf.Write(b[:l]) + + return err +} + +func decodeUint(buf *bytes.Reader) (uint64, error) { + v, err := binary.ReadUvarint(buf) + return v, err +} diff --git a/vendor/github.com/caio/go-tdigest/summary.go b/vendor/github.com/caio/go-tdigest/summary.go new file mode 100644 index 0000000000..f7c90672e2 --- /dev/null +++ b/vendor/github.com/caio/go-tdigest/summary.go @@ -0,0 +1,206 @@ +package tdigest + +import ( + "fmt" + "math" + "sort" +) + +type summary struct { + means []float64 + counts []uint64 +} + +func newSummary(initialCapacity int) *summary { + s := &summary{ + means: make([]float64, 0, initialCapacity), + counts: make([]uint64, 0, initialCapacity), + } + return s +} + +func (s *summary) Len() int { + return len(s.means) +} + +func (s *summary) Add(key float64, value uint64) error { + if math.IsNaN(key) { + return fmt.Errorf("Key must not be NaN") + } + if value == 0 { + return fmt.Errorf("Count must be >0") + } + + idx := s.findInsertionIndex(key) + + s.means = append(s.means, math.NaN()) + s.counts = append(s.counts, 0) + + copy(s.means[idx+1:], s.means[idx:]) + copy(s.counts[idx+1:], s.counts[idx:]) + + s.means[idx] = key + s.counts[idx] = value + + return nil +} + +// Always insert to the right +func (s *summary) findInsertionIndex(x float64) int { + // Binary search is only worthwhile if we have a lot of keys. + if len(s.means) < 250 { + for i, mean := range s.means { + if mean > x { + return i + } + } + return len(s.means) + } + + return sort.Search(len(s.means), func(i int) bool { + return s.means[i] > x + }) +} + +// This method is the hotspot when calling Add(), which in turn is called by +// Compress() and Merge(). 
+func (s *summary) HeadSum(idx int) (sum float64) { + return float64(sumUntilIndex(s.counts, idx)) +} + +func (s *summary) Floor(x float64) int { + return s.findIndex(x) - 1 +} + +func (s *summary) findIndex(x float64) int { + // Binary search is only worthwhile if we have a lot of keys. + if len(s.means) < 250 { + for i, mean := range s.means { + if mean >= x { + return i + } + } + return len(s.means) + } + + return sort.Search(len(s.means), func(i int) bool { + return s.means[i] >= x + }) +} + +func (s *summary) Mean(uncheckedIndex int) float64 { + return s.means[uncheckedIndex] +} + +func (s *summary) Count(uncheckedIndex int) uint64 { + return s.counts[uncheckedIndex] +} + +// return the index of the last item which the sum of counts +// of items before it is less than or equal to `sum`. -1 in +// case no centroid satisfies the requirement. +// Since it's cheap, this also returns the `HeadSum` until +// the found index (i.e. cumSum = HeadSum(FloorSum(x))) +func (s *summary) FloorSum(sum float64) (index int, cumSum float64) { + index = -1 + for i, count := range s.counts { + if cumSum <= sum { + index = i + } else { + break + } + cumSum += float64(count) + } + if index != -1 { + cumSum -= float64(s.counts[index]) + } + return index, cumSum +} + +func (s *summary) setAt(index int, mean float64, count uint64) { + s.means[index] = mean + s.counts[index] = count + s.adjustRight(index) + s.adjustLeft(index) +} + +func (s *summary) adjustRight(index int) { + for i := index + 1; i < len(s.means) && s.means[i-1] > s.means[i]; i++ { + s.means[i-1], s.means[i] = s.means[i], s.means[i-1] + s.counts[i-1], s.counts[i] = s.counts[i], s.counts[i-1] + } +} + +func (s *summary) adjustLeft(index int) { + for i := index - 1; i >= 0 && s.means[i] > s.means[i+1]; i-- { + s.means[i], s.means[i+1] = s.means[i+1], s.means[i] + s.counts[i], s.counts[i+1] = s.counts[i+1], s.counts[i] + } +} + +func (s *summary) ForEach(f func(float64, uint64) bool) { + for i, mean := range s.means { + if !f(mean, s.counts[i]) { + break + } + } +} + +func (s *summary) Perm(rng RNG, f func(float64, uint64) bool) { + for _, i := range perm(rng, s.Len()) { + if !f(s.means[i], s.counts[i]) { + break + } + } +} + +func (s *summary) Clone() *summary { + return &summary{ + means: append([]float64{}, s.means...), + counts: append([]uint64{}, s.counts...), + } +} + +// Randomly shuffles summary contents, so they can be added to another summary +// without being pathological. Renders summary invalid. +func (s *summary) shuffle(rng RNG) { + for i := len(s.means) - 1; i > 1; i-- { + s.Swap(i, rng.Intn(i+1)) + } +} + +// for sort.Interface +func (s *summary) Swap(i, j int) { + s.means[i], s.means[j] = s.means[j], s.means[i] + s.counts[i], s.counts[j] = s.counts[j], s.counts[i] +} + +func (s *summary) Less(i, j int) bool { + return s.means[i] < s.means[j] +} + +// A simple loop unroll saves a surprising amount of time.
+func sumUntilIndex(s []uint64, idx int) uint64 { + var cumSum uint64 + var i int + for i = idx - 1; i >= 3; i -= 4 { + cumSum += uint64(s[i]) + cumSum += uint64(s[i-1]) + cumSum += uint64(s[i-2]) + cumSum += uint64(s[i-3]) + } + for ; i >= 0; i-- { + cumSum += uint64(s[i]) + } + return cumSum +} + +func perm(rng RNG, n int) []int { + m := make([]int, n) + for i := 1; i < n; i++ { + j := rng.Intn(i + 1) + m[i] = m[j] + m[j] = i + } + return m +} diff --git a/vendor/github.com/caio/go-tdigest/tdigest.go b/vendor/github.com/caio/go-tdigest/tdigest.go new file mode 100644 index 0000000000..e1b932c190 --- /dev/null +++ b/vendor/github.com/caio/go-tdigest/tdigest.go @@ -0,0 +1,445 @@ +// Package tdigest provides a highly accurate mergeable data-structure +// for quantile estimation. +// +// Typical T-Digest use cases involve accumulating metrics on several +// distinct nodes of a cluster and then merging them together to get +// a system-wide quantile overview. Things such as: sensory data from +// IoT devices, quantiles over enormous document datasets (think +// ElasticSearch), performance metrics for distributed systems, etc. +// +// After you create (and configure, if desired) the digest: +// digest, err := tdigest.New(tdigest.Compression(100)) +// +// You can then use it for registering measurements: +// digest.Add(number) +// +// Estimating quantiles: +// digest.Quantile(0.99) +// +// And merging with another digest: +// digest.Merge(otherDigest) +package tdigest + +import ( + "fmt" + "math" +) + +// TDigest is a quantile approximation data structure. +type TDigest struct { + summary *summary + compression float64 + count uint64 + rng RNG +} + +// New creates a new digest. +// +// By default the digest is constructed with a configuration that +// should be useful for most use-cases. It comes with compression +// set to 100 and uses a local random number generator for +// performance reasons. +func New(options ...tdigestOption) (*TDigest, error) { + tdigest, err := newWithoutSummary(options...) + + if err != nil { + return nil, err + } + + tdigest.summary = newSummary(estimateCapacity(tdigest.compression)) + return tdigest, nil +} + +// Creates a tdigest instance without allocating a summary. +func newWithoutSummary(options ...tdigestOption) (*TDigest, error) { + tdigest := &TDigest{ + compression: 100, + count: 0, + rng: newLocalRNG(1), + } + + for _, option := range options { + err := option(tdigest) + if err != nil { + return nil, err + } + } + + return tdigest, nil +} + +func _quantile(index float64, previousIndex float64, nextIndex float64, previousMean float64, nextMean float64) float64 { + delta := nextIndex - previousIndex + previousWeight := (nextIndex - index) / delta + nextWeight := (index - previousIndex) / delta + return previousMean*previousWeight + nextMean*nextWeight +} + +// Compression returns the TDigest compression. +func (t *TDigest) Compression() float64 { + return t.compression +} + +// Quantile returns the desired percentile estimation. +// +// Values of p must be between 0 and 1 (inclusive), will panic otherwise. 
+func (t *TDigest) Quantile(q float64) float64 { + if q < 0 || q > 1 { + panic("q must be between 0 and 1 (inclusive)") + } + + if t.summary.Len() == 0 { + return math.NaN() + } else if t.summary.Len() == 1 { + return t.summary.Mean(0) + } + + index := q * float64(t.count-1) + previousMean := math.NaN() + previousIndex := float64(0) + next, total := t.summary.FloorSum(index) + + if next > 0 { + previousMean = t.summary.Mean(next - 1) + previousIndex = total - float64(t.summary.Count(next-1)+1)/2 + } + + for { + nextIndex := total + float64(t.summary.Count(next)-1)/2 + if nextIndex >= index { + if math.IsNaN(previousMean) { + // the index is before the 1st centroid + if nextIndex == previousIndex { + return t.summary.Mean(next) + } + // assume linear growth + nextIndex2 := total + float64(t.summary.Count(next)) + float64(t.summary.Count(next+1)-1)/2 + previousMean = (nextIndex2*t.summary.Mean(next) - nextIndex*t.summary.Mean(next+1)) / (nextIndex2 - nextIndex) + } + // common case: two centroids found, the result is in between + return _quantile(index, previousIndex, nextIndex, previousMean, t.summary.Mean(next)) + } else if next+1 == t.summary.Len() { + // the index is after the last centroid + nextIndex2 := float64(t.count - 1) + nextMean2 := (t.summary.Mean(next)*(nextIndex2-previousIndex) - previousMean*(nextIndex2-nextIndex)) / (nextIndex - previousIndex) + return _quantile(index, nextIndex, nextIndex2, t.summary.Mean(next), nextMean2) + } + total += float64(t.summary.Count(next)) + previousMean = t.summary.Mean(next) + previousIndex = nextIndex + next++ + } + // unreachable +} + +// boundedWeightedAverage computes the weighted average of two +// centroids guaranteeing that the result will be between x1 and x2, +// inclusive. +// +// Refer to https://github.com/caio/go-tdigest/pull/19 for more details +func boundedWeightedAverage(x1 float64, w1 float64, x2 float64, w2 float64) float64 { + if x1 > x2 { + x1, x2, w1, w2 = x2, x1, w2, w1 + } + result := (x1*w1 + x2*w2) / (w1 + w2) + return math.Max(x1, math.Min(result, x2)) +} + +// AddWeighted registers a new sample in the digest. +// +// It's the main entry point for the digest and very likely the only +// method to be used for collecting samples. The count parameter is for +// when you are registering a sample that occurred multiple times - the +// most common value for this is 1. +// +// This will emit an error if `value` is NaN or if `count` is zero. +func (t *TDigest) AddWeighted(value float64, count uint64) (err error) { + if count == 0 { + return fmt.Errorf("Illegal datapoint <value: %.4f, count: %d>", value, count) + } + + if t.summary.Len() == 0 { + err = t.summary.Add(value, count) + t.count = uint64(count) + return err + } + + begin := t.summary.Floor(value) + if begin == -1 { + begin = 0 + } + + begin, end := t.findNeighbors(begin, value) + + closest := t.chooseMergeCandidate(begin, end, value, count) + + if closest == t.summary.Len() { + err = t.summary.Add(value, count) + if err != nil { + return err + } + } else { + c := float64(t.summary.Count(closest)) + newMean := boundedWeightedAverage(t.summary.Mean(closest), c, value, float64(count)) + t.summary.setAt(closest, newMean, uint64(c)+count) + } + t.count += uint64(count) + + if float64(t.summary.Len()) > 20*t.compression { + err = t.Compress() + } + + return err +} + +// Count returns the total number of samples this digest represents +// +// The result represents how many times Add() was called on a digest +// plus how many samples the digests it has been merged with had.
+// This is useful mainly for two scenarios: +// +// - Knowing if there is enough data so you can trust the quantiles +// +// - Knowing if you've registered too many samples already and +// deciding what to do about it. +// +// For the second case one approach would be to create a side empty +// digest and start registering samples on it as well as on the old +// (big) one and then discard the bigger one after a certain criterion +// is reached (say, minimum number of samples or a small relative +// error between new and old digests). +func (t TDigest) Count() uint64 { + return t.count +} + +// Add is an alias for AddWeighted(x,1) +// Read the documentation for AddWeighted for more details. +func (t *TDigest) Add(value float64) error { + return t.AddWeighted(value, 1) +} + +// Compress tries to reduce the number of individual centroids stored +// in the digest. +// +// Compression trades off accuracy for performance and happens +// automatically after a certain amount of distinct samples have been +// stored. +// +// At any point in time you may call Compress on a digest, but you +// may completely ignore this and it will compress itself automatically +// after it grows too much. If you are minimizing network traffic +// it might be a good idea to compress before serializing. +func (t *TDigest) Compress() (err error) { + if t.summary.Len() <= 1 { + return nil + } + + oldTree := t.summary + t.summary = newSummary(estimateCapacity(t.compression)) + t.count = 0 + + oldTree.shuffle(t.rng) + oldTree.ForEach(func(mean float64, count uint64) bool { + err = t.AddWeighted(mean, count) + return err == nil + }) + return err +} + +// Merge joins a given digest into itself. +// +// Merging is useful when you have multiple TDigest instances running +// in separate threads and you want to compute quantiles over all the +// samples. This is particularly important on a scatter-gather/map-reduce +// scenario. +func (t *TDigest) Merge(other *TDigest) (err error) { + if other.summary.Len() == 0 { + return nil + } + + other.summary.Perm(t.rng, func(mean float64, count uint64) bool { + err = t.AddWeighted(mean, count) + return err == nil + }) + return err +} + +// MergeDestructive joins a given digest into itself rendering +// the other digest invalid. +// +// This works as Merge above but it's faster. Using this method +// requires caution as it makes 'other' useless - you must make +// sure you discard it without making further uses of it. +func (t *TDigest) MergeDestructive(other *TDigest) (err error) { + if other.summary.Len() == 0 { + return nil + } + + other.summary.shuffle(t.rng) + other.summary.ForEach(func(mean float64, count uint64) bool { + err = t.AddWeighted(mean, count) + return err == nil + }) + return err +} + +// CDF computes the fraction in which all samples are less than +// or equal to the given value.
+func (t *TDigest) CDF(value float64) float64 {
+	if t.summary.Len() == 0 {
+		return math.NaN()
+	} else if t.summary.Len() == 1 {
+		if value < t.summary.Mean(0) {
+			return 0
+		}
+		return 1
+	}
+
+	// We have at least 2 centroids
+	left := (t.summary.Mean(1) - t.summary.Mean(0)) / 2
+	right := left
+	tot := 0.0
+
+	for i := 1; i < t.summary.Len()-1; i++ {
+		prevMean := t.summary.Mean(i - 1)
+		if value < prevMean+right {
+			v := (tot + float64(t.summary.Count(i-1))*interpolate(value, prevMean-left, prevMean+right)) / float64(t.Count())
+			if v > 0 {
+				return v
+			}
+			return 0
+		}
+
+		tot += float64(t.summary.Count(i - 1))
+		left = right
+		right = (t.summary.Mean(i+1) - t.summary.Mean(i)) / 2
+	}
+
+	// last centroid, the summary length is at least two
+	aIdx := t.summary.Len() - 2
+	aMean := t.summary.Mean(aIdx)
+	if value < aMean+right {
+		aCount := float64(t.summary.Count(aIdx))
+		return (tot + aCount*interpolate(value, aMean-left, aMean+right)) / float64(t.Count())
+	}
+	return 1
+}
+
+// Clone returns a deep copy of a TDigest.
+func (t *TDigest) Clone() *TDigest {
+	return &TDigest{
+		summary:     t.summary.Clone(),
+		compression: t.compression,
+		count:       t.count,
+		rng:         t.rng,
+	}
+}
+
+func interpolate(x, x0, x1 float64) float64 {
+	return (x - x0) / (x1 - x0)
+}
+
+// ForEachCentroid calls the specified function for each centroid.
+//
+// Iteration stops when the supplied function returns false, or when all
+// centroids have been iterated.
+func (t *TDigest) ForEachCentroid(f func(mean float64, count uint64) bool) {
+	t.summary.ForEach(f)
+}
+
+func (t TDigest) findNeighbors(start int, value float64) (int, int) {
+	minDistance := math.MaxFloat64
+	lastNeighbor := t.summary.Len()
+	for neighbor := start; neighbor < t.summary.Len(); neighbor++ {
+		z := math.Abs(t.summary.Mean(neighbor) - value)
+		if z < minDistance {
+			start = neighbor
+			minDistance = z
+		} else if z > minDistance {
+			lastNeighbor = neighbor
+			break
+		}
+	}
+	return start, lastNeighbor
+}
+
+func (t TDigest) chooseMergeCandidate(begin, end int, value float64, count uint64) int {
+	closest := t.summary.Len()
+	sum := t.summary.HeadSum(begin)
+	var n float32
+
+	for neighbor := begin; neighbor != end; neighbor++ {
+		c := float64(t.summary.Count(neighbor))
+		var q float64
+		if t.count == 1 {
+			q = 0.5
+		} else {
+			q = (sum + (c-1)/2) / float64(t.count-1)
+		}
+		k := 4 * float64(t.count) * q * (1 - q) / t.compression
+
+		if c+float64(count) <= k {
+			n++
+			if t.rng.Float32() < 1/n {
+				closest = neighbor
+			}
+		}
+		sum += c
+	}
+	return closest
+}
+
+// TrimmedMean returns the mean of the distribution between the two
+// percentiles p1 and p2.
+//
+// Values of p1 and p2 must be between 0 and 1 (inclusive) and p1
+// must be less than p2. Will panic otherwise.
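+//
+// A minimal usage sketch (the samples slice is illustrative):
+//
+//	td, _ := tdigest.New()
+//	for _, v := range samples {
+//		_ = td.Add(v)
+//	}
+//	middle := td.TrimmedMean(0.05, 0.95) // mean of the central 90%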
+func (t *TDigest) TrimmedMean(p1, p2 float64) float64 { + if p1 < 0 || p1 > 1 { + panic("p1 must be between 0 and 1 (inclusive)") + } + if p2 < 0 || p2 > 1 { + panic("p2 must be between 0 and 1 (inclusive)") + } + if p1 >= p2 { + panic("p1 must be lower than p2") + } + + minCount := p1 * float64(t.count) + maxCount := p2 * float64(t.count) + + var trimmedSum, trimmedCount, currCount float64 + for i, mean := range t.summary.means { + count := float64(t.summary.counts[i]) + + nextCount := currCount + count + if nextCount <= minCount { + currCount = nextCount + continue + } + + if currCount < minCount { + count = nextCount - minCount + } + if nextCount > maxCount { + count -= nextCount - maxCount + } + + trimmedSum += count * mean + trimmedCount += count + + if nextCount >= maxCount { + break + } + currCount = nextCount + } + + if trimmedCount == 0 { + return 0 + } + return trimmedSum / trimmedCount +} + +func estimateCapacity(compression float64) int { + return int(compression) * 10 +} diff --git a/vendor/github.com/cristalhq/hedgedhttp/LICENSE b/vendor/github.com/cristalhq/hedgedhttp/LICENSE new file mode 100644 index 0000000000..7e7c9798a4 --- /dev/null +++ b/vendor/github.com/cristalhq/hedgedhttp/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2021 cristaltech + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/vendor/github.com/cristalhq/hedgedhttp/README.md b/vendor/github.com/cristalhq/hedgedhttp/README.md new file mode 100644 index 0000000000..104213b350 --- /dev/null +++ b/vendor/github.com/cristalhq/hedgedhttp/README.md @@ -0,0 +1,80 @@ +# hedgedhttp + +[![build-img]][build-url] +[![pkg-img]][pkg-url] +[![reportcard-img]][reportcard-url] +[![coverage-img]][coverage-url] +[![version-img]][version-url] + +Hedged HTTP client which helps to reduce tail latency at scale. + +## Rationale + +See paper [Tail at Scale](https://www.barroso.org/publications/TheTailAtScale.pdf) by Jeffrey Dean, Luiz André Barroso. In short: the client first sends one request, but then sends an additional request after a timeout if the previous hasn't returned an answer in the expected time. The client cancels remaining requests once the first result is received. + +## Acknowledge + +Thanks to [Bohdan Storozhuk](https://github.com/storozhukbm) for the review and powerful hints. + +## Features + +* Simple API. +* Easy to integrate. +* Optimized for speed. +* Clean and tested code. +* Supports `http.Client` and `http.RoundTripper`. +* Dependency-free. 
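+
+## RoundTripper example
+
+A minimal sketch of the transport-level variant mentioned above (passing
+`nil` falls back to `http.DefaultTransport`; the delay and request count are
+illustrative):
+
+```go
+rt, err := hedgedhttp.NewRoundTripper(10*time.Millisecond, 3, nil)
+if err != nil {
+	panic(err)
+}
+client := &http.Client{Transport: rt, Timeout: time.Second}
+```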
+
+## Install
+
+Go version 1.16+
+
+```
+go get github.com/cristalhq/hedgedhttp
+```
+
+## Example
+
+```go
+ctx := context.Background()
+req, err := http.NewRequestWithContext(ctx, http.MethodGet, "https://google.com", http.NoBody)
+if err != nil {
+	panic(err)
+}
+
+timeout := 10 * time.Millisecond
+upto := 7
+client := &http.Client{Timeout: time.Second}
+hedged, err := hedgedhttp.NewClient(timeout, upto, client)
+if err != nil {
+	panic(err)
+}
+
+// will make no more than `upto` requests, with a `timeout` delay between them
+resp, err := hedged.Do(req)
+if err != nil {
+	panic(err)
+}
+defer resp.Body.Close()
+```
+
+Also see examples: [examples_test.go](https://github.com/cristalhq/hedgedhttp/blob/main/examples_test.go).
+
+## Documentation
+
+See [these docs][pkg-url].
+
+## License
+
+[MIT License](LICENSE).
+
+[build-img]: https://github.com/cristalhq/hedgedhttp/workflows/build/badge.svg
+[build-url]: https://github.com/cristalhq/hedgedhttp/actions
+[pkg-img]: https://pkg.go.dev/badge/cristalhq/hedgedhttp
+[pkg-url]: https://pkg.go.dev/github.com/cristalhq/hedgedhttp
+[reportcard-img]: https://goreportcard.com/badge/cristalhq/hedgedhttp
+[reportcard-url]: https://goreportcard.com/report/cristalhq/hedgedhttp
+[coverage-img]: https://codecov.io/gh/cristalhq/hedgedhttp/branch/main/graph/badge.svg
+[coverage-url]: https://codecov.io/gh/cristalhq/hedgedhttp
+[version-img]: https://img.shields.io/github/v/release/cristalhq/hedgedhttp
+[version-url]: https://github.com/cristalhq/hedgedhttp/releases
diff --git a/vendor/github.com/cristalhq/hedgedhttp/hedged.go b/vendor/github.com/cristalhq/hedgedhttp/hedged.go
new file mode 100644
index 0000000000..b7b33f50b8
--- /dev/null
+++ b/vendor/github.com/cristalhq/hedgedhttp/hedged.go
@@ -0,0 +1,387 @@
+package hedgedhttp
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"net/http"
+	"strings"
+	"sync"
+	"time"
+)
+
+const infiniteTimeout = 30 * 24 * time.Hour // domain specific infinite
+
+// Client represents a hedged HTTP client.
+type Client struct {
+	rt    http.RoundTripper
+	stats *Stats
+}
+
+// Config for the [Client].
+type Config struct {
+	// Transport of the [Client].
+	// Default is nil which results in [net/http.DefaultTransport].
+	Transport http.RoundTripper
+
+	// Upto says how many requests to make.
+	// Default is zero which means no hedged requests will be made.
+	Upto int
+
+	// Delay between two consecutive hedged requests.
+	Delay time.Duration
+
+	// Next returns the upto and delay for each HTTP request that will be hedged.
+	// Default is nil, which results in the (Upto, Delay) values.
+	Next NextFn
+}
+
+// NextFn represents a function that is called for each HTTP request for retrieving hedging options.
+type NextFn func() (upto int, delay time.Duration)
+
+// New returns a new Client for the given config.
+func New(cfg Config) (*Client, error) {
+	switch {
+	case cfg.Delay < 0:
+		return nil, errors.New("hedgedhttp: delay cannot be negative")
+	case cfg.Upto < 0:
+		return nil, errors.New("hedgedhttp: upto cannot be negative")
+	}
+	if cfg.Transport == nil {
+		cfg.Transport = http.DefaultTransport
+	}
+
+	rt, stats, err := NewRoundTripperAndStats(cfg.Delay, cfg.Upto, cfg.Transport)
+	if err != nil {
+		return nil, err
+	}
+
+	// TODO(cristaloleg): this should be removed after internals cleanup.
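+	// NewRoundTripperAndStats always returns a *hedgedTransport today, so the
+	// assertion below only serves to attach cfg.Next to the transport.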
+	rt2, ok := rt.(*hedgedTransport)
+	if !ok {
+		panic(fmt.Sprintf("want *hedgedTransport got %T", rt))
+	}
+	rt2.next = cfg.Next
+
+	c := &Client{
+		rt:    rt2,
+		stats: stats,
+	}
+	return c, nil
+}
+
+// Stats returns statistics for the given client, see [Stats] methods.
+func (c *Client) Stats() *Stats {
+	return c.stats
+}
+
+// Do does the same as [RoundTrip]; this method is provided to align with [net/http.Client].
+func (c *Client) Do(req *http.Request) (*http.Response, error) {
+	return c.rt.RoundTrip(req)
+}
+
+// RoundTrip implements the [net/http.RoundTripper] interface.
+func (c *Client) RoundTrip(req *http.Request) (*http.Response, error) {
+	return c.rt.RoundTrip(req)
+}
+
+// NewClient returns a new http.Client which implements the hedged requests pattern.
+// The returned client starts a new request after a timeout from the previous request.
+// It starts no more than upto requests.
+func NewClient(timeout time.Duration, upto int, client *http.Client) (*http.Client, error) {
+	newClient, _, err := NewClientAndStats(timeout, upto, client)
+	if err != nil {
+		return nil, err
+	}
+	return newClient, nil
+}
+
+// NewClientAndStats returns a new http.Client which implements the hedged requests pattern,
+// and a Stats object that can be queried to obtain the client's metrics.
+// The returned client starts a new request after a timeout from the previous request.
+// It starts no more than upto requests.
+func NewClientAndStats(timeout time.Duration, upto int, client *http.Client) (*http.Client, *Stats, error) {
+	if client == nil {
+		client = &http.Client{
+			Timeout: 5 * time.Second,
+		}
+	}
+
+	newTransport, metrics, err := NewRoundTripperAndStats(timeout, upto, client.Transport)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	client.Transport = newTransport
+
+	return client, metrics, nil
+}
+
+// NewRoundTripper returns a new http.RoundTripper which implements the hedged requests pattern.
+// The returned round tripper starts a new request after a timeout from the previous request.
+// It starts no more than upto requests.
+func NewRoundTripper(timeout time.Duration, upto int, rt http.RoundTripper) (http.RoundTripper, error) {
+	newRT, _, err := NewRoundTripperAndStats(timeout, upto, rt)
+	if err != nil {
+		return nil, err
+	}
+	return newRT, nil
+}
+
+// NewRoundTripperAndStats returns a new http.RoundTripper which implements the hedged requests pattern,
+// and a Stats object that can be queried to obtain the client's metrics.
+// The returned round tripper starts a new request after a timeout from the previous request.
+// It starts no more than upto requests.
+func NewRoundTripperAndStats(timeout time.Duration, upto int, rt http.RoundTripper) (http.RoundTripper, *Stats, error) {
+	switch {
+	case timeout < 0:
+		return nil, nil, errors.New("hedgedhttp: timeout cannot be negative")
+	case upto < 0:
+		return nil, nil, errors.New("hedgedhttp: upto cannot be negative")
+	}
+
+	if rt == nil {
+		rt = http.DefaultTransport
+	}
+
+	if timeout == 0 {
+		timeout = time.Nanosecond // smallest possible timeout if not set
+	}
+
+	hedged := &hedgedTransport{
+		rt:      rt,
+		timeout: timeout,
+		upto:    upto,
+		metrics: &Stats{},
+	}
+	return hedged, hedged.metrics, nil
+}
+
+type hedgedTransport struct {
+	rt      http.RoundTripper
+	timeout time.Duration
+	upto    int
+	next    NextFn
+	metrics *Stats
+}
+
+func (ht *hedgedTransport) RoundTrip(req *http.Request) (*http.Response, error) {
+	mainCtx := req.Context()
+
+	upto, timeout := ht.upto, ht.timeout
+	if ht.next != nil {
+		upto, timeout = ht.next()
+	}
+
+	// no hedged requests, just a regular one.
+	if upto <= 0 {
+		return ht.rt.RoundTrip(req)
+	}
+	// fall back to the default timeout.
+	if timeout < 0 {
+		timeout = ht.timeout
+	}
+
+	errOverall := &MultiError{}
+	resultCh := make(chan indexedResp, upto)
+	errorCh := make(chan error, upto)
+
+	ht.metrics.requestedRoundTripsInc()
+
+	resultIdx := -1
+	cancels := make([]func(), upto)
+
+	defer runInPool(func() {
+		for i, cancel := range cancels {
+			if i != resultIdx && cancel != nil {
+				ht.metrics.canceledSubRequestsInc()
+				cancel()
+			}
+		}
+	})
+
+	for sent := 0; len(errOverall.Errors) < upto; sent++ {
+		if sent < upto {
+			idx := sent
+			subReq, cancel := reqWithCtx(req, mainCtx, idx != 0)
+			cancels[idx] = cancel
+
+			runInPool(func() {
+				ht.metrics.actualRoundTripsInc()
+				resp, err := ht.rt.RoundTrip(subReq)
+				if err != nil {
+					ht.metrics.failedRoundTripsInc()
+					errorCh <- err
+				} else {
+					resultCh <- indexedResp{idx, resp}
+				}
+			})
+		}
+
+		// all requests sent - effectively disabling the timeout between requests
+		if sent == upto {
+			timeout = infiniteTimeout
+		}
+		resp, err := waitResult(mainCtx, resultCh, errorCh, timeout)
+
+		switch {
+		case resp.Resp != nil:
+			resultIdx = resp.Index
+			if resultIdx == 0 {
+				ht.metrics.originalRequestWinsInc()
+			} else {
+				ht.metrics.hedgedRequestWinsInc()
+			}
+			return resp.Resp, nil
+		case mainCtx.Err() != nil:
+			ht.metrics.canceledByUserRoundTripsInc()
+			return nil, mainCtx.Err()
+		case err != nil:
+			errOverall.Errors = append(errOverall.Errors, err)
+		}
+	}
+
+	// all requests have returned errors
+	return nil, errOverall
+}
+
+func waitResult(ctx context.Context, resultCh <-chan indexedResp, errorCh <-chan error, timeout time.Duration) (indexedResp, error) {
+	// try to read a result first before blocking on all other channels
+	select {
+	case res := <-resultCh:
+		return res, nil
+	default:
+		timer := getTimer(timeout)
+		defer returnTimer(timer)
+
+		select {
+		case res := <-resultCh:
+			return res, nil
+
+		case reqErr := <-errorCh:
+			return indexedResp{}, reqErr
+
+		case <-ctx.Done():
+			return indexedResp{}, ctx.Err()
+
+		case <-timer.C:
+			return indexedResp{}, nil // it's not a request timeout, it's the timeout BETWEEN consecutive requests
+		}
+	}
+}
+
+type indexedResp struct {
+	Index int
+	Resp  *http.Response
+}
+
+func reqWithCtx(r *http.Request, ctx context.Context, isHedged bool) (*http.Request, context.CancelFunc) {
+	ctx, cancel := context.WithCancel(ctx)
+	if isHedged {
+		ctx = context.WithValue(ctx, hedgedRequest{}, struct{}{})
+	}
+	req := r.WithContext(ctx)
+	return req, cancel
+}
+
+type hedgedRequest struct{}
+
+// IsHedgedRequest reports whether a request is hedged.
+func IsHedgedRequest(r *http.Request) bool {
+	val := r.Context().Value(hedgedRequest{})
+	return val != nil
+}
+
+var taskQueue = make(chan func())
+
+func runInPool(task func()) {
+	select {
+	case taskQueue <- task:
+		// submitted, everything is ok
+
+	default:
+		go func() {
+			// do the given task
+			task()
+
+			const cleanupDuration = 10 * time.Second
+			cleanupTicker := time.NewTicker(cleanupDuration)
+			defer cleanupTicker.Stop()
+
+			for {
+				select {
+				case t := <-taskQueue:
+					t()
+					cleanupTicker.Reset(cleanupDuration)
+				case <-cleanupTicker.C:
+					return
+				}
+			}
+		}()
+	}
+}
+
+// MultiError is an error type to track multiple errors. It is used to
+// accumulate errors and return them as a single "error".
+// Inspired by https://github.com/hashicorp/go-multierror
+type MultiError struct {
+	Errors        []error
+	ErrorFormatFn ErrorFormatFunc
+}
+
+func (e *MultiError) Error() string {
+	fn := e.ErrorFormatFn
+	if fn == nil {
+		fn = listFormatFunc
+	}
+	return fn(e.Errors)
+}
+
+func (e *MultiError) String() string {
+	return fmt.Sprintf("*%#v", e.Errors)
+}
+
+// ErrorOrNil returns an error if there is at least one, nil otherwise.
+func (e *MultiError) ErrorOrNil() error {
+	switch {
+	case e == nil || len(e.Errors) == 0:
+		return nil
+	default:
+		return e
+	}
+}
+
+// ErrorFormatFunc is called by MultiError to return the list of errors as a string.
+type ErrorFormatFunc func([]error) string
+
+func listFormatFunc(es []error) string {
+	if len(es) == 1 {
+		return fmt.Sprintf("1 error occurred:\n\t* %s\n\n", es[0])
+	}
+
+	points := make([]string, len(es))
+	for i, err := range es {
+		points[i] = fmt.Sprintf("* %s", err)
+	}
+
+	return fmt.Sprintf("%d errors occurred:\n\t%s\n\n", len(es), strings.Join(points, "\n\t"))
+}
+
+var timerPool = sync.Pool{New: func() interface{} {
+	return time.NewTimer(time.Second)
+}}
+
+func getTimer(duration time.Duration) *time.Timer {
+	timer := timerPool.Get().(*time.Timer)
+	timer.Reset(duration)
+	return timer
+}
+
+func returnTimer(timer *time.Timer) {
+	timer.Stop()
+	select {
+	case <-timer.C:
+	default:
+	}
+	timerPool.Put(timer)
+}
diff --git a/vendor/github.com/cristalhq/hedgedhttp/stats.go b/vendor/github.com/cristalhq/hedgedhttp/stats.go
new file mode 100644
index 0000000000..f293318908
--- /dev/null
+++ b/vendor/github.com/cristalhq/hedgedhttp/stats.go
@@ -0,0 +1,87 @@
+package hedgedhttp
+
+import "sync/atomic"
+
+// atomicCounter is a false sharing safe counter.
+type atomicCounter struct {
+	count uint64
+	_     [7]uint64
+}
+
+type cacheLine [64]byte
+
+// Stats object that can be queried to obtain certain metrics and get better observability.
+type Stats struct {
+	_                        cacheLine
+	requestedRoundTrips      atomicCounter
+	actualRoundTrips         atomicCounter
+	failedRoundTrips         atomicCounter
+	originalRequestWins      atomicCounter
+	hedgedRequestWins        atomicCounter
+	canceledByUserRoundTrips atomicCounter
+	canceledSubRequests      atomicCounter
+	_                        cacheLine
+}
+
+func (s *Stats) requestedRoundTripsInc()      { atomic.AddUint64(&s.requestedRoundTrips.count, 1) }
+func (s *Stats) actualRoundTripsInc()         { atomic.AddUint64(&s.actualRoundTrips.count, 1) }
+func (s *Stats) failedRoundTripsInc()         { atomic.AddUint64(&s.failedRoundTrips.count, 1) }
+func (s *Stats) originalRequestWinsInc()      { atomic.AddUint64(&s.originalRequestWins.count, 1) }
+func (s *Stats) hedgedRequestWinsInc()        { atomic.AddUint64(&s.hedgedRequestWins.count, 1) }
+func (s *Stats) canceledByUserRoundTripsInc() { atomic.AddUint64(&s.canceledByUserRoundTrips.count, 1) }
+func (s *Stats) canceledSubRequestsInc()      { atomic.AddUint64(&s.canceledSubRequests.count, 1) }
+
+// RequestedRoundTrips returns count of requests that were requested by client.
+func (s *Stats) RequestedRoundTrips() uint64 {
+	return atomic.LoadUint64(&s.requestedRoundTrips.count)
+}
+
+// ActualRoundTrips returns count of requests that were actually sent.
+func (s *Stats) ActualRoundTrips() uint64 {
+	return atomic.LoadUint64(&s.actualRoundTrips.count)
+}
+
+// FailedRoundTrips returns count of requests that failed.
+func (s *Stats) FailedRoundTrips() uint64 {
+	return atomic.LoadUint64(&s.failedRoundTrips.count)
+}
+
+// OriginalRequestWins returns count of original requests that were faster than any hedged sub-request.
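+//
+// A sketch of reading the win counters off a hedged client (the client
+// variable is illustrative):
+//
+//	stats := client.Stats()
+//	original, hedged := stats.OriginalRequestWins(), stats.HedgedRequestWins()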
+func (s *Stats) OriginalRequestWins() uint64 { + return atomic.LoadUint64(&s.originalRequestWins.count) +} + +// HedgedRequestWins returns count of hedged requests that were faster than the original. +func (s *Stats) HedgedRequestWins() uint64 { + return atomic.LoadUint64(&s.hedgedRequestWins.count) +} + +// CanceledByUserRoundTrips returns count of requests that were canceled by user, using request context. +func (s *Stats) CanceledByUserRoundTrips() uint64 { + return atomic.LoadUint64(&s.canceledByUserRoundTrips.count) +} + +// CanceledSubRequests returns count of hedged sub-requests that were canceled by transport. +func (s *Stats) CanceledSubRequests() uint64 { + return atomic.LoadUint64(&s.canceledSubRequests.count) +} + +// StatsSnapshot is a snapshot of Stats. +type StatsSnapshot struct { + RequestedRoundTrips uint64 // count of requests that were requested by client + ActualRoundTrips uint64 // count of requests that were actually sent + FailedRoundTrips uint64 // count of requests that failed + CanceledByUserRoundTrips uint64 // count of requests that were canceled by user, using request context + CanceledSubRequests uint64 // count of hedged sub-requests that were canceled by transport +} + +// Snapshot of the stats. +func (s *Stats) Snapshot() StatsSnapshot { + return StatsSnapshot{ + RequestedRoundTrips: s.RequestedRoundTrips(), + ActualRoundTrips: s.ActualRoundTrips(), + FailedRoundTrips: s.FailedRoundTrips(), + CanceledByUserRoundTrips: s.CanceledByUserRoundTrips(), + CanceledSubRequests: s.CanceledSubRequests(), + } +} diff --git a/vendor/github.com/thanos-io/thanos/pkg/exthttp/hedging.go b/vendor/github.com/thanos-io/thanos/pkg/exthttp/hedging.go new file mode 100644 index 0000000000..09a1b3e8a2 --- /dev/null +++ b/vendor/github.com/thanos-io/thanos/pkg/exthttp/hedging.go @@ -0,0 +1,96 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. 
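+//
+// The hedging round tripper below keeps a streaming t-digest of observed
+// object store latencies and, through hedgedhttp's Next hook, uses the
+// configured quantile of that digest as the delay before each hedged attempt.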
+
+package exthttp
+
+import (
+	"fmt"
+	"net/http"
+	"sync"
+	"time"
+
+	"github.com/caio/go-tdigest"
+	"github.com/cristalhq/hedgedhttp"
+)
+
+type CustomBucketConfig struct {
+	HedgingConfig HedgingConfig `yaml:"hedging_config"`
+}
+
+type HedgingConfig struct {
+	Enabled  bool    `yaml:"enabled"`
+	UpTo     uint    `yaml:"up_to"`
+	Quantile float64 `yaml:"quantile"`
+}
+
+func DefaultCustomBucketConfig() CustomBucketConfig {
+	return CustomBucketConfig{
+		HedgingConfig: HedgingConfig{
+			Enabled:  false,
+			UpTo:     3,
+			Quantile: 0.9,
+		},
+	}
+}
+
+type hedgingRoundTripper struct {
+	Transport http.RoundTripper
+	TDigest   *tdigest.TDigest
+	mu        sync.RWMutex
+	config    HedgingConfig
+}
+
+func (hrt *hedgingRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
+	start := time.Now()
+	resp, err := hrt.Transport.RoundTrip(req)
+	if err != nil {
+		return nil, err
+	}
+	duration := float64(time.Since(start).Milliseconds())
+	// Record the observed latency. Unlock before the error check so the
+	// mutex is not left locked on the error path.
+	hrt.mu.Lock()
+	err = hrt.TDigest.Add(duration)
+	hrt.mu.Unlock()
+	if err != nil {
+		return nil, err
+	}
+	return resp, nil
+}
+
+func (hrt *hedgingRoundTripper) nextFn() (int, time.Duration) {
+	hrt.mu.RLock()
+	defer hrt.mu.RUnlock()
+
+	delayMs := hrt.TDigest.Quantile(hrt.config.Quantile)
+	delay := time.Duration(delayMs) * time.Millisecond
+	upto := int(hrt.config.UpTo)
+	return upto, delay
+}
+
+func CreateHedgedTransportWithConfig(config CustomBucketConfig) func(rt http.RoundTripper) http.RoundTripper {
+	if !config.HedgingConfig.Enabled {
+		return func(rt http.RoundTripper) http.RoundTripper {
+			return rt
+		}
+	}
+	return func(rt http.RoundTripper) http.RoundTripper {
+		td, err := tdigest.New()
+		if err != nil {
+			panic(fmt.Sprintf("BUG: Failed to initialize T-Digest: %v", err))
+		}
+		hrt := &hedgingRoundTripper{
+			Transport: rt,
+			TDigest:   td,
+			config:    config.HedgingConfig,
+		}
+		cfg := hedgedhttp.Config{
+			Transport: hrt,
+			Upto:      int(config.HedgingConfig.UpTo),
+			Next:      hrt.nextFn,
+		}
+		hedgedrt, err := hedgedhttp.New(cfg)
+		if err != nil {
+			panic(fmt.Sprintf("BUG: Failed to create hedged transport: %v", err))
+		}
+		return hedgedrt
+	}
+}
diff --git a/vendor/github.com/thanos-io/thanos/pkg/exthttp/tlsconfig.go b/vendor/github.com/thanos-io/thanos/pkg/exthttp/tlsconfig.go
new file mode 100644
index 0000000000..26b58d24f7
--- /dev/null
+++ b/vendor/github.com/thanos-io/thanos/pkg/exthttp/tlsconfig.go
@@ -0,0 +1,87 @@
+// Copyright (c) The Thanos Authors.
+// Licensed under the Apache License 2.0.
+
+package exthttp
+
+import (
+	"crypto/tls"
+	"crypto/x509"
+	"fmt"
+	"os"
+)
+
+// TLSConfig configures the options for TLS connections.
+type TLSConfig struct {
+	// The CA cert to use for the targets.
+	CAFile string `yaml:"ca_file"`
+	// The client cert file for the targets.
+	CertFile string `yaml:"cert_file"`
+	// The client key file for the targets.
+	KeyFile string `yaml:"key_file"`
+	// Used to verify the hostname for the targets.
+	ServerName string `yaml:"server_name"`
+	// Disable target certificate validation.
+	InsecureSkipVerify bool `yaml:"insecure_skip_verify"`
+}
+
+// NewTLSConfig creates a new tls.Config from the given TLSConfig.
+func NewTLSConfig(cfg *TLSConfig) (*tls.Config, error) {
+	tlsConfig := &tls.Config{InsecureSkipVerify: cfg.InsecureSkipVerify}
+
+	// If a CA cert is provided then let's read it in.
+ if len(cfg.CAFile) > 0 { + b, err := readCAFile(cfg.CAFile) + if err != nil { + return nil, err + } + if !updateRootCA(tlsConfig, b) { + return nil, fmt.Errorf("unable to use specified CA cert %s", cfg.CAFile) + } + } + + if len(cfg.ServerName) > 0 { + tlsConfig.ServerName = cfg.ServerName + } + // If a client cert & key is provided then configure TLS config accordingly. + if len(cfg.CertFile) > 0 && len(cfg.KeyFile) == 0 { + return nil, fmt.Errorf("client cert file %q specified without client key file", cfg.CertFile) + } else if len(cfg.KeyFile) > 0 && len(cfg.CertFile) == 0 { + return nil, fmt.Errorf("client key file %q specified without client cert file", cfg.KeyFile) + } else if len(cfg.CertFile) > 0 && len(cfg.KeyFile) > 0 { + // Verify that client cert and key are valid. + if _, err := cfg.getClientCertificate(nil); err != nil { + return nil, err + } + tlsConfig.GetClientCertificate = cfg.getClientCertificate + } + + return tlsConfig, nil +} + +// readCAFile reads the CA cert file from disk. +func readCAFile(f string) ([]byte, error) { + data, err := os.ReadFile(f) + if err != nil { + return nil, fmt.Errorf("unable to load specified CA cert %s: %s", f, err) + } + return data, nil +} + +// updateRootCA parses the given byte slice as a series of PEM encoded certificates and updates tls.Config.RootCAs. +func updateRootCA(cfg *tls.Config, b []byte) bool { + caCertPool := x509.NewCertPool() + if !caCertPool.AppendCertsFromPEM(b) { + return false + } + cfg.RootCAs = caCertPool + return true +} + +// getClientCertificate reads the pair of client cert and key from disk and returns a tls.Certificate. +func (c *TLSConfig) getClientCertificate(*tls.CertificateRequestInfo) (*tls.Certificate, error) { + cert, err := tls.LoadX509KeyPair(c.CertFile, c.KeyFile) + if err != nil { + return nil, fmt.Errorf("unable to use specified client cert (%s) & key (%s): %s", c.CertFile, c.KeyFile, err) + } + return &cert, nil +} diff --git a/vendor/github.com/thanos-io/thanos/pkg/exthttp/transport.go b/vendor/github.com/thanos-io/thanos/pkg/exthttp/transport.go new file mode 100644 index 0000000000..76d6ee70d4 --- /dev/null +++ b/vendor/github.com/thanos-io/thanos/pkg/exthttp/transport.go @@ -0,0 +1,69 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package exthttp + +import ( + "net" + "net/http" + "time" + + "github.com/prometheus/common/model" +) + +// TODO(bwplotka): HTTPConfig stores the http.Transport configuration for the cos and s3 minio client. +type HTTPConfig struct { + IdleConnTimeout model.Duration `yaml:"idle_conn_timeout"` + ResponseHeaderTimeout model.Duration `yaml:"response_header_timeout"` + InsecureSkipVerify bool `yaml:"insecure_skip_verify"` + + TLSHandshakeTimeout model.Duration `yaml:"tls_handshake_timeout"` + ExpectContinueTimeout model.Duration `yaml:"expect_continue_timeout"` + MaxIdleConns int `yaml:"max_idle_conns"` + MaxIdleConnsPerHost int `yaml:"max_idle_conns_per_host"` + MaxConnsPerHost int `yaml:"max_conns_per_host"` + + // Transport field allows upstream callers to inject a custom round tripper. + Transport http.RoundTripper `yaml:"-"` + + TLSConfig TLSConfig `yaml:"tls_config"` + DisableCompression bool `yaml:"disable_compression"` +} + +// DefaultTransport - this default transport is based on the Minio +// DefaultTransport up until the following commit: +// https://github.com/minio/minio-go/commit/008c7aa71fc17e11bf980c209a4f8c4d687fc884 +// The values have since diverged. 
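+//
+// A minimal usage sketch (zero durations leave the corresponding timeouts
+// unset; the field value is illustrative):
+//
+//	tr, err := exthttp.DefaultTransport(exthttp.HTTPConfig{MaxIdleConns: 100})
+//	if err != nil {
+//		panic(err) // illustrative error handling
+//	}
+//	client := &http.Client{Transport: tr}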
+func DefaultTransport(config HTTPConfig) (*http.Transport, error) {
+	tlsConfig, err := NewTLSConfig(&config.TLSConfig)
+	if err != nil {
+		return nil, err
+	}
+	tlsConfig.InsecureSkipVerify = config.InsecureSkipVerify
+
+	return &http.Transport{
+		Proxy: http.ProxyFromEnvironment,
+		DialContext: (&net.Dialer{
+			Timeout:   30 * time.Second,
+			KeepAlive: 30 * time.Second,
+			DualStack: true,
+		}).DialContext,
+
+		MaxIdleConns:          config.MaxIdleConns,
+		MaxIdleConnsPerHost:   config.MaxIdleConnsPerHost,
+		IdleConnTimeout:       time.Duration(config.IdleConnTimeout),
+		MaxConnsPerHost:       config.MaxConnsPerHost,
+		TLSHandshakeTimeout:   time.Duration(config.TLSHandshakeTimeout),
+		ExpectContinueTimeout: time.Duration(config.ExpectContinueTimeout),
+		// A custom ResponseHeaderTimeout was introduced
+		// to cover cases where the tcp connection works but
+		// the server never answers. Defaults to 2 minutes.
+		ResponseHeaderTimeout: time.Duration(config.ResponseHeaderTimeout),
+		// Set this value so that the underlying transport round-tripper
+		// doesn't try to auto decode the body of objects with
+		// content-encoding set to `gzip`.
+		//
+		// Refer: https://golang.org/src/net/http/transport.go?h=roundTrip#L1843.
+		DisableCompression: config.DisableCompression,
+		TLSClientConfig:    tlsConfig,
+	}, nil
+}
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 96f6d5a561..6f824169b6 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -287,6 +287,9 @@ github.com/blang/semver/v4
 # github.com/bradfitz/gomemcache v0.0.0-20230905024940-24af94b03874
 ## explicit; go 1.18
 github.com/bradfitz/gomemcache/memcache
+# github.com/caio/go-tdigest v3.1.0+incompatible
+## explicit
+github.com/caio/go-tdigest
 # github.com/cenkalti/backoff/v4 v4.3.0
 ## explicit; go 1.18
 github.com/cenkalti/backoff/v4
@@ -306,6 +309,9 @@ github.com/coreos/go-systemd/v22/journal
 # github.com/cortexproject/promqlsmith v0.0.0-20241121054008-8b48fe2471ef
 ## explicit; go 1.22.0
 github.com/cortexproject/promqlsmith
+# github.com/cristalhq/hedgedhttp v0.9.1
+## explicit; go 1.16
+github.com/cristalhq/hedgedhttp
 # github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
 ## explicit
 github.com/davecgh/go-spew/spew
@@ -1004,6 +1010,7 @@ github.com/thanos-io/thanos/pkg/discovery/memcache
 github.com/thanos-io/thanos/pkg/errutil
 github.com/thanos-io/thanos/pkg/exemplars/exemplarspb
 github.com/thanos-io/thanos/pkg/extgrpc/snappy
+github.com/thanos-io/thanos/pkg/exthttp
 github.com/thanos-io/thanos/pkg/extkingpin
 github.com/thanos-io/thanos/pkg/extprom
 github.com/thanos-io/thanos/pkg/extprom/http