Skip to content

Commit

Permalink
Add rate limiting and metrics to hedging (#4860)
Browse files Browse the repository at this point in the history
* Add rate limiting and metrics to hedging

Signed-off-by: Cyril Tovena <[email protected]>

* Add changelog entry

Signed-off-by: Cyril Tovena <[email protected]>

* Allow to pass a custom registerer

Signed-off-by: Cyril Tovena <[email protected]>
  • Loading branch information
cyriltovena authored Dec 3, 2021
1 parent 77dcb16 commit c33a651
Show file tree
Hide file tree
Showing 16 changed files with 276 additions and 64 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
## Main

* [4860](https://github.com/grafana/loki/pull/4860) **cyriltovena**: Add rate limiting and metrics to hedging
* [4865](https://github.com/grafana/loki/pull/4865) **taisho6339**: Fix duplicate registry.MustRegister call in Promtail Kafka
* [4845](https://github.com/grafana/loki/pull/4845) **chaudum**: Return error responses consistently as JSON
* [4826](https://github.com/grafana/loki/pull/4826) **cyriltovena**: Adds the ability to hedge storage requests.
Expand Down
6 changes: 5 additions & 1 deletion docs/sources/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -872,8 +872,12 @@ You should configure the latency based on your p99 of object store requests.
# used with queriers and has minimal to no impact on other pieces.
[at: <duration> | default = 0]
# Optional. Default is 2
# The maximum amount of requests to be issued.
# The maximum amount of hedge requests to be issued for a given request.
[up_to: <int> | default = 2]
# Optional. Default is 5
# The maximum number of hedged requests to be issued per second.
[max_per_second: <int> | default = 5]
```

## local_storage_config
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/cespare/xxhash/v2 v2.1.2
github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf
github.com/cortexproject/cortex v1.10.1-0.20211124141505-4e9fc3a2b5ab
github.com/cristalhq/hedgedhttp v0.6.1
github.com/cristalhq/hedgedhttp v0.6.2
github.com/davecgh/go-spew v1.1.1
github.com/docker/docker v20.10.11+incompatible
github.com/docker/go-plugins-helpers v0.0.0-20181025120712-1e6269c305b8
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -482,8 +482,8 @@ github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7Do
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/creack/pty v1.1.11 h1:07n33Z8lZxZ2qwegKbObQohDhXDQxiMMz1NOUGYlesw=
github.com/creack/pty v1.1.11/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/cristalhq/hedgedhttp v0.6.1 h1:o3tcl+HwEFrGfNkZbgbQW4N7UNmorKvqhUFLN1rrkdA=
github.com/cristalhq/hedgedhttp v0.6.1/go.mod h1:XkqWU6qVMutbhW68NnzjWrGtH8NUx1UfYqGYtHVKIsI=
github.com/cristalhq/hedgedhttp v0.6.2 h1:aWnUOzqPaM8/dgmPLR7wl0AoFOPYnqgdhTkcWgWUgpA=
github.com/cristalhq/hedgedhttp v0.6.2/go.mod h1:XkqWU6qVMutbhW68NnzjWrGtH8NUx1UfYqGYtHVKIsI=
github.com/cucumber/godog v0.8.1/go.mod h1:vSh3r/lM+psC1BPXvdkSEuNjmXfpVqrMGYAElF6hxnA=
github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4=
github.com/cyphar/filepath-securejoin v0.2.2/go.mod h1:FpkQEhXnPnOthhzymB7CGsFk2G9VLXONKD9G7QGMM+4=
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/chunk/aws/s3_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ func buildS3Client(cfg S3Config, hedgingCfg hedging.Config, hedging bool) (*s3.S
}

if hedging {
httpClient = hedgingCfg.Client(httpClient)
httpClient = hedgingCfg.ClientWithRegisterer(httpClient, prometheus.WrapRegistererWithPrefix("loki", prometheus.DefaultRegisterer))
}

s3Config = s3Config.WithHTTPClient(httpClient)
Expand Down
5 changes: 3 additions & 2 deletions pkg/storage/chunk/aws/s3_storage_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,9 @@ func Test_Hedging(t *testing.T) {
})
},
}, hedging.Config{
At: tc.hedgeAt,
UpTo: tc.upTo,
At: tc.hedgeAt,
UpTo: tc.upTo,
MaxPerSecond: 1000,
})
require.NoError(t, err)
tc.do(c)
Expand Down
82 changes: 45 additions & 37 deletions pkg/storage/chunk/azure/blob_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/Azure/azure-pipeline-go/pipeline"
"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/mattn/go-ieproxy"
"github.com/prometheus/client_golang/prometheus"

cortex_azure "github.com/cortexproject/cortex/pkg/chunk/azure"
"github.com/cortexproject/cortex/pkg/util"
Expand Down Expand Up @@ -57,23 +58,25 @@ var (
}

// default Azure http client.
defaultClient = &http.Client{
Transport: &http.Transport{
Proxy: ieproxy.GetProxyFunc(),
Dial: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
DualStack: true,
}).Dial,
MaxIdleConns: 0,
MaxIdleConnsPerHost: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
DisableKeepAlives: false,
DisableCompression: false,
MaxResponseHeaderBytes: 0,
},
defaultClientFactory = func() *http.Client {
return &http.Client{
Transport: &http.Transport{
Proxy: ieproxy.GetProxyFunc(),
Dial: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
DualStack: true,
}).Dial,
MaxIdleConns: 0,
MaxIdleConnsPerHost: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
DisableKeepAlives: false,
DisableCompression: false,
MaxResponseHeaderBytes: 0,
},
}
}
)

Expand Down Expand Up @@ -132,20 +135,30 @@ func (c *BlobStorageConfig) ToCortexAzureConfig() cortex_azure.BlobStorageConfig
// Implements ObjectStorage
type BlobStorage struct {
// blobService storage.Serv
cfg *BlobStorageConfig
hedgingCfg hedging.Config
cfg *BlobStorageConfig

containerURL azblob.ContainerURL

pipeline pipeline.Pipeline
hedgingPipeline pipeline.Pipeline
}

// NewBlobStorage creates a new instance of the BlobStorage struct.
func NewBlobStorage(cfg *BlobStorageConfig, hedgingCfg hedging.Config) (*BlobStorage, error) {
log.WarnExperimentalUse("Azure Blob Storage")
blobStorage := &BlobStorage{
cfg: cfg,
hedgingCfg: hedgingCfg,
cfg: cfg,
}

var err error
pipeline, err := blobStorage.newPipeline(hedgingCfg, false)
if err != nil {
return nil, err
}
blobStorage.pipeline = pipeline
hedgingPipeline, err := blobStorage.newPipeline(hedgingCfg, true)
if err != nil {
return nil, err
}
blobStorage.hedgingPipeline = hedgingPipeline
blobStorage.containerURL, err = blobStorage.buildContainerURL()
if err != nil {
return nil, err
Expand Down Expand Up @@ -210,13 +223,12 @@ func (b *BlobStorage) getBlobURL(blobID string, hedging bool) (azblob.BlockBlobU
if err != nil {
return azblob.BlockBlobURL{}, err
}

azPipeline, err := b.newPipeline(hedging)
if err != nil {
return azblob.BlockBlobURL{}, err
pipeline := b.pipeline
if hedging {
pipeline = b.hedgingPipeline
}

return azblob.NewBlockBlobURL(*u, azPipeline), nil
return azblob.NewBlockBlobURL(*u, pipeline), nil
}

func (b *BlobStorage) buildContainerURL() (azblob.ContainerURL, error) {
Expand All @@ -225,15 +237,10 @@ func (b *BlobStorage) buildContainerURL() (azblob.ContainerURL, error) {
return azblob.ContainerURL{}, err
}

azPipeline, err := b.newPipeline(false)
if err != nil {
return azblob.ContainerURL{}, err
}

return azblob.NewContainerURL(*u, azPipeline), nil
return azblob.NewContainerURL(*u, b.pipeline), nil
}

func (b *BlobStorage) newPipeline(hedging bool) (pipeline.Pipeline, error) {
func (b *BlobStorage) newPipeline(hedgingCfg hedging.Config, hedging bool) (pipeline.Pipeline, error) {
credential, err := azblob.NewSharedKeyCredential(b.cfg.AccountName, b.cfg.AccountKey.Value)
if err != nil {
return nil, err
Expand All @@ -248,17 +255,18 @@ func (b *BlobStorage) newPipeline(hedging bool) (pipeline.Pipeline, error) {
MaxRetryDelay: b.cfg.MaxRetryDelay,
},
}
client := defaultClientFactory()

opts.HTTPSender = pipeline.FactoryFunc(func(next pipeline.Policy, po *pipeline.PolicyOptions) pipeline.PolicyFunc {
return func(ctx context.Context, request pipeline.Request) (pipeline.Response, error) {
resp, err := defaultClient.Do(request.WithContext(ctx))
resp, err := client.Do(request.WithContext(ctx))
return pipeline.NewHTTPResponse(resp), err
}
})

if hedging {
opts.HTTPSender = pipeline.FactoryFunc(func(next pipeline.Policy, po *pipeline.PolicyOptions) pipeline.PolicyFunc {
client := b.hedgingCfg.Client(defaultClient)
client := hedgingCfg.ClientWithRegisterer(client, prometheus.WrapRegistererWithPrefix("loki", prometheus.DefaultRegisterer))
return func(ctx context.Context, request pipeline.Request) (pipeline.Response, error) {
resp, err := client.Do(request.WithContext(ctx))
return pipeline.NewHTTPResponse(resp), err
Expand Down
19 changes: 11 additions & 8 deletions pkg/storage/chunk/azure/blob_storage_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,20 +62,23 @@ func Test_Hedging(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
count := atomic.NewInt32(0)
// hijack the client to count the number of calls
defaultClient = &http.Client{
Transport: RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
count.Inc()
time.Sleep(200 * time.Millisecond)
return nil, errors.New("fo")
}),
defaultClientFactory = func() *http.Client {
return &http.Client{
Transport: RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
count.Inc()
time.Sleep(200 * time.Millisecond)
return nil, errors.New("fo")
}),
}
}
c, err := NewBlobStorage(&BlobStorageConfig{
ContainerName: "foo",
Environment: azureGlobal,
MaxRetries: 1,
}, hedging.Config{
At: tc.hedgeAt,
UpTo: tc.upTo,
At: tc.hedgeAt,
UpTo: tc.upTo,
MaxPerSecond: 1000,
})
require.NoError(t, err)
tc.do(c)
Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/chunk/gcp/gcs_object_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"cloud.google.com/go/storage"
cortex_gcp "github.com/cortexproject/cortex/pkg/chunk/gcp"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/api/iterator"
"google.golang.org/api/option"

Expand Down Expand Up @@ -87,7 +88,7 @@ func newBucketHandle(ctx context.Context, cfg GCSConfig, hedgingCfg hedging.Conf
}

if hedging {
httpClient = hedgingCfg.Client(httpClient)
httpClient = hedgingCfg.ClientWithRegisterer(httpClient, prometheus.WrapRegistererWithPrefix("loki", prometheus.DefaultRegisterer))
}

opts = append(opts, option.WithHTTPClient(httpClient))
Expand Down
5 changes: 3 additions & 2 deletions pkg/storage/chunk/gcp/gcs_object_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ func Test_Hedging(t *testing.T) {
BucketName: "test-bucket",
Insecure: true,
}, hedging.Config{
At: tc.hedgeAt,
UpTo: tc.upTo,
At: tc.hedgeAt,
UpTo: tc.upTo,
MaxPerSecond: 1000,
}, func(ctx context.Context, opts ...option.ClientOption) (*storage.Client, error) {
opts = append(opts, option.WithEndpoint(server.URL))
opts = append(opts, option.WithoutAuthentication())
Expand Down
96 changes: 93 additions & 3 deletions pkg/storage/chunk/hedging/hedging.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,49 @@
package hedging

import (
"errors"
"flag"
"net/http"
"sync"
"time"

"github.com/cristalhq/hedgedhttp"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/time/rate"
)

var (
// ErrTooManyHedgeRequests is returned by the rate-limited round tripper when a
// hedged request is rejected because the limiter has no capacity left.
ErrTooManyHedgeRequests = errors.New("too many hedge requests")
// totalHedgeRequests counts the hedged requests that passed the limiter.
totalHedgeRequests prometheus.Counter
// totalRateLimitedHedgeRequests counts the hedged requests rejected by the limiter.
totalRateLimitedHedgeRequests prometheus.Counter
// once guards the registration of the two counters above so that it happens
// a single time, whichever registerer is supplied first.
once sync.Once
)

// init builds the package-level hedging counters when the package is loaded.
func init() {
initMetrics()
}

func initMetrics() {
once = sync.Once{}
totalHedgeRequests = prometheus.NewCounter(prometheus.CounterOpts{
Name: "hedged_requests_total",
Help: "The total number of hedged requests.",
})

totalRateLimitedHedgeRequests = prometheus.NewCounter(prometheus.CounterOpts{
Name: "hedged_requests_rate_limited_total",
Help: "The total number of hedged requests rejected via rate limiting.",
})
}

// Config is the configuration for hedging requests.
type Config struct {
// At is the duration after which a second request will be issued.
// A zero value disables hedging.
At time.Duration `yaml:"at"`
// UpTo is the maximum number of requests that will be issued.
UpTo int `yaml:"up_to"`
// MaxPerSecond is the maximum number of hedge requests allowed per second.
MaxPerSecond int `yaml:"max_per_second"`
}

// RegisterFlags registers flags.
Expand All @@ -25,18 +55,78 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
	// Every flag name is prepended with prefix; the defaults below are the
	// values used when the flag is not set on the command line.
	f.IntVar(&cfg.UpTo, prefix+"hedge-requests-up-to", 2, "The maximum of hedge requests allowed.")
	f.DurationVar(&cfg.At, prefix+"hedge-requests-at", 0, "If set to a non-zero value a second request will be issued at the provided duration. Default is 0 (disabled)")
	f.IntVar(&cfg.MaxPerSecond, prefix+"hedge-max-per-second", 5, "The maximum of hedge requests allowed per second.")
}

// Client returns a hedged http client.
// The client transport will be mutated to use the hedged roundtripper.
// It is shorthand for ClientWithRegisterer with prometheus.DefaultRegisterer.
func (cfg *Config) Client(client *http.Client) *http.Client {
return cfg.ClientWithRegisterer(client, prometheus.DefaultRegisterer)
}

// ClientWithRegisterer returns a hedged http client with instrumentation registered to the provided registerer.
// The client transport will be mutated to use the hedged roundtripper.
//
// A nil reg falls back to prometheus.DefaultRegisterer. When cfg.At is zero
// hedging is disabled and the client is returned untouched (http.DefaultClient
// if client is nil). When client is nil and hedging is enabled, a shallow copy
// of http.DefaultClient is wrapped and returned, so the process-wide default
// client is never modified.
func (cfg *Config) ClientWithRegisterer(client *http.Client, reg prometheus.Registerer) *http.Client {
	if reg == nil {
		reg = prometheus.DefaultRegisterer
	}
	if cfg.At == 0 {
		// Hedging disabled: hand back the caller's client as-is.
		if client == nil {
			return http.DefaultClient
		}
		return client
	}
	if client == nil {
		// Copy rather than alias http.DefaultClient: assigning Transport on
		// the shared default would silently hedge every other user of it.
		defaultCopy := *http.DefaultClient
		client = &defaultCopy
	}
	client.Transport = cfg.RoundTripperWithRegisterer(client.Transport, reg)
	return client
}

func (cfg *Config) RoundTripper(next http.RoundTripper) http.RoundTripper {
// RoundTripperWithRegisterer returns a hedged roundtripper with instrumentation registered to the provided registerer.
func (cfg *Config) RoundTripperWithRegisterer(next http.RoundTripper, reg prometheus.Registerer) http.RoundTripper {
if reg == nil {
reg = prometheus.DefaultRegisterer
}
if next == nil {
next = http.DefaultTransport
}
if cfg.At == 0 {
return next
}
return hedgedhttp.NewRoundTripper(cfg.At, cfg.UpTo, next)
// register metrics
once.Do(func() {
reg.MustRegister(totalHedgeRequests)
reg.MustRegister(totalRateLimitedHedgeRequests)
})
return hedgedhttp.NewRoundTripper(
cfg.At,
cfg.UpTo,
newLimitedHedgingRoundTripper(cfg.MaxPerSecond, next),
)
}

// RoundTripper returns a hedged roundtripper.
// It is shorthand for RoundTripperWithRegisterer with prometheus.DefaultRegisterer.
func (cfg *Config) RoundTripper(next http.RoundTripper) http.RoundTripper {
return cfg.RoundTripperWithRegisterer(next, prometheus.DefaultRegisterer)
}

// limitedHedgingRoundTripper is an http.RoundTripper that applies a rate
// limit to hedged requests before forwarding them to next.
type limitedHedgingRoundTripper struct {
// next is the round tripper that actually performs the request.
next http.RoundTripper
// limiter decides whether a hedged request may be sent.
limiter *rate.Limiter
}

// newLimitedHedgingRoundTripper wraps next with a limiter that allows max
// hedged requests per second, with a burst of the same size.
func newLimitedHedgingRoundTripper(max int, next http.RoundTripper) *limitedHedgingRoundTripper {
	perSecond := rate.Limit(max)
	limited := &limitedHedgingRoundTripper{next: next}
	limited.limiter = rate.NewLimiter(perSecond, max)
	return limited
}

// RoundTrip forwards req to the wrapped round tripper. Requests that
// hedgedhttp flags as hedged must first pass the limiter: a rejected one is
// counted and fails with ErrTooManyHedgeRequests without reaching next, an
// accepted one is counted and forwarded. Non-hedged requests are never limited.
func (rt *limitedHedgingRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
	if !hedgedhttp.IsHedgedRequest(req) {
		return rt.next.RoundTrip(req)
	}

	if allowed := rt.limiter.Allow(); !allowed {
		totalRateLimitedHedgeRequests.Inc()
		return nil, ErrTooManyHedgeRequests
	}

	totalHedgeRequests.Inc()
	return rt.next.RoundTrip(req)
}
Loading

0 comments on commit c33a651

Please sign in to comment.