diff --git a/CHANGELOG.md b/CHANGELOG.md index 6197a748af65d..6c5beb43c83ff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,8 @@ ## Main +* [4826](https://github.com/grafana/loki/pull/4826) **cyriltovena**: Adds the ability to hedge storage requests. +* [4828](https://github.com/grafana/loki/pull/4828) **chaudum**: Set correct `Content-Type` header in query response * [4832](https://github.com/grafana/loki/pull/4832) **taisho6339**: Use http prefix path correctly in Promtail -* [4828](https://github.com/grafana/loki/pull/4828) **chaudum**: Set correct `Content-Type` header in query response * [4794](https://github.com/grafana/loki/pull/4794) **taisho6339**: Aggregate inotify watcher to file target manager * [4663](https://github.com/grafana/loki/pull/4663) **taisho6339**: Add SASL&mTLS authentication support for Kafka in Promtail * [4745](https://github.com/grafana/loki/pull/4745) **taisho6339**: Expose Kafka message key in labels diff --git a/docs/sources/configuration/_index.md b/docs/sources/configuration/_index.md index d460cef4f2511..164c3b2d7b254 100644 --- a/docs/sources/configuration/_index.md +++ b/docs/sources/configuration/_index.md @@ -312,9 +312,9 @@ The `query_scheduler` block configures the Loki query scheduler. # This configures the gRPC client used to report errors back to the # query-frontend. [grpc_client_config: ] - + # Set to true to have the query schedulers create and place themselves in a ring. -# If no frontend_address or scheduler_address are present +# If no frontend_address or scheduler_address are present # anywhere else in the configuration, Loki will toggle this value to true. [use_scheduler_ring: | default = false] @@ -453,7 +453,7 @@ storage: # Method to use for backend rule storage (azure, gcs, s3, swift, local). # CLI flag: -ruler.storage.type [type: ] - + # Configures backend rule storage for Azure. [azure: ] @@ -469,6 +469,9 @@ storage: # Configures backend rule storage for a local filesystem directory. [local: ] + # The `hedging_config` configures how to hedge requests for the storage. + [hedging: ] + # Remote-write configuration to send rule samples to a Prometheus remote-write endpoint. remote_write: # Enable remote-write functionality. @@ -509,8 +512,8 @@ remote_write: # List of remote write relabel configurations. write_relabel_configs: [- ...] - - # Name of the remote write config, which if specified must be unique among remote + + # Name of the remote write config, which if specified must be unique among remote # write configs. # The name will be used in metrics and logging in place of a generated value # to help users distinguish between remote write configs. @@ -853,6 +856,26 @@ The `swift_storage_config` configures Swift as a general storage for different d [container_name: | default = "cortex"] ``` +## hedging_config + +The `hedging_config` configures how to hedge requests for the storage. + +Hedging a request means issuing a secondary request if the first request has been outstanding for longer than a configured expected latency +for this class of requests. +You should configure this latency based on the p99 latency of your object store requests. + +```yaml +# Optional. Default is 0 (disabled) +# Example: "at: 500ms" +# If set to a non-zero value another request will be issued at the provided duration. Recommended to +# be set to p99 of object store requests to reduce long tail latency. This setting is most impactful when +# used with queriers and has minimal to no impact on other pieces.
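+#
+# For illustration only, a complete hedging block (for example under `storage_config`
+# or the common `storage` section; 500ms is an example value, use your own measured p99):
+#
+#   hedging:
+#     at: 500ms
+#     up_to: 2
+#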
+[at: | default = 0] +# Optional. Default is 2 +# The maximum amount of requests to be issued. +[up_to: | default = 2] +``` + ## local_storage_config The `local_storage_config` configures a (local) filesystem as a general storage for different data generated by Loki. @@ -1323,11 +1346,11 @@ aws: # Minimum duration to back off. # CLI flag: -s3.backoff-min-period [min_period: | default = 100ms] - + # The duration to back off. # CLI flag: -s3.backoff-max-period [max_period: | default = 3s] - + # Number of times to back off and retry before failing. # CLI flag: -s3.backoff-retries [max_retries: | default = 5] @@ -2081,7 +2104,7 @@ The `limits_config` block configures global and per-tenant limits in Loki. # up until lookback duration ago. # This limit is enforced in the query frontend, the querier and the ruler. # If the requested time range is outside the allowed range, the request will not fail, -# but will be modified to only query data within the allowed time range. +# but will be modified to only query data within the allowed time range. # The default value of 0 does not set a limit. # CLI flag: -querier.max-query-lookback [max_query_lookback: | default = 0] @@ -2360,6 +2383,9 @@ If any specific configuration for an object storage client have been provided el # Configures a (local) filesystem as the common storage. [filesystem: ] + +# The `hedging_config` configures how to hedge requests for the storage. +[hedging: ] ``` ### ring_config @@ -2512,7 +2538,7 @@ How far into the past accepted out-of-order log entries may be is configurable with `max_chunk_age`. `max_chunk_age` defaults to 1 hour. Loki calculates the earliest time that out-of-order entries may have -and be accepted with +and be accepted with ``` time_of_most_recent_line - (max_chunk_age/2) diff --git a/go.mod b/go.mod index 990eaf7a52225..8efc265b594b8 100644 --- a/go.mod +++ b/go.mod @@ -22,6 +22,7 @@ require ( github.com/cespare/xxhash/v2 v2.1.2 github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf github.com/cortexproject/cortex v1.10.1-0.20211124141505-4e9fc3a2b5ab + github.com/cristalhq/hedgedhttp v0.6.1 github.com/davecgh/go-spew v1.1.1 github.com/docker/docker v20.10.11+incompatible github.com/docker/go-plugins-helpers v0.0.0-20181025120712-1e6269c305b8 @@ -103,6 +104,7 @@ require ( ) require ( + github.com/mattn/go-ieproxy v0.0.1 github.com/xdg-go/scram v1.0.2 gopkg.in/Graylog2/go-gelf.v2 v2.0.0-20191017102106-1550ee647df0 ) @@ -211,7 +213,6 @@ require ( github.com/leodido/ragel-machinery v0.0.0-20181214104525-299bdde78165 // indirect github.com/mailru/easyjson v0.7.6 // indirect github.com/mattn/go-colorable v0.1.8 // indirect - github.com/mattn/go-ieproxy v0.0.1 // indirect github.com/mattn/go-isatty v0.0.14 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect github.com/miekg/dns v1.1.43 // indirect diff --git a/go.sum b/go.sum index e981d07459fe1..d78675ad6c28c 100644 --- a/go.sum +++ b/go.sum @@ -482,6 +482,8 @@ github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7Do github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/creack/pty v1.1.11 h1:07n33Z8lZxZ2qwegKbObQohDhXDQxiMMz1NOUGYlesw= github.com/creack/pty v1.1.11/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/cristalhq/hedgedhttp v0.6.1 
h1:o3tcl+HwEFrGfNkZbgbQW4N7UNmorKvqhUFLN1rrkdA= +github.com/cristalhq/hedgedhttp v0.6.1/go.mod h1:XkqWU6qVMutbhW68NnzjWrGtH8NUx1UfYqGYtHVKIsI= github.com/cucumber/godog v0.8.1/go.mod h1:vSh3r/lM+psC1BPXvdkSEuNjmXfpVqrMGYAElF6hxnA= github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4= github.com/cyphar/filepath-securejoin v0.2.2/go.mod h1:FpkQEhXnPnOthhzymB7CGsFk2G9VLXONKD9G7QGMM+4= diff --git a/pkg/loki/common/common.go b/pkg/loki/common/common.go index 208245d6f4c44..f486b1a1b7e03 100644 --- a/pkg/loki/common/common.go +++ b/pkg/loki/common/common.go @@ -6,6 +6,7 @@ import ( "github.com/grafana/loki/pkg/storage/chunk/aws" "github.com/grafana/loki/pkg/storage/chunk/azure" "github.com/grafana/loki/pkg/storage/chunk/gcp" + "github.com/grafana/loki/pkg/storage/chunk/hedging" "github.com/grafana/loki/pkg/storage/chunk/openstack" "github.com/grafana/loki/pkg/util" ) @@ -32,6 +33,7 @@ type Storage struct { Azure azure.BlobStorageConfig `yaml:"azure"` Swift openstack.SwiftConfig `yaml:"swift"` FSConfig FilesystemConfig `yaml:"filesystem"` + Hedging hedging.Config `yaml:"hedging"` } func (s *Storage) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { @@ -40,6 +42,7 @@ func (s *Storage) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { s.Azure.RegisterFlagsWithPrefix(prefix+".azure", f) s.Swift.RegisterFlagsWithPrefix(prefix+".swift", f) s.FSConfig.RegisterFlagsWithPrefix(prefix+".filesystem", f) + s.Hedging.RegisterFlagsWithPrefix(prefix, f) } type FilesystemConfig struct { diff --git a/pkg/loki/config_wrapper.go b/pkg/loki/config_wrapper.go index 9bc4957ab5f20..9fb8049adaf22 100644 --- a/pkg/loki/config_wrapper.go +++ b/pkg/loki/config_wrapper.go @@ -146,9 +146,9 @@ func applyDynamicRingConfigs(r, defaults *ConfigWrapper) { //any deviations from defaults. When mergeWithExisting is true, the ring config is overlaid on top of any specified //derivations, with the derivations taking precedence. func applyConfigToRings(r, defaults *ConfigWrapper, rc util.RingConfig, mergeWithExisting bool) { - //Ingester - mergeWithExisting is false when applying the ingester config, and we only want to - //change ingester ring values when applying the common config, so there's no need for the DeepEqual - //check here. + // Ingester - mergeWithExisting is false when applying the ingester config, and we only want to + // change ingester ring values when applying the common config, so there's no need for the DeepEqual + // check here. 
if mergeWithExisting { r.Ingester.LifecyclerConfig.RingConfig.KVStore = rc.KVStore r.Ingester.LifecyclerConfig.HeartbeatPeriod = rc.HeartbeatPeriod @@ -331,7 +331,7 @@ var ErrTooManyStorageConfigs = errors.New("too many storage configs provided in func applyStorageConfig(cfg, defaults *ConfigWrapper) error { var applyConfig func(*ConfigWrapper) - //only one config is allowed + // only one config is allowed configsFound := 0 if !reflect.DeepEqual(cfg.Common.Storage.Azure, defaults.StorageConfig.AzureStorageConfig) { @@ -341,6 +341,7 @@ func applyStorageConfig(cfg, defaults *ConfigWrapper) error { r.Ruler.StoreConfig.Type = "azure" r.Ruler.StoreConfig.Azure = r.Common.Storage.Azure.ToCortexAzureConfig() r.StorageConfig.AzureStorageConfig = r.Common.Storage.Azure + r.StorageConfig.Hedging = r.Common.Storage.Hedging r.CompactorConfig.SharedStoreType = chunk_storage.StorageTypeAzure } } @@ -368,6 +369,7 @@ func applyStorageConfig(cfg, defaults *ConfigWrapper) error { r.Ruler.StoreConfig.GCS = r.Common.Storage.GCS.ToCortexGCSConfig() r.StorageConfig.GCSConfig = r.Common.Storage.GCS r.CompactorConfig.SharedStoreType = chunk_storage.StorageTypeGCS + r.StorageConfig.Hedging = r.Common.Storage.Hedging } } @@ -379,6 +381,7 @@ func applyStorageConfig(cfg, defaults *ConfigWrapper) error { r.Ruler.StoreConfig.S3 = r.Common.Storage.S3.ToCortexS3Config() r.StorageConfig.AWSStorageConfig.S3Config = r.Common.Storage.S3 r.CompactorConfig.SharedStoreType = chunk_storage.StorageTypeS3 + r.StorageConfig.Hedging = r.Common.Storage.Hedging } } @@ -390,6 +393,7 @@ func applyStorageConfig(cfg, defaults *ConfigWrapper) error { r.Ruler.StoreConfig.Swift = r.Common.Storage.Swift.ToCortexSwiftConfig() r.StorageConfig.Swift = r.Common.Storage.Swift r.CompactorConfig.SharedStoreType = chunk_storage.StorageTypeSwift + r.StorageConfig.Hedging = r.Common.Storage.Hedging } } diff --git a/pkg/storage/chunk/aws/fixtures.go b/pkg/storage/chunk/aws/fixtures.go index 36aa4eefe395e..7d0bf783d41ee 100644 --- a/pkg/storage/chunk/aws/fixtures.go +++ b/pkg/storage/chunk/aws/fixtures.go @@ -44,7 +44,8 @@ var Fixtures = []testutils.Fixture{ schemaCfg: schemaConfig, metrics: newMetrics(nil), } - object := objectclient.NewClient(&S3ObjectClient{S3: newMockS3()}, nil) + mock := newMockS3() + object := objectclient.NewClient(&S3ObjectClient{S3: mock, hedgedS3: mock}, nil) return index, object, table, schemaConfig, testutils.CloserFunc(func() error { table.Stop() index.Stop() diff --git a/pkg/storage/chunk/aws/s3_storage_client.go b/pkg/storage/chunk/aws/s3_storage_client.go index ddd619ac0b782..ef6db2cafd1a4 100644 --- a/pkg/storage/chunk/aws/s3_storage_client.go +++ b/pkg/storage/chunk/aws/s3_storage_client.go @@ -35,6 +35,7 @@ import ( "github.com/grafana/dskit/flagext" "github.com/grafana/loki/pkg/storage/chunk" + "github.com/grafana/loki/pkg/storage/chunk/hedging" ) const ( @@ -159,28 +160,27 @@ func (cfg *HTTPConfig) ToCortexHTTPConfig() cortex_aws.HTTPConfig { } type S3ObjectClient struct { - cfg S3Config + cfg S3Config + bucketNames []string S3 s3iface.S3API + hedgedS3 s3iface.S3API sseConfig *SSEParsedConfig } // NewS3ObjectClient makes a new S3-backed ObjectClient. 
-func NewS3ObjectClient(cfg S3Config) (*S3ObjectClient, error) { - s3Config, bucketNames, err := buildS3Config(cfg) +func NewS3ObjectClient(cfg S3Config, hedgingCfg hedging.Config) (*S3ObjectClient, error) { + bucketNames, err := buckets(cfg) if err != nil { - return nil, errors.Wrap(err, "failed to build s3 config") + return nil, err } - - sess, err := session.NewSession(s3Config) + s3Client, err := buildS3Client(cfg, hedgingCfg, false) if err != nil { - return nil, errors.Wrap(err, "failed to create new s3 session") + return nil, errors.Wrap(err, "failed to build s3 config") } - - s3Client := s3.New(sess) - - if cfg.SignatureVersion == SignatureVersionV2 { - s3Client.Handlers.Sign.Swap(v4.SignRequestHandler.Name, v2SignRequestHandler(cfg)) + s3ClientHedging, err := buildS3Client(cfg, hedgingCfg, true) + if err != nil { + return nil, errors.Wrap(err, "failed to build s3 config") } sseCfg, err := buildSSEParsedConfig(cfg) @@ -191,6 +191,7 @@ func NewS3ObjectClient(cfg S3Config) (*S3ObjectClient, error) { client := S3ObjectClient{ cfg: cfg, S3: s3Client, + hedgedS3: s3ClientHedging, bucketNames: bucketNames, sseConfig: sseCfg, } @@ -234,7 +235,7 @@ func v2SignRequestHandler(cfg S3Config) request.NamedHandler { } } -func buildS3Config(cfg S3Config) (*aws.Config, []string, error) { +func buildS3Client(cfg S3Config, hedgingCfg hedging.Config, hedging bool) (*s3.S3, error) { var s3Config *aws.Config var err error @@ -242,7 +243,7 @@ func buildS3Config(cfg S3Config) (*aws.Config, []string, error) { if cfg.S3.URL != nil { s3Config, err = awscommon.ConfigFromURL(cfg.S3.URL) if err != nil { - return nil, nil, err + return nil, err } } else { s3Config = &aws.Config{} @@ -266,7 +267,7 @@ func buildS3Config(cfg S3Config) (*aws.Config, []string, error) { if cfg.AccessKeyID != "" && cfg.SecretAccessKey == "" || cfg.AccessKeyID == "" && cfg.SecretAccessKey != "" { - return nil, nil, errors.New("must supply both an Access Key ID and Secret Access Key or neither") + return nil, errors.New("must supply both an Access Key ID and Secret Access Key or neither") } if cfg.AccessKeyID != "" && cfg.SecretAccessKey != "" { @@ -282,7 +283,7 @@ func buildS3Config(cfg S3Config) (*aws.Config, []string, error) { tlsConfig.RootCAs = x509.NewCertPool() data, err := os.ReadFile(cfg.HTTPConfig.CAFile) if err != nil { - return nil, nil, err + return nil, err } tlsConfig.RootCAs.AppendCertsFromPEM(data) } @@ -310,11 +311,31 @@ func buildS3Config(cfg S3Config) (*aws.Config, []string, error) { if cfg.Inject != nil { transport = cfg.Inject(transport) } - - s3Config = s3Config.WithHTTPClient(&http.Client{ + httpClient := &http.Client{ Transport: transport, - }) + } + if hedging { + httpClient = hedgingCfg.Client(httpClient) + } + + s3Config = s3Config.WithHTTPClient(httpClient) + + sess, err := session.NewSession(s3Config) + if err != nil { + return nil, errors.Wrap(err, "failed to create new s3 session") + } + + s3Client := s3.New(sess) + + if cfg.SignatureVersion == SignatureVersionV2 { + s3Client.Handlers.Sign.Swap(v4.SignRequestHandler.Name, v2SignRequestHandler(cfg)) + } + + return s3Client, nil +} + +func buckets(cfg S3Config) ([]string, error) { // bucketnames var bucketNames []string if cfg.S3.URL != nil { @@ -326,10 +347,9 @@ func buildS3Config(cfg S3Config) (*aws.Config, []string, error) { } if len(bucketNames) == 0 { - return nil, nil, errors.New("at least one bucket name must be specified") + return nil, errors.New("at least one bucket name must be specified") } - - return s3Config, bucketNames, nil + return 
bucketNames, nil } // Stop fulfills the chunk.ObjectClient interface @@ -376,7 +396,7 @@ func (a *S3ObjectClient) GetObject(ctx context.Context, objectKey string) (io.Re } err = instrument.CollectedRequest(ctx, "S3.GetObject", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error { var requestErr error - resp, requestErr = a.S3.GetObjectWithContext(ctx, &s3.GetObjectInput{ + resp, requestErr = a.hedgedS3.GetObjectWithContext(ctx, &s3.GetObjectInput{ Bucket: aws.String(bucket), Key: aws.String(objectKey), }) diff --git a/pkg/storage/chunk/aws/s3_storage_client_test.go b/pkg/storage/chunk/aws/s3_storage_client_test.go index 7e7fd89e506d3..ad84691a02615 100644 --- a/pkg/storage/chunk/aws/s3_storage_client_test.go +++ b/pkg/storage/chunk/aws/s3_storage_client_test.go @@ -1,16 +1,23 @@ package aws import ( + "bytes" "context" + "errors" "fmt" "io" "net/http" "net/http/httptest" "strings" "testing" + "time" + "github.com/grafana/dskit/backoff" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/atomic" + + "github.com/grafana/loki/pkg/storage/chunk/hedging" ) type RoundTripperFunc func(*http.Request) (*http.Response, error) @@ -59,7 +66,7 @@ func TestRequestMiddleware(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { cfg.Inject = tt.fn - client, err := NewS3ObjectClient(cfg) + client, err := NewS3ObjectClient(cfg, hedging.Config{}) require.NoError(t, err) readCloser, err := client.GetObject(context.Background(), "key") @@ -75,3 +82,68 @@ func TestRequestMiddleware(t *testing.T) { }) } } + +func Test_Hedging(t *testing.T) { + for _, tc := range []struct { + name string + expectedCalls int32 + hedgeAt time.Duration + upTo int + do func(c *S3ObjectClient) + }{ + { + "delete/put/list are not hedged", + 3, + 20 * time.Nanosecond, + 10, + func(c *S3ObjectClient) { + _ = c.DeleteObject(context.Background(), "foo") + _, _, _ = c.List(context.Background(), "foo", "/") + _ = c.PutObject(context.Background(), "foo", bytes.NewReader([]byte("bar"))) + }, + }, + { + "gets are hedged", + 3, + 20 * time.Nanosecond, + 3, + func(c *S3ObjectClient) { + _, _ = c.GetObject(context.Background(), "foo") + }, + }, + { + "gets are not hedged when not configured", + 1, + 0, + 0, + func(c *S3ObjectClient) { + _, _ = c.GetObject(context.Background(), "foo") + }, + }, + } { + tc := tc + t.Run(tc.name, func(t *testing.T) { + count := atomic.NewInt32(0) + + c, err := NewS3ObjectClient(S3Config{ + AccessKeyID: "foo", + SecretAccessKey: "bar", + BackoffConfig: backoff.Config{MaxRetries: 1}, + BucketNames: "foo", + Inject: func(next http.RoundTripper) http.RoundTripper { + return RoundTripperFunc(func(req *http.Request) (*http.Response, error) { + count.Inc() + time.Sleep(200 * time.Millisecond) + return nil, errors.New("foo") + }) + }, + }, hedging.Config{ + At: tc.hedgeAt, + UpTo: tc.upTo, + }) + require.NoError(t, err) + tc.do(c) + require.Equal(t, tc.expectedCalls, count.Load()) + }) + } +} diff --git a/pkg/storage/chunk/azure/blob_storage_client.go b/pkg/storage/chunk/azure/blob_storage_client.go index af4b4e50ab282..e571a4c3c6dd6 100644 --- a/pkg/storage/chunk/azure/blob_storage_client.go +++ b/pkg/storage/chunk/azure/blob_storage_client.go @@ -6,12 +6,15 @@ import ( "flag" "fmt" "io" + "net" + "net/http" "net/url" "strings" "time" "github.com/Azure/azure-pipeline-go/pipeline" "github.com/Azure/azure-storage-blob-go/azblob" + "github.com/mattn/go-ieproxy" cortex_azure 
"github.com/cortexproject/cortex/pkg/chunk/azure" "github.com/cortexproject/cortex/pkg/util" @@ -19,6 +22,7 @@ import ( "github.com/grafana/dskit/flagext" "github.com/grafana/loki/pkg/storage/chunk" + "github.com/grafana/loki/pkg/storage/chunk/hedging" chunk_util "github.com/grafana/loki/pkg/storage/chunk/util" ) @@ -51,6 +55,26 @@ var ( "https://%s.blob.core.usgovcloudapi.net/%s", }, } + + // default Azure http client. + defaultClient = &http.Client{ + Transport: &http.Transport{ + Proxy: ieproxy.GetProxyFunc(), + Dial: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + DualStack: true, + }).Dial, + MaxIdleConns: 0, + MaxIdleConnsPerHost: 100, + IdleConnTimeout: 90 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + ExpectContinueTimeout: 1 * time.Second, + DisableKeepAlives: false, + DisableCompression: false, + MaxResponseHeaderBytes: 0, + }, + } ) // BlobStorageConfig defines the configurable flags that can be defined when using azure blob storage. @@ -109,14 +133,16 @@ func (c *BlobStorageConfig) ToCortexAzureConfig() cortex_azure.BlobStorageConfig type BlobStorage struct { // blobService storage.Serv cfg *BlobStorageConfig + hedgingCfg hedging.Config containerURL azblob.ContainerURL } // NewBlobStorage creates a new instance of the BlobStorage struct. -func NewBlobStorage(cfg *BlobStorageConfig) (*BlobStorage, error) { +func NewBlobStorage(cfg *BlobStorageConfig, hedgingCfg hedging.Config) (*BlobStorage, error) { log.WarnExperimentalUse("Azure Blob Storage") blobStorage := &BlobStorage{ - cfg: cfg, + cfg: cfg, + hedgingCfg: hedgingCfg, } var err error @@ -148,7 +174,7 @@ func (b *BlobStorage) GetObject(ctx context.Context, objectKey string) (io.ReadC } func (b *BlobStorage) getObject(ctx context.Context, objectKey string) (rc io.ReadCloser, err error) { - blockBlobURL, err := b.getBlobURL(objectKey) + blockBlobURL, err := b.getBlobURL(objectKey, true) if err != nil { return nil, err } @@ -163,7 +189,7 @@ func (b *BlobStorage) getObject(ctx context.Context, objectKey string) (rc io.Re } func (b *BlobStorage) PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error { - blockBlobURL, err := b.getBlobURL(objectKey) + blockBlobURL, err := b.getBlobURL(objectKey, false) if err != nil { return err } @@ -176,7 +202,7 @@ func (b *BlobStorage) PutObject(ctx context.Context, objectKey string, object io return err } -func (b *BlobStorage) getBlobURL(blobID string) (azblob.BlockBlobURL, error) { +func (b *BlobStorage) getBlobURL(blobID string, hedging bool) (azblob.BlockBlobURL, error) { blobID = strings.Replace(blobID, ":", "-", -1) // generate url for new chunk blob @@ -185,7 +211,7 @@ func (b *BlobStorage) getBlobURL(blobID string) (azblob.BlockBlobURL, error) { return azblob.BlockBlobURL{}, err } - azPipeline, err := b.newPipeline() + azPipeline, err := b.newPipeline(hedging) if err != nil { return azblob.BlockBlobURL{}, err } @@ -199,7 +225,7 @@ func (b *BlobStorage) buildContainerURL() (azblob.ContainerURL, error) { return azblob.ContainerURL{}, err } - azPipeline, err := b.newPipeline() + azPipeline, err := b.newPipeline(false) if err != nil { return azblob.ContainerURL{}, err } @@ -207,13 +233,13 @@ func (b *BlobStorage) buildContainerURL() (azblob.ContainerURL, error) { return azblob.NewContainerURL(*u, azPipeline), nil } -func (b *BlobStorage) newPipeline() (pipeline.Pipeline, error) { +func (b *BlobStorage) newPipeline(hedging bool) (pipeline.Pipeline, error) { credential, err := 
azblob.NewSharedKeyCredential(b.cfg.AccountName, b.cfg.AccountKey.Value) if err != nil { return nil, err } - return azblob.NewPipeline(credential, azblob.PipelineOptions{ + opts := azblob.PipelineOptions{ Retry: azblob.RetryOptions{ Policy: azblob.RetryPolicyExponential, MaxTries: (int32)(b.cfg.MaxRetries), @@ -221,7 +247,26 @@ func (b *BlobStorage) newPipeline() (pipeline.Pipeline, error) { RetryDelay: b.cfg.MinRetryDelay, MaxRetryDelay: b.cfg.MaxRetryDelay, }, - }), nil + } + + opts.HTTPSender = pipeline.FactoryFunc(func(next pipeline.Policy, po *pipeline.PolicyOptions) pipeline.PolicyFunc { + return func(ctx context.Context, request pipeline.Request) (pipeline.Response, error) { + resp, err := defaultClient.Do(request.WithContext(ctx)) + return pipeline.NewHTTPResponse(resp), err + } + }) + + if hedging { + opts.HTTPSender = pipeline.FactoryFunc(func(next pipeline.Policy, po *pipeline.PolicyOptions) pipeline.PolicyFunc { + client := b.hedgingCfg.Client(defaultClient) + return func(ctx context.Context, request pipeline.Request) (pipeline.Response, error) { + resp, err := client.Do(request.WithContext(ctx)) + return pipeline.NewHTTPResponse(resp), err + } + }) + } + + return azblob.NewPipeline(credential, opts), nil } // List implements chunk.ObjectClient. @@ -259,7 +304,7 @@ func (b *BlobStorage) List(ctx context.Context, prefix, delimiter string) ([]chu } func (b *BlobStorage) DeleteObject(ctx context.Context, blobID string) error { - blockBlobURL, err := b.getBlobURL(blobID) + blockBlobURL, err := b.getBlobURL(blobID, false) if err != nil { return err } diff --git a/pkg/storage/chunk/azure/blob_storage_client_test.go b/pkg/storage/chunk/azure/blob_storage_client_test.go new file mode 100644 index 0000000000000..491ce0b3eb31e --- /dev/null +++ b/pkg/storage/chunk/azure/blob_storage_client_test.go @@ -0,0 +1,85 @@ +package azure + +import ( + "bytes" + "context" + "errors" + "net/http" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.uber.org/atomic" + + "github.com/grafana/loki/pkg/storage/chunk/hedging" +) + +type RoundTripperFunc func(*http.Request) (*http.Response, error) + +func (fn RoundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error) { + return fn(req) +} + +func Test_Hedging(t *testing.T) { + for _, tc := range []struct { + name string + expectedCalls int32 + hedgeAt time.Duration + upTo int + do func(c *BlobStorage) + }{ + { + "delete/put/list are not hedged", + 3, + 20 * time.Nanosecond, + 10, + func(c *BlobStorage) { + _ = c.DeleteObject(context.Background(), "foo") + _, _, _ = c.List(context.Background(), "foo", "/") + _ = c.PutObject(context.Background(), "foo", bytes.NewReader([]byte("bar"))) + }, + }, + { + "gets are hedged", + 3, + 20 * time.Nanosecond, + 3, + func(c *BlobStorage) { + _, _ = c.GetObject(context.Background(), "foo") + }, + }, + { + "gets are not hedged when not configured", + 1, + 0, + 0, + func(c *BlobStorage) { + _, _ = c.GetObject(context.Background(), "foo") + }, + }, + } { + tc := tc + t.Run(tc.name, func(t *testing.T) { + count := atomic.NewInt32(0) + // hijack the client to count the number of calls + defaultClient = &http.Client{ + Transport: RoundTripperFunc(func(req *http.Request) (*http.Response, error) { + count.Inc() + time.Sleep(200 * time.Millisecond) + return nil, errors.New("fo") + }), + } + c, err := NewBlobStorage(&BlobStorageConfig{ + ContainerName: "foo", + Environment: azureGlobal, + MaxRetries: 1, + }, hedging.Config{ + At: tc.hedgeAt, + UpTo: tc.upTo, + }) + require.NoError(t, 
err) + tc.do(c) + require.Equal(t, tc.expectedCalls, count.Load()) + }) + } +} diff --git a/pkg/storage/chunk/gcp/fixtures.go b/pkg/storage/chunk/gcp/fixtures.go index 9b525490c386c..088d118554150 100644 --- a/pkg/storage/chunk/gcp/fixtures.go +++ b/pkg/storage/chunk/gcp/fixtures.go @@ -7,11 +7,13 @@ import ( "cloud.google.com/go/bigtable" "cloud.google.com/go/bigtable/bttest" + "cloud.google.com/go/storage" "github.com/fsouza/fake-gcs-server/fakestorage" "google.golang.org/api/option" "google.golang.org/grpc" "github.com/grafana/loki/pkg/storage/chunk" + "github.com/grafana/loki/pkg/storage/chunk/hedging" "github.com/grafana/loki/pkg/storage/chunk/objectclient" "github.com/grafana/loki/pkg/storage/chunk/testutils" ) @@ -78,7 +80,14 @@ func (f *fixture) Clients() ( } if f.gcsObjectClient { - cClient = objectclient.NewClient(newGCSObjectClient(GCSConfig{BucketName: "chunks"}, f.gcssrv.Client()), nil) + var c *GCSObjectClient + c, err = newGCSObjectClient(ctx, GCSConfig{BucketName: "chunks"}, hedging.Config{}, func(ctx context.Context, opts ...option.ClientOption) (*storage.Client, error) { + return f.gcssrv.Client(), nil + }) + if err != nil { + return + } + cClient = objectclient.NewClient(c, nil) } else { cClient = newBigtableObjectClient(Config{}, schemaConfig, client) } diff --git a/pkg/storage/chunk/gcp/gcs_object_client.go b/pkg/storage/chunk/gcp/gcs_object_client.go index 9878cbe1b9e5e..416fe243a7f45 100644 --- a/pkg/storage/chunk/gcp/gcs_object_client.go +++ b/pkg/storage/chunk/gcp/gcs_object_client.go @@ -13,13 +13,17 @@ import ( "google.golang.org/api/option" "github.com/grafana/loki/pkg/storage/chunk" + "github.com/grafana/loki/pkg/storage/chunk/hedging" "github.com/grafana/loki/pkg/storage/chunk/util" ) +type ClientFactory func(ctx context.Context, opts ...option.ClientOption) (*storage.Client, error) + type GCSObjectClient struct { - cfg GCSConfig - client *storage.Client - bucket *storage.BucketHandle + cfg GCSConfig + + bucket *storage.BucketHandle + hedgingBucket *storage.BucketHandle } // GCSConfig is config for the GCS Chunk Client. @@ -28,6 +32,8 @@ type GCSConfig struct { ChunkBufferSize int `yaml:"chunk_buffer_size"` RequestTimeout time.Duration `yaml:"request_timeout"` EnableOpenCensus bool `yaml:"enable_opencensus"` + + Insecure bool `yaml:"-"` } // RegisterFlags registers flags. @@ -53,35 +59,51 @@ func (cfg *GCSConfig) ToCortexGCSConfig() cortex_gcp.GCSConfig { } // NewGCSObjectClient makes a new chunk.Client that writes chunks to GCS. 
-func NewGCSObjectClient(ctx context.Context, cfg GCSConfig) (*GCSObjectClient, error) { +func NewGCSObjectClient(ctx context.Context, cfg GCSConfig, hedgingCfg hedging.Config) (*GCSObjectClient, error) { + return newGCSObjectClient(ctx, cfg, hedgingCfg, storage.NewClient) +} + +func newGCSObjectClient(ctx context.Context, cfg GCSConfig, hedgingCfg hedging.Config, clientFactory ClientFactory) (*GCSObjectClient, error) { + bucket, err := newBucketHandle(ctx, cfg, hedgingCfg, false, clientFactory) + if err != nil { + return nil, err + } + hedgingBucket, err := newBucketHandle(ctx, cfg, hedgingCfg, true, clientFactory) + if err != nil { + return nil, err + } + return &GCSObjectClient{ + cfg: cfg, + bucket: bucket, + hedgingBucket: hedgingBucket, + }, nil +} + +func newBucketHandle(ctx context.Context, cfg GCSConfig, hedgingCfg hedging.Config, hedging bool, clientFactory ClientFactory) (*storage.BucketHandle, error) { var opts []option.ClientOption - instrumentation, err := gcsInstrumentation(ctx, storage.ScopeReadWrite) + httpClient, err := gcsInstrumentation(ctx, storage.ScopeReadWrite, cfg.Insecure) if err != nil { return nil, err } - opts = append(opts, instrumentation) + + if hedging { + httpClient = hedgingCfg.Client(httpClient) + } + + opts = append(opts, option.WithHTTPClient(httpClient)) if !cfg.EnableOpenCensus { opts = append(opts, option.WithTelemetryDisabled()) } - client, err := storage.NewClient(ctx, opts...) + client, err := clientFactory(ctx, opts...) if err != nil { return nil, err } - return newGCSObjectClient(cfg, client), nil -} -func newGCSObjectClient(cfg GCSConfig, client *storage.Client) *GCSObjectClient { - bucket := client.Bucket(cfg.BucketName) - return &GCSObjectClient{ - cfg: cfg, - client: client, - bucket: bucket, - } + return client.Bucket(cfg.BucketName), nil } func (s *GCSObjectClient) Stop() { - s.client.Close() } // GetObject returns a reader for the specified object key from the configured GCS bucket. 
@@ -102,7 +124,7 @@ func (s *GCSObjectClient) GetObject(ctx context.Context, objectKey string) (io.R } func (s *GCSObjectClient) getObject(ctx context.Context, objectKey string) (rc io.ReadCloser, err error) { - reader, err := s.bucket.Object(objectKey).NewReader(ctx) + reader, err := s.hedgingBucket.Object(objectKey).NewReader(ctx) if err != nil { return nil, err } diff --git a/pkg/storage/chunk/gcp/gcs_object_client_test.go b/pkg/storage/chunk/gcp/gcs_object_client_test.go new file mode 100644 index 0000000000000..68104aa3d656e --- /dev/null +++ b/pkg/storage/chunk/gcp/gcs_object_client_test.go @@ -0,0 +1,90 @@ +package gcp + +import ( + "bytes" + "context" + "net/http" + "net/http/httptest" + "testing" + "time" + + "cloud.google.com/go/storage" + "github.com/stretchr/testify/require" + "go.uber.org/atomic" + "google.golang.org/api/option" + + "github.com/grafana/loki/pkg/storage/chunk/hedging" +) + +func Test_Hedging(t *testing.T) { + for _, tc := range []struct { + name string + expectedCalls int32 + hedgeAt time.Duration + upTo int + do func(c *GCSObjectClient) + }{ + { + "delete/put/list are not hedged", + 3, + 20 * time.Nanosecond, + 10, + func(c *GCSObjectClient) { + _ = c.DeleteObject(context.Background(), "foo") + _, _, _ = c.List(context.Background(), "foo", "/") + _ = c.PutObject(context.Background(), "foo", bytes.NewReader([]byte("bar"))) + }, + }, + { + "gets are hedged", + 3, + 20 * time.Nanosecond, + 3, + func(c *GCSObjectClient) { + _, _ = c.GetObject(context.Background(), "foo") + }, + }, + { + "gets are not hedged when not configured", + 1, + 0, + 0, + func(c *GCSObjectClient) { + _, _ = c.GetObject(context.Background(), "foo") + }, + }, + } { + tc := tc + t.Run(tc.name, func(t *testing.T) { + count := atomic.NewInt32(0) + server := fakeServer(t, 200*time.Millisecond, count) + ctx := context.Background() + c, err := newGCSObjectClient(ctx, GCSConfig{ + BucketName: "test-bucket", + Insecure: true, + }, hedging.Config{ + At: tc.hedgeAt, + UpTo: tc.upTo, + }, func(ctx context.Context, opts ...option.ClientOption) (*storage.Client, error) { + opts = append(opts, option.WithEndpoint(server.URL)) + opts = append(opts, option.WithoutAuthentication()) + return storage.NewClient(ctx, opts...) 
+ }) + require.NoError(t, err) + tc.do(c) + require.Equal(t, tc.expectedCalls, count.Load()) + }) + } +} + +func fakeServer(t *testing.T, returnIn time.Duration, counter *atomic.Int32) *httptest.Server { + server := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + counter.Inc() + time.Sleep(returnIn) + _, _ = w.Write([]byte(`{}`)) + })) + server.StartTLS() + t.Cleanup(server.Close) + + return server +} diff --git a/pkg/storage/chunk/gcp/instrumentation.go b/pkg/storage/chunk/gcp/instrumentation.go index 7f12fa836391f..b661d2c6c3143 100644 --- a/pkg/storage/chunk/gcp/instrumentation.go +++ b/pkg/storage/chunk/gcp/instrumentation.go @@ -2,6 +2,7 @@ package gcp import ( "context" + "crypto/tls" "net/http" "strconv" "time" @@ -49,8 +50,13 @@ func bigtableInstrumentation() ([]grpc.UnaryClientInterceptor, []grpc.StreamClie } } -func gcsInstrumentation(ctx context.Context, scope string) (option.ClientOption, error) { - transport, err := google_http.NewTransport(ctx, http.DefaultTransport, option.WithScopes(scope)) +func gcsInstrumentation(ctx context.Context, scope string, insecure bool) (*http.Client, error) { + // start with default transport + customTransport := http.DefaultTransport.(*http.Transport).Clone() + if insecure { + customTransport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} + } + transport, err := google_http.NewTransport(ctx, customTransport, option.WithScopes(scope)) if err != nil { return nil, err } @@ -60,7 +66,7 @@ func gcsInstrumentation(ctx context.Context, scope string) (option.ClientOption, next: transport, }, } - return option.WithHTTPClient(client), nil + return client, nil } func toOptions(opts []grpc.DialOption) []option.ClientOption { diff --git a/pkg/storage/chunk/hedging/hedging.go b/pkg/storage/chunk/hedging/hedging.go new file mode 100644 index 0000000000000..8ba8bdcb37823 --- /dev/null +++ b/pkg/storage/chunk/hedging/hedging.go @@ -0,0 +1,42 @@ +package hedging + +import ( + "flag" + "net/http" + "time" + + "github.com/cristalhq/hedgedhttp" +) + +// Config is the configuration for hedging requests. +type Config struct { + // At is the duration after which a second request will be issued. + At time.Duration `yaml:"at"` + // UpTo is the maximum number of requests that will be issued. + UpTo int `yaml:"up_to"` +} + +// RegisterFlags registers flags. +func (cfg *Config) RegisterFlags(f *flag.FlagSet) { + cfg.RegisterFlagsWithPrefix("", f) +} + +// RegisterFlagsWithPrefix registers flags with prefix. +func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + f.IntVar(&cfg.UpTo, prefix+"hedge-requests-up-to", 2, "The maximum number of hedge requests allowed.") + f.DurationVar(&cfg.At, prefix+"hedge-requests-at", 0, "If set to a non-zero value a second request will be issued at the provided duration. 
Default is 0 (disabled)") +} + +func (cfg *Config) Client(client *http.Client) *http.Client { + if cfg.At == 0 { + return client + } + return hedgedhttp.NewClient(cfg.At, cfg.UpTo, client) +} + +func (cfg *Config) RoundTripper(next http.RoundTripper) http.RoundTripper { + if cfg.At == 0 { + return next + } + return hedgedhttp.NewRoundTripper(cfg.At, cfg.UpTo, next) +} diff --git a/pkg/storage/chunk/openstack/swift_object_client.go b/pkg/storage/chunk/openstack/swift_object_client.go index e9a71d5e57ae6..4226e78f95e08 100644 --- a/pkg/storage/chunk/openstack/swift_object_client.go +++ b/pkg/storage/chunk/openstack/swift_object_client.go @@ -7,6 +7,8 @@ import ( "fmt" "io" "io/ioutil" + "net/http" + "time" "github.com/ncw/swift" "github.com/pkg/errors" @@ -16,11 +18,19 @@ import ( "github.com/cortexproject/cortex/pkg/util/log" "github.com/grafana/loki/pkg/storage/chunk" + "github.com/grafana/loki/pkg/storage/chunk/hedging" ) +var defaultTransport http.RoundTripper = &http.Transport{ + Proxy: http.ProxyFromEnvironment, + MaxIdleConnsPerHost: 512, + ExpectContinueTimeout: 5 * time.Second, +} + type SwiftObjectClient struct { - conn *swift.Connection - cfg SwiftConfig + conn *swift.Connection + hedgingConn *swift.Connection + cfg SwiftConfig } // SwiftConfig is config for the Swift Chunk Client. @@ -50,9 +60,29 @@ func (cfg *SwiftConfig) ToCortexSwiftConfig() cortex_openstack.SwiftConfig { } // NewSwiftObjectClient makes a new chunk.Client that writes chunks to OpenStack Swift. -func NewSwiftObjectClient(cfg SwiftConfig) (*SwiftObjectClient, error) { +func NewSwiftObjectClient(cfg SwiftConfig, hedgingCfg hedging.Config) (*SwiftObjectClient, error) { log.WarnExperimentalUse("OpenStack Swift Storage") + c, err := createConnection(cfg, hedgingCfg, false) + if err != nil { + return nil, err + } + // Ensure the container is created, no error is returned if it already exists. + if err := c.ContainerCreate(cfg.ContainerName, nil); err != nil { + return nil, err + } + hedging, err := createConnection(cfg, hedgingCfg, true) + if err != nil { + return nil, err + } + return &SwiftObjectClient{ + conn: c, + hedgingConn: hedging, + cfg: cfg, + }, nil +} + +func createConnection(cfg SwiftConfig, hedgingCfg hedging.Config, hedging bool) (*swift.Connection, error) { // Create a connection c := &swift.Connection{ AuthVersion: cfg.AuthVersion, @@ -70,6 +100,7 @@ func NewSwiftObjectClient(cfg SwiftConfig) (*SwiftObjectClient, error) { Domain: cfg.DomainName, DomainId: cfg.DomainID, Region: cfg.RegionName, + Transport: defaultTransport, } switch { @@ -78,32 +109,27 @@ func NewSwiftObjectClient(cfg SwiftConfig) (*SwiftObjectClient, error) { case cfg.UserDomainID != "": c.DomainId = cfg.UserDomainID } + if hedging { + c.Transport = hedgingCfg.RoundTripper(c.Transport) + } - // Authenticate err := c.Authenticate() if err != nil { return nil, err } - // Ensure the container is created, no error is returned if it already exists. - if err := c.ContainerCreate(cfg.ContainerName, nil); err != nil { - return nil, err - } - - return &SwiftObjectClient{ - conn: c, - cfg: cfg, - }, nil + return c, nil } func (s *SwiftObjectClient) Stop() { s.conn.UnAuthenticate() + s.hedgingConn.UnAuthenticate() } // GetObject returns a reader for the specified object key from the configured swift container. 
func (s *SwiftObjectClient) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, error) { var buf bytes.Buffer - _, err := s.conn.ObjectGet(s.cfg.ContainerName, objectKey, &buf, false, nil) + _, err := s.hedgingConn.ObjectGet(s.cfg.ContainerName, objectKey, &buf, false, nil) if err != nil { return nil, err } diff --git a/pkg/storage/chunk/openstack/swift_object_client_test.go b/pkg/storage/chunk/openstack/swift_object_client_test.go new file mode 100644 index 0000000000000..ca2c5d7155055 --- /dev/null +++ b/pkg/storage/chunk/openstack/swift_object_client_test.go @@ -0,0 +1,110 @@ +package openstack + +import ( + "bytes" + "context" + "net/http" + "testing" + "time" + + "github.com/cortexproject/cortex/pkg/storage/bucket/swift" + "github.com/stretchr/testify/require" + "go.uber.org/atomic" + + "github.com/grafana/loki/pkg/storage/chunk/hedging" +) + +type RoundTripperFunc func(*http.Request) (*http.Response, error) + +func (fn RoundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error) { + return fn(req) +} + +func Test_Hedging(t *testing.T) { + for _, tc := range []struct { + name string + expectedCalls int32 + hedgeAt time.Duration + upTo int + do func(c *SwiftObjectClient) + }{ + { + "delete/put/list are not hedged", + 3, + 20 * time.Nanosecond, + 10, + func(c *SwiftObjectClient) { + _ = c.DeleteObject(context.Background(), "foo") + _, _, _ = c.List(context.Background(), "foo", "/") + _ = c.PutObject(context.Background(), "foo", bytes.NewReader([]byte("bar"))) + }, + }, + { + "gets are hedged", + 3, + 20 * time.Nanosecond, + 3, + func(c *SwiftObjectClient) { + _, _ = c.GetObject(context.Background(), "foo") + }, + }, + { + "gets are not hedged when not configured", + 1, + 0, + 0, + func(c *SwiftObjectClient) { + _, _ = c.GetObject(context.Background(), "foo") + }, + }, + } { + tc := tc + t.Run(tc.name, func(t *testing.T) { + count := atomic.NewInt32(0) + // hijack the transport to count the number of calls + defaultTransport = RoundTripperFunc(func(req *http.Request) (*http.Response, error) { + // fake auth + if req.Header.Get("X-Auth-Key") == "passwd" { + return &http.Response{ + StatusCode: http.StatusOK, + Body: http.NoBody, + Header: http.Header{ + "X-Storage-Url": []string{"http://swift.example.com/v1/AUTH_test"}, + "X-Auth-Token": []string{"token"}, + }, + }, nil + } + // fake container creation + if req.Method == "PUT" && req.URL.Path == "/v1/AUTH_test/foo" { + return &http.Response{ + StatusCode: http.StatusCreated, + Body: http.NoBody, + }, nil + } + count.Inc() + time.Sleep(200 * time.Millisecond) + return &http.Response{ + StatusCode: http.StatusOK, + Body: http.NoBody, + }, nil + }) + + c, err := NewSwiftObjectClient(SwiftConfig{ + Config: swift.Config{ + MaxRetries: 1, + ContainerName: "foo", + AuthVersion: 1, + Password: "passwd", + ConnectTimeout: 10 * time.Second, + RequestTimeout: 10 * time.Second, + }, + }, hedging.Config{ + At: tc.hedgeAt, + UpTo: tc.upTo, + }) + require.NoError(t, err) + tc.do(c) + require.Equal(t, tc.expectedCalls, count.Load()) + }) + } +} diff --git a/pkg/storage/chunk/storage/factory.go b/pkg/storage/chunk/storage/factory.go index d9c5606de2698..ec95d113d920d 100644 --- a/pkg/storage/chunk/storage/factory.go +++ b/pkg/storage/chunk/storage/factory.go @@ -21,6 +21,7 @@ import ( "github.com/grafana/loki/pkg/storage/chunk/cassandra" "github.com/grafana/loki/pkg/storage/chunk/gcp" "github.com/grafana/loki/pkg/storage/chunk/grpc" + 
"github.com/grafana/loki/pkg/storage/chunk/hedging" "github.com/grafana/loki/pkg/storage/chunk/local" "github.com/grafana/loki/pkg/storage/chunk/objectclient" "github.com/grafana/loki/pkg/storage/chunk/openstack" @@ -95,6 +96,8 @@ type Config struct { DisableBroadIndexQueries bool `yaml:"disable_broad_index_queries"` GrpcConfig grpc.Config `yaml:"grpc_store"` + + Hedging hedging.Config `yaml:"hedging"` } // RegisterFlags adds the flags required to configure this flag set. @@ -108,6 +111,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { cfg.FSConfig.RegisterFlags(f) cfg.Swift.RegisterFlags(f) cfg.GrpcConfig.RegisterFlags(f) + cfg.Hedging.RegisterFlagsWithPrefix("store.", f) f.StringVar(&cfg.Engine, "store.engine", "chunks", "The storage engine to use: chunks or blocks.") cfg.IndexQueriesCacheConfig.RegisterFlagsWithPrefix("store.index-cache-read.", "Cache config for index entry reading.", f) @@ -267,7 +271,7 @@ func NewChunkClient(name string, cfg Config, schemaCfg chunk.SchemaConfig, regis case StorageTypeInMemory: return chunk.NewMockStorage(), nil case StorageTypeAWS, StorageTypeS3: - return newChunkClientFromStore(aws.NewS3ObjectClient(cfg.AWSStorageConfig.S3Config)) + return newChunkClientFromStore(aws.NewS3ObjectClient(cfg.AWSStorageConfig.S3Config, cfg.Hedging)) case StorageTypeAWSDynamo: if cfg.AWSStorageConfig.DynamoDB.URL == nil { return nil, fmt.Errorf("Must set -dynamodb.url in aws mode") @@ -278,15 +282,15 @@ func NewChunkClient(name string, cfg Config, schemaCfg chunk.SchemaConfig, regis } return aws.NewDynamoDBChunkClient(cfg.AWSStorageConfig.DynamoDBConfig, schemaCfg, registerer) case StorageTypeAzure: - return newChunkClientFromStore(azure.NewBlobStorage(&cfg.AzureStorageConfig)) + return newChunkClientFromStore(azure.NewBlobStorage(&cfg.AzureStorageConfig, cfg.Hedging)) case StorageTypeGCP: return gcp.NewBigtableObjectClient(context.Background(), cfg.GCPStorageConfig, schemaCfg) case StorageTypeGCPColumnKey, StorageTypeBigTable, StorageTypeBigTableHashed: return gcp.NewBigtableObjectClient(context.Background(), cfg.GCPStorageConfig, schemaCfg) case StorageTypeGCS: - return newChunkClientFromStore(gcp.NewGCSObjectClient(context.Background(), cfg.GCSConfig)) + return newChunkClientFromStore(gcp.NewGCSObjectClient(context.Background(), cfg.GCSConfig, cfg.Hedging)) case StorageTypeSwift: - return newChunkClientFromStore(openstack.NewSwiftObjectClient(cfg.Swift)) + return newChunkClientFromStore(openstack.NewSwiftObjectClient(cfg.Swift, cfg.Hedging)) case StorageTypeCassandra: return cassandra.NewObjectClient(cfg.CassandraStorageConfig, schemaCfg, registerer) case StorageTypeFileSystem: @@ -355,13 +359,13 @@ func NewBucketClient(storageConfig Config) (chunk.BucketClient, error) { func NewObjectClient(name string, cfg Config) (chunk.ObjectClient, error) { switch name { case StorageTypeAWS, StorageTypeS3: - return aws.NewS3ObjectClient(cfg.AWSStorageConfig.S3Config) + return aws.NewS3ObjectClient(cfg.AWSStorageConfig.S3Config, cfg.Hedging) case StorageTypeGCS: - return gcp.NewGCSObjectClient(context.Background(), cfg.GCSConfig) + return gcp.NewGCSObjectClient(context.Background(), cfg.GCSConfig, cfg.Hedging) case StorageTypeAzure: - return azure.NewBlobStorage(&cfg.AzureStorageConfig) + return azure.NewBlobStorage(&cfg.AzureStorageConfig, cfg.Hedging) case StorageTypeSwift: - return openstack.NewSwiftObjectClient(cfg.Swift) + return openstack.NewSwiftObjectClient(cfg.Swift, cfg.Hedging) case StorageTypeInMemory: return 
chunk.NewMockStorage(), nil case StorageTypeFileSystem: diff --git a/vendor/github.com/cristalhq/hedgedhttp/LICENSE b/vendor/github.com/cristalhq/hedgedhttp/LICENSE new file mode 100644 index 0000000000000..7e7c9798a4289 --- /dev/null +++ b/vendor/github.com/cristalhq/hedgedhttp/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2021 cristaltech + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/vendor/github.com/cristalhq/hedgedhttp/README.md b/vendor/github.com/cristalhq/hedgedhttp/README.md new file mode 100644 index 0000000000000..2b6aa491b1814 --- /dev/null +++ b/vendor/github.com/cristalhq/hedgedhttp/README.md @@ -0,0 +1,53 @@ +# hedgedhttp + +[![build-img]][build-url] +[![pkg-img]][pkg-url] +[![reportcard-img]][reportcard-url] +[![coverage-img]][coverage-url] + +Hedged HTTP client which helps to reduce tail latency at scale. + +## Rationale + +See paper [Tail at Scale](https://cacm.acm.org/magazines/2013/2/160173-the-tail-at-scale/fulltext) by Jeffrey Dean, Luiz André Barroso. In short: the client first sends one request, but then sends an additional request after a timeout if the previous hasn't returned an answer in the expected time. The client cancels remaining requests once the first result is received. + +## Acknowledge + +Thanks to [Bohdan Storozhuk](https://github.com/storozhukbm) for the review and powerful hints. + +## Features + +* Simple API. +* Easy to integrate. +* Optimized for speed. +* Clean and tested code. +* Dependency-free. + +## Install + +Go version 1.16+ + +``` +go get github.com/cristalhq/hedgedhttp +``` + +## Example + +TODO + +## Documentation + +See [these docs][pkg-url]. + +## License + +[MIT License](LICENSE). 
+ +[build-img]: https://github.com/cristalhq/hedgedhttp/workflows/build/badge.svg +[build-url]: https://github.com/cristalhq/hedgedhttp/actions +[pkg-img]: https://pkg.go.dev/badge/cristalhq/hedgedhttp +[pkg-url]: https://pkg.go.dev/github.com/cristalhq/hedgedhttp +[reportcard-img]: https://goreportcard.com/badge/cristalhq/hedgedhttp +[reportcard-url]: https://goreportcard.com/report/cristalhq/hedgedhttp +[coverage-img]: https://codecov.io/gh/cristalhq/hedgedhttp/branch/main/graph/badge.svg +[coverage-url]: https://codecov.io/gh/cristalhq/hedgedhttp diff --git a/vendor/github.com/cristalhq/hedgedhttp/hedged.go b/vendor/github.com/cristalhq/hedgedhttp/hedged.go new file mode 100644 index 0000000000000..f3a3f041c563e --- /dev/null +++ b/vendor/github.com/cristalhq/hedgedhttp/hedged.go @@ -0,0 +1,345 @@ +package hedgedhttp + +import ( + "context" + "fmt" + "net/http" + "strings" + "sync" + "sync/atomic" + "time" +) + +const infiniteTimeout = 30 * 24 * time.Hour // domain specific infinite + +// NewClient returns a new http.Client which implements hedged requests pattern. +// Given Client starts a new request after a timeout from previous request. +// Starts no more than upto requests. +func NewClient(timeout time.Duration, upto int, client *http.Client) *http.Client { + newClient, _ := NewClientAndStats(timeout, upto, client) + return newClient +} + +// NewClientAndStats returns a new http.Client which implements hedged requests pattern +// And Stats object that can be queried to obtain client's metrics. +// Given Client starts a new request after a timeout from previous request. +// Starts no more than upto requests. +func NewClientAndStats(timeout time.Duration, upto int, client *http.Client) (*http.Client, *Stats) { + if client == nil { + client = &http.Client{ + Timeout: 5 * time.Second, + } + } + + newTransport, metrics := NewRoundTripperAndStats(timeout, upto, client.Transport) + + client.Transport = newTransport + + return client, metrics +} + +// NewRoundTripper returns a new http.RoundTripper which implements hedged requests pattern. +// Given RoundTripper starts a new request after a timeout from previous request. +// Starts no more than upto requests. +func NewRoundTripper(timeout time.Duration, upto int, rt http.RoundTripper) http.RoundTripper { + newRT, _ := NewRoundTripperAndStats(timeout, upto, rt) + return newRT +} + +// NewRoundTripperAndStats returns a new http.RoundTripper which implements hedged requests pattern +// And Stats object that can be queried to obtain client's metrics. +// Given RoundTripper starts a new request after a timeout from previous request. +// Starts no more than upto requests. 
+func NewRoundTripperAndStats(timeout time.Duration, upto int, rt http.RoundTripper) (http.RoundTripper, *Stats) { + switch { + case timeout < 0: + panic("hedgedhttp: timeout cannot be negative") + case upto < 1: + panic("hedgedhttp: upto must be greater than 0") + } + + if rt == nil { + rt = http.DefaultTransport + } + + if timeout == 0 { + timeout = time.Nanosecond // smallest possible timeout if not set + } + + hedged := &hedgedTransport{ + rt: rt, + timeout: timeout, + upto: upto, + metrics: &Stats{}, + } + return hedged, hedged.metrics +} + +type hedgedTransport struct { + rt http.RoundTripper + timeout time.Duration + upto int + metrics *Stats +} + +func (ht *hedgedTransport) RoundTrip(req *http.Request) (*http.Response, error) { + mainCtx := req.Context() + + timeout := ht.timeout + errOverall := &MultiError{} + resultCh := make(chan indexedResp, ht.upto) + errorCh := make(chan error, ht.upto) + + ht.metrics.requestedRoundTripsInc() + + resultIdx := -1 + cancels := make([]func(), ht.upto) + + defer runInPool(func() { + for i, cancel := range cancels { + if i != resultIdx && cancel != nil { + ht.metrics.canceledSubRequestsInc() + cancel() + } + } + }) + + for sent := 0; len(errOverall.Errors) < ht.upto; sent++ { + if sent < ht.upto { + idx := sent + subReq, cancel := reqWithCtx(req, mainCtx) + cancels[idx] = cancel + + runInPool(func() { + ht.metrics.actualRoundTripsInc() + resp, err := ht.rt.RoundTrip(subReq) + if err != nil { + ht.metrics.failedRoundTripsInc() + errorCh <- err + } else { + resultCh <- indexedResp{idx, resp} + } + }) + } + + // all request sent - effectively disabling timeout between requests + if sent == ht.upto { + timeout = infiniteTimeout + } + resp, err := waitResult(mainCtx, resultCh, errorCh, timeout) + + switch { + case resp.Resp != nil: + resultIdx = resp.Index + return resp.Resp, nil + case mainCtx.Err() != nil: + ht.metrics.canceledByUserRoundTripsInc() + return nil, mainCtx.Err() + case err != nil: + errOverall.Errors = append(errOverall.Errors, err) + } + } + + // all request have returned errors + return nil, errOverall +} + +func waitResult(ctx context.Context, resultCh <-chan indexedResp, errorCh <-chan error, timeout time.Duration) (indexedResp, error) { + // try to read result first before blocking on all other channels + select { + case res := <-resultCh: + return res, nil + default: + timer := getTimer(timeout) + defer returnTimer(timer) + + select { + case res := <-resultCh: + return res, nil + + case reqErr := <-errorCh: + return indexedResp{}, reqErr + + case <-ctx.Done(): + return indexedResp{}, ctx.Err() + + case <-timer.C: + return indexedResp{}, nil // it's not a request timeout, it's timeout BETWEEN consecutive requests + } + } +} + +type indexedResp struct { + Index int + Resp *http.Response +} + +func reqWithCtx(r *http.Request, ctx context.Context) (*http.Request, func()) { + ctx, cancel := context.WithCancel(ctx) + req := r.WithContext(ctx) + return req, cancel +} + +// atomicCounter is a false sharing safe counter. +type atomicCounter struct { + count uint64 + _ [7]uint64 +} + +type cacheLine [64]byte + +// Stats object that can be queried to obtain certain metrics and get better observability. 
+type Stats struct { + _ cacheLine + requestedRoundTrips atomicCounter + actualRoundTrips atomicCounter + failedRoundTrips atomicCounter + canceledByUserRoundTrips atomicCounter + canceledSubRequests atomicCounter + _ cacheLine +} + +func (s *Stats) requestedRoundTripsInc() { atomic.AddUint64(&s.requestedRoundTrips.count, 1) } +func (s *Stats) actualRoundTripsInc() { atomic.AddUint64(&s.actualRoundTrips.count, 1) } +func (s *Stats) failedRoundTripsInc() { atomic.AddUint64(&s.failedRoundTrips.count, 1) } +func (s *Stats) canceledByUserRoundTripsInc() { atomic.AddUint64(&s.canceledByUserRoundTrips.count, 1) } +func (s *Stats) canceledSubRequestsInc() { atomic.AddUint64(&s.canceledSubRequests.count, 1) } + +// RequestedRoundTrips returns count of requests that were requested by client. +func (s *Stats) RequestedRoundTrips() uint64 { + return atomic.LoadUint64(&s.requestedRoundTrips.count) +} + +// ActualRoundTrips returns count of requests that were actually sent. +func (s *Stats) ActualRoundTrips() uint64 { + return atomic.LoadUint64(&s.actualRoundTrips.count) +} + +// FailedRoundTrips returns count of requests that failed. +func (s *Stats) FailedRoundTrips() uint64 { + return atomic.LoadUint64(&s.failedRoundTrips.count) +} + +// CanceledByUserRoundTrips returns count of requests that were canceled by user, using request context. +func (s *Stats) CanceledByUserRoundTrips() uint64 { + return atomic.LoadUint64(&s.canceledByUserRoundTrips.count) +} + +// CanceledSubRequests returns count of hedged sub-requests that were canceled by transport. +func (s *Stats) CanceledSubRequests() uint64 { + return atomic.LoadUint64(&s.canceledSubRequests.count) +} + +// StatsSnapshot is a snapshot of Stats. +type StatsSnapshot struct { + RequestedRoundTrips uint64 // count of requests that were requested by client + ActualRoundTrips uint64 // count of requests that were actually sent + FailedRoundTrips uint64 // count of requests that failed + CanceledByUserRoundTrips uint64 // count of requests that were canceled by user, using request context + CanceledSubRequests uint64 // count of hedged sub-requests that were canceled by transport +} + +// Snapshot of the stats. +func (s *Stats) Snapshot() StatsSnapshot { + return StatsSnapshot{ + RequestedRoundTrips: s.RequestedRoundTrips(), + ActualRoundTrips: s.ActualRoundTrips(), + FailedRoundTrips: s.FailedRoundTrips(), + CanceledByUserRoundTrips: s.CanceledByUserRoundTrips(), + CanceledSubRequests: s.CanceledSubRequests(), + } +} + +var taskQueue = make(chan func()) + +func runInPool(task func()) { + select { + case taskQueue <- task: + // submited, everything is ok + + default: + go func() { + // do the given task + task() + + const cleanupDuration = 10 * time.Second + cleanupTicker := time.NewTicker(cleanupDuration) + defer cleanupTicker.Stop() + + for { + select { + case t := <-taskQueue: + t() + cleanupTicker.Reset(cleanupDuration) + case <-cleanupTicker.C: + return + } + } + }() + } +} + +// MultiError is an error type to track multiple errors. This is used to +// accumulate errors in cases and return them as a single "error". +// Insiper by https://github.com/hashicorp/go-multierror +type MultiError struct { + Errors []error + ErrorFormatFn ErrorFormatFunc +} + +func (e *MultiError) Error() string { + fn := e.ErrorFormatFn + if fn == nil { + fn = listFormatFunc + } + return fn(e.Errors) +} + +func (e *MultiError) String() string { + return fmt.Sprintf("*%#v", e.Errors) +} + +// ErrorOrNil returns an error if there are some. 
+func (e *MultiError) ErrorOrNil() error { + switch { + case e == nil || len(e.Errors) == 0: + return nil + default: + return e + } +} + +// ErrorFormatFunc is called by MultiError to return the list of errors as a string. +type ErrorFormatFunc func([]error) string + +func listFormatFunc(es []error) string { + if len(es) == 1 { + return fmt.Sprintf("1 error occurred:\n\t* %s\n\n", es[0]) + } + + points := make([]string, len(es)) + for i, err := range es { + points[i] = fmt.Sprintf("* %s", err) + } + + return fmt.Sprintf("%d errors occurred:\n\t%s\n\n", len(es), strings.Join(points, "\n\t")) +} + +var timerPool = sync.Pool{New: func() interface{} { + return time.NewTimer(time.Second) +}} + +func getTimer(duration time.Duration) *time.Timer { + timer := timerPool.Get().(*time.Timer) + timer.Reset(duration) + return timer +} + +func returnTimer(timer *time.Timer) { + timer.Stop() + select { + case _ = <-timer.C: + default: + } + timerPool.Put(timer) +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 4cf12851e9140..30a982c437701 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -308,6 +308,9 @@ github.com/cortexproject/cortex/pkg/util/spanlogger github.com/cortexproject/cortex/pkg/util/test github.com/cortexproject/cortex/pkg/util/validation github.com/cortexproject/cortex/tools/querytee +# github.com/cristalhq/hedgedhttp v0.6.1 +## explicit; go 1.16 +github.com/cristalhq/hedgedhttp # github.com/davecgh/go-spew v1.1.1 ## explicit github.com/davecgh/go-spew/spew