Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds the ability to hedge storage requests. #4826

Merged
merged 14 commits into from
Nov 30, 2021
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
## Main

* [4826](https://github.com/grafana/loki/pull/4826) **cyriltovena**: Adds the ability to hedge storage requests.
* [4828](https://github.com/grafana/loki/pull/4282) **chaudum**: Set correct `Content-Type` header in query response
* [4832](https://github.com/grafana/loki/pull/4832) **taisho6339**: Use http prefix path correctly in Promtail
* [4828](https://github.com/grafana/loki/pull/4828) **chaudum**: Set correct `Content-Type` header in query response
* [4794](https://github.com/grafana/loki/pull/4794) **taisho6339**: Aggregate inotify watcher to file target manager
* [4663](https://github.com/grafana/loki/pull/4663) **taisho6339**: Add SASL&mTLS authentication support for Kafka in Promtail
* [4745](https://github.com/grafana/loki/pull/4745) **taisho6339**: Expose Kafka message key in labels
Expand Down
44 changes: 35 additions & 9 deletions docs/sources/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -312,9 +312,9 @@ The `query_scheduler` block configures the Loki query scheduler.
# This configures the gRPC client used to report errors back to the
# query-frontend.
[grpc_client_config: <grpc_client_config>]

# Set to true to have the query schedulers create and place themselves in a ring.
# If no frontend_address or scheduler_address are present
# If no frontend_address or scheduler_address are present
# anywhere else in the configuration, Loki will toggle this value to true.
[use_scheduler_ring: <boolean> | default = false]

Expand Down Expand Up @@ -453,7 +453,7 @@ storage:
# Method to use for backend rule storage (azure, gcs, s3, swift, local).
# CLI flag: -ruler.storage.type
[type: <string> ]

# Configures backend rule storage for Azure.
[azure: <azure_storage_config>]

Expand All @@ -469,6 +469,9 @@ storage:
# Configures backend rule storage for a local filesystem directory.
[local: <local_storage_config>]

# The `hedging_config` configures how to hedge requests for the storage.
[hedging: <hedging_config>]

# Remote-write configuration to send rule samples to a Prometheus remote-write endpoint.
remote_write:
# Enable remote-write functionality.
Expand Down Expand Up @@ -509,8 +512,8 @@ remote_write:
# List of remote write relabel configurations.
write_relabel_configs:
[- <relabel_config> ...]
# Name of the remote write config, which if specified must be unique among remote

# Name of the remote write config, which if specified must be unique among remote
# write configs.
# The name will be used in metrics and logging in place of a generated value
# to help users distinguish between remote write configs.
Expand Down Expand Up @@ -853,6 +856,26 @@ The `swift_storage_config` configures Swift as a general storage for different d
[container_name: <string> | default = "cortex"]
```

## hedging_config

The `hedging_config` configures how to hedge requests for the storage.

Hedged requests is sending a secondary request until the first request has been outstanding for more than a configure expected latency
for this class of requests.
You should configure the latency based on your p99 of object store requests.

```yaml
# Optional. Default is 0 (disabled)
# Example: "at: 500ms"
# If set to a non-zero value another request will be issued at the provided duration. Recommended to
# be set to p99 of object store requests to reduce long tail latency. This setting is most impactful when
# used with queriers and has minimal to no impact on other pieces.
[at: <duration> | default = 0]
# Optional. Default is 2
# The maximum amount of requests to be issued.
[up_to: <int> | default = 2]
```

## local_storage_config

The `local_storage_config` configures a (local) filesystem as a general storage for different data generated by Loki.
Expand Down Expand Up @@ -1323,11 +1346,11 @@ aws:
# Minimum duration to back off.
# CLI flag: -s3.backoff-min-period
[min_period: <duration> | default = 100ms]

# The duration to back off.
# CLI flag: -s3.backoff-max-period
[max_period: <duration> | default = 3s]

# Number of times to back off and retry before failing.
# CLI flag: -s3.backoff-retries
[max_retries: <int> | default = 5]
Expand Down Expand Up @@ -2081,7 +2104,7 @@ The `limits_config` block configures global and per-tenant limits in Loki.
# up until lookback duration ago.
# This limit is enforced in the query frontend, the querier and the ruler.
# If the requested time range is outside the allowed range, the request will not fail,
# but will be modified to only query data within the allowed time range.
# but will be modified to only query data within the allowed time range.
# The default value of 0 does not set a limit.
# CLI flag: -querier.max-query-lookback
[max_query_lookback: <duration> | default = 0]
Expand Down Expand Up @@ -2360,6 +2383,9 @@ If any specific configuration for an object storage client have been provided el

# Configures a (local) filesystem as the common storage.
[filesystem: <local_storage_config>]

# The `hedging_config` configures how to hedge requests for the storage.
[hedging: <hedging_config>]
```

### ring_config
Expand Down Expand Up @@ -2512,7 +2538,7 @@ How far into the past accepted out-of-order log entries may be
is configurable with `max_chunk_age`.
`max_chunk_age` defaults to 1 hour.
Loki calculates the earliest time that out-of-order entries may have
and be accepted with
and be accepted with

```
time_of_most_recent_line - (max_chunk_age/2)
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ require (
github.com/cespare/xxhash/v2 v2.1.2
github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf
github.com/cortexproject/cortex v1.10.1-0.20211124141505-4e9fc3a2b5ab
github.com/cristalhq/hedgedhttp v0.6.1
github.com/davecgh/go-spew v1.1.1
github.com/docker/docker v20.10.11+incompatible
github.com/docker/go-plugins-helpers v0.0.0-20181025120712-1e6269c305b8
Expand Down Expand Up @@ -103,6 +104,7 @@ require (
)

require (
github.com/mattn/go-ieproxy v0.0.1
github.com/xdg-go/scram v1.0.2
gopkg.in/Graylog2/go-gelf.v2 v2.0.0-20191017102106-1550ee647df0
)
Expand Down Expand Up @@ -211,7 +213,6 @@ require (
github.com/leodido/ragel-machinery v0.0.0-20181214104525-299bdde78165 // indirect
github.com/mailru/easyjson v0.7.6 // indirect
github.com/mattn/go-colorable v0.1.8 // indirect
github.com/mattn/go-ieproxy v0.0.1 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/miekg/dns v1.1.43 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,8 @@ github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7Do
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/creack/pty v1.1.11 h1:07n33Z8lZxZ2qwegKbObQohDhXDQxiMMz1NOUGYlesw=
github.com/creack/pty v1.1.11/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/cristalhq/hedgedhttp v0.6.1 h1:o3tcl+HwEFrGfNkZbgbQW4N7UNmorKvqhUFLN1rrkdA=
github.com/cristalhq/hedgedhttp v0.6.1/go.mod h1:XkqWU6qVMutbhW68NnzjWrGtH8NUx1UfYqGYtHVKIsI=
github.com/cucumber/godog v0.8.1/go.mod h1:vSh3r/lM+psC1BPXvdkSEuNjmXfpVqrMGYAElF6hxnA=
github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4=
github.com/cyphar/filepath-securejoin v0.2.2/go.mod h1:FpkQEhXnPnOthhzymB7CGsFk2G9VLXONKD9G7QGMM+4=
Expand Down
3 changes: 3 additions & 0 deletions pkg/loki/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/grafana/loki/pkg/storage/chunk/aws"
"github.com/grafana/loki/pkg/storage/chunk/azure"
"github.com/grafana/loki/pkg/storage/chunk/gcp"
"github.com/grafana/loki/pkg/storage/chunk/hedging"
"github.com/grafana/loki/pkg/storage/chunk/openstack"
"github.com/grafana/loki/pkg/util"
)
Expand All @@ -32,6 +33,7 @@ type Storage struct {
Azure azure.BlobStorageConfig `yaml:"azure"`
Swift openstack.SwiftConfig `yaml:"swift"`
FSConfig FilesystemConfig `yaml:"filesystem"`
Hedging hedging.Config `yaml:"hedging"`
}

func (s *Storage) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
Expand All @@ -40,6 +42,7 @@ func (s *Storage) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
s.Azure.RegisterFlagsWithPrefix(prefix+".azure", f)
s.Swift.RegisterFlagsWithPrefix(prefix+".swift", f)
s.FSConfig.RegisterFlagsWithPrefix(prefix+".filesystem", f)
s.Hedging.RegisterFlagsWithPrefix(prefix, f)
}

type FilesystemConfig struct {
Expand Down
12 changes: 8 additions & 4 deletions pkg/loki/config_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,9 @@ func applyDynamicRingConfigs(r, defaults *ConfigWrapper) {
//any deviations from defaults. When mergeWithExisting is true, the ring config is overlaid on top of any specified
//derivations, with the derivations taking precedence.
func applyConfigToRings(r, defaults *ConfigWrapper, rc util.RingConfig, mergeWithExisting bool) {
//Ingester - mergeWithExisting is false when applying the ingester config, and we only want to
//change ingester ring values when applying the common config, so there's no need for the DeepEqual
//check here.
// Ingester - mergeWithExisting is false when applying the ingester config, and we only want to
// change ingester ring values when applying the common config, so there's no need for the DeepEqual
// check here.
if mergeWithExisting {
r.Ingester.LifecyclerConfig.RingConfig.KVStore = rc.KVStore
r.Ingester.LifecyclerConfig.HeartbeatPeriod = rc.HeartbeatPeriod
Expand Down Expand Up @@ -331,7 +331,7 @@ var ErrTooManyStorageConfigs = errors.New("too many storage configs provided in
func applyStorageConfig(cfg, defaults *ConfigWrapper) error {
var applyConfig func(*ConfigWrapper)

//only one config is allowed
// only one config is allowed
configsFound := 0

if !reflect.DeepEqual(cfg.Common.Storage.Azure, defaults.StorageConfig.AzureStorageConfig) {
Expand All @@ -341,6 +341,7 @@ func applyStorageConfig(cfg, defaults *ConfigWrapper) error {
r.Ruler.StoreConfig.Type = "azure"
r.Ruler.StoreConfig.Azure = r.Common.Storage.Azure.ToCortexAzureConfig()
r.StorageConfig.AzureStorageConfig = r.Common.Storage.Azure
r.StorageConfig.Hedging = r.Common.Storage.Hedging
r.CompactorConfig.SharedStoreType = chunk_storage.StorageTypeAzure
}
}
Expand Down Expand Up @@ -368,6 +369,7 @@ func applyStorageConfig(cfg, defaults *ConfigWrapper) error {
r.Ruler.StoreConfig.GCS = r.Common.Storage.GCS.ToCortexGCSConfig()
r.StorageConfig.GCSConfig = r.Common.Storage.GCS
r.CompactorConfig.SharedStoreType = chunk_storage.StorageTypeGCS
r.StorageConfig.Hedging = r.Common.Storage.Hedging
}
}

Expand All @@ -379,6 +381,7 @@ func applyStorageConfig(cfg, defaults *ConfigWrapper) error {
r.Ruler.StoreConfig.S3 = r.Common.Storage.S3.ToCortexS3Config()
r.StorageConfig.AWSStorageConfig.S3Config = r.Common.Storage.S3
r.CompactorConfig.SharedStoreType = chunk_storage.StorageTypeS3
r.StorageConfig.Hedging = r.Common.Storage.Hedging
}
}

Expand All @@ -390,6 +393,7 @@ func applyStorageConfig(cfg, defaults *ConfigWrapper) error {
r.Ruler.StoreConfig.Swift = r.Common.Storage.Swift.ToCortexSwiftConfig()
r.StorageConfig.Swift = r.Common.Storage.Swift
r.CompactorConfig.SharedStoreType = chunk_storage.StorageTypeSwift
r.StorageConfig.Hedging = r.Common.Storage.Hedging
}
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/chunk/aws/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ var Fixtures = []testutils.Fixture{
schemaCfg: schemaConfig,
metrics: newMetrics(nil),
}
object := objectclient.NewClient(&S3ObjectClient{S3: newMockS3()}, nil)
mock := newMockS3()
object := objectclient.NewClient(&S3ObjectClient{S3: mock, hedgedS3: mock}, nil)
return index, object, table, schemaConfig, testutils.CloserFunc(func() error {
table.Stop()
index.Stop()
Expand Down
66 changes: 43 additions & 23 deletions pkg/storage/chunk/aws/s3_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/grafana/dskit/flagext"

"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/hedging"
)

const (
Expand Down Expand Up @@ -159,28 +160,27 @@ func (cfg *HTTPConfig) ToCortexHTTPConfig() cortex_aws.HTTPConfig {
}

type S3ObjectClient struct {
cfg S3Config
cfg S3Config

bucketNames []string
S3 s3iface.S3API
hedgedS3 s3iface.S3API
sseConfig *SSEParsedConfig
}

// NewS3ObjectClient makes a new S3-backed ObjectClient.
func NewS3ObjectClient(cfg S3Config) (*S3ObjectClient, error) {
s3Config, bucketNames, err := buildS3Config(cfg)
func NewS3ObjectClient(cfg S3Config, hedgingCfg hedging.Config) (*S3ObjectClient, error) {
bucketNames, err := buckets(cfg)
if err != nil {
return nil, errors.Wrap(err, "failed to build s3 config")
return nil, err
}

sess, err := session.NewSession(s3Config)
s3Client, err := buildS3Client(cfg, hedgingCfg, false)
if err != nil {
return nil, errors.Wrap(err, "failed to create new s3 session")
return nil, errors.Wrap(err, "failed to build s3 config")
}

s3Client := s3.New(sess)

if cfg.SignatureVersion == SignatureVersionV2 {
s3Client.Handlers.Sign.Swap(v4.SignRequestHandler.Name, v2SignRequestHandler(cfg))
s3ClientHedging, err := buildS3Client(cfg, hedgingCfg, true)
if err != nil {
return nil, errors.Wrap(err, "failed to build s3 config")
}

sseCfg, err := buildSSEParsedConfig(cfg)
Expand All @@ -191,6 +191,7 @@ func NewS3ObjectClient(cfg S3Config) (*S3ObjectClient, error) {
client := S3ObjectClient{
cfg: cfg,
S3: s3Client,
hedgedS3: s3ClientHedging,
bucketNames: bucketNames,
sseConfig: sseCfg,
}
Expand Down Expand Up @@ -234,15 +235,15 @@ func v2SignRequestHandler(cfg S3Config) request.NamedHandler {
}
}

func buildS3Config(cfg S3Config) (*aws.Config, []string, error) {
func buildS3Client(cfg S3Config, hedgingCfg hedging.Config, hedging bool) (*s3.S3, error) {
var s3Config *aws.Config
var err error

// if an s3 url is passed use it to initialize the s3Config and then override with any additional params
if cfg.S3.URL != nil {
s3Config, err = awscommon.ConfigFromURL(cfg.S3.URL)
if err != nil {
return nil, nil, err
return nil, err
}
} else {
s3Config = &aws.Config{}
Expand All @@ -266,7 +267,7 @@ func buildS3Config(cfg S3Config) (*aws.Config, []string, error) {

if cfg.AccessKeyID != "" && cfg.SecretAccessKey == "" ||
cfg.AccessKeyID == "" && cfg.SecretAccessKey != "" {
return nil, nil, errors.New("must supply both an Access Key ID and Secret Access Key or neither")
return nil, errors.New("must supply both an Access Key ID and Secret Access Key or neither")
}

if cfg.AccessKeyID != "" && cfg.SecretAccessKey != "" {
Expand All @@ -282,7 +283,7 @@ func buildS3Config(cfg S3Config) (*aws.Config, []string, error) {
tlsConfig.RootCAs = x509.NewCertPool()
data, err := os.ReadFile(cfg.HTTPConfig.CAFile)
if err != nil {
return nil, nil, err
return nil, err
}
tlsConfig.RootCAs.AppendCertsFromPEM(data)
}
Expand Down Expand Up @@ -310,11 +311,31 @@ func buildS3Config(cfg S3Config) (*aws.Config, []string, error) {
if cfg.Inject != nil {
transport = cfg.Inject(transport)
}

s3Config = s3Config.WithHTTPClient(&http.Client{
httpClient := &http.Client{
Transport: transport,
})
}

if hedging {
httpClient = hedgingCfg.Client(httpClient)
}

s3Config = s3Config.WithHTTPClient(httpClient)

sess, err := session.NewSession(s3Config)
if err != nil {
return nil, errors.Wrap(err, "failed to create new s3 session")
}

s3Client := s3.New(sess)

if cfg.SignatureVersion == SignatureVersionV2 {
s3Client.Handlers.Sign.Swap(v4.SignRequestHandler.Name, v2SignRequestHandler(cfg))
}

return s3Client, nil
}

func buckets(cfg S3Config) ([]string, error) {
// bucketnames
var bucketNames []string
if cfg.S3.URL != nil {
Expand All @@ -326,10 +347,9 @@ func buildS3Config(cfg S3Config) (*aws.Config, []string, error) {
}

if len(bucketNames) == 0 {
return nil, nil, errors.New("at least one bucket name must be specified")
return nil, errors.New("at least one bucket name must be specified")
}

return s3Config, bucketNames, nil
return bucketNames, nil
}

// Stop fulfills the chunk.ObjectClient interface
Expand Down Expand Up @@ -376,7 +396,7 @@ func (a *S3ObjectClient) GetObject(ctx context.Context, objectKey string) (io.Re
}
err = instrument.CollectedRequest(ctx, "S3.GetObject", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
var requestErr error
resp, requestErr = a.S3.GetObjectWithContext(ctx, &s3.GetObjectInput{
resp, requestErr = a.hedgedS3.GetObjectWithContext(ctx, &s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(objectKey),
})
Expand Down
Loading