Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds the ability to hedge storage requests. #4826

Merged
merged 14 commits into from
Nov 30, 2021
62 changes: 53 additions & 9 deletions docs/sources/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -312,9 +312,9 @@ The `query_scheduler` block configures the Loki query scheduler.
# This configures the gRPC client used to report errors back to the
# query-frontend.
[grpc_client_config: <grpc_client_config>]

# Set to true to have the query schedulers create and place themselves in a ring.
# If no frontend_address or scheduler_address are present
# If no frontend_address or scheduler_address are present
# anywhere else in the configuration, Loki will toggle this value to true.
[use_scheduler_ring: <boolean> | default = false]

Expand Down Expand Up @@ -453,7 +453,7 @@ storage:
# Method to use for backend rule storage (azure, gcs, s3, swift, local).
# CLI flag: -ruler.storage.type
[type: <string> ]

# Configures backend rule storage for Azure.
[azure: <azure_storage_config>]

Expand Down Expand Up @@ -509,8 +509,8 @@ remote_write:
# List of remote write relabel configurations.
write_relabel_configs:
[- <relabel_config> ...]
# Name of the remote write config, which if specified must be unique among remote

# Name of the remote write config, which if specified must be unique among remote
# write configs.
# The name will be used in metrics and logging in place of a generated value
# to help users distinguish between remote write configs.
Expand Down Expand Up @@ -703,6 +703,17 @@ The `azure_storage_config` configures Azure as a general storage for different d
# Maximum time to wait before retrying a request.
# CLI flag: -<prefix>.azure.max-retry-delay
[max_retry_delay: <duration> | default = 500ms]

hedging:
# Optional. Default is 0 (disabled)
# Example: "at: 500ms"
# If set to a non-zero value another request will be issued at the provided duration. Recommended to
# be set to p99 of object store requests to reduce long tail latency. This setting is most impactful when
# used with queriers and has minimal to no impact on other pieces.
[at: <duration> | default = 0]
# Optional. Default is 2
# The maximum amount of requests to be issued.
[up_to: <int> | default = 2]
```

## gcs_storage_config
Expand All @@ -722,6 +733,17 @@ The `gcs_storage_config` configures GCS as a general storage for different data
# The duration after which the requests to GCS should be timed out.
# CLI flag: -<prefix>.gcs.request-timeout
[request_timeout: <duration> | default = 0s]

hedging:
# Optional. Default is 0 (disabled)
# Example: "at: 500ms"
# If set to a non-zero value another request will be issued at the provided duration. Recommended to
# be set to p99 of object store requests to reduce long tail latency. This setting is most impactful when
# used with queriers and has minimal to no impact on other pieces.
[at: <duration> | default = 0]
# Optional. Default is 2
# The maximum amount of requests to be issued.
[up_to: <int> | default = 2]
```

## s3_storage_config
Expand Down Expand Up @@ -786,6 +808,17 @@ http_config:
# endpoint.
# CLI flag: -<prefix>.s3.http.ca-file
[ca_file: <string> | default = ""]

hedging:
# Optional. Default is 0 (disabled)
# Example: "at: 500ms"
# If set to a non-zero value another request will be issued at the provided duration. Recommended to
# be set to p99 of object store requests to reduce long tail latency. This setting is most impactful when
# used with queriers and has minimal to no impact on other pieces.
[at: <duration> | default = 0]
# Optional. Default is 2
# The maximum amount of requests to be issued.
[up_to: <int> | default = 2]
```

## swift_storage_config
Expand Down Expand Up @@ -851,6 +884,17 @@ The `swift_storage_config` configures Swift as a general storage for different d
# Name of the Swift container to put chunks in.
# CLI flag: -<prefix>.swift.container-name
[container_name: <string> | default = "cortex"]

hedging:
# Optional. Default is 0 (disabled)
# Example: "at: 500ms"
# If set to a non-zero value another request will be issued at the provided duration. Recommended to
# be set to p99 of object store requests to reduce long tail latency. This setting is most impactful when
# used with queriers and has minimal to no impact on other pieces.
[at: <duration> | default = 0]
# Optional. Default is 2
# The maximum amount of requests to be issued.
[up_to: <int> | default = 2]
```

## local_storage_config
Expand Down Expand Up @@ -1323,11 +1367,11 @@ aws:
# Minimum duration to back off.
# CLI flag: -s3.backoff-min-period
[min_period: <duration> | default = 100ms]

# The duration to back off.
# CLI flag: -s3.backoff-max-period
[max_period: <duration> | default = 3s]

# Number of times to back off and retry before failing.
# CLI flag: -s3.backoff-retries
[max_retries: <int> | default = 5]
Expand Down Expand Up @@ -2081,7 +2125,7 @@ The `limits_config` block configures global and per-tenant limits in Loki.
# up until lookback duration ago.
# This limit is enforced in the query frontend, the querier and the ruler.
# If the requested time range is outside the allowed range, the request will not fail,
# but will be modified to only query data within the allowed time range.
# but will be modified to only query data within the allowed time range.
# The default value of 0 does not set a limit.
# CLI flag: -querier.max-query-lookback
[max_query_lookback: <duration> | default = 0]
Expand Down Expand Up @@ -2512,7 +2556,7 @@ How far into the past accepted out-of-order log entries may be
is configurable with `max_chunk_age`.
`max_chunk_age` defaults to 1 hour.
Loki calculates the earliest time that out-of-order entries may have
and be accepted with
and be accepted with

```
time_of_most_recent_line - (max_chunk_age/2)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf
github.com/cortexproject/cortex v1.10.1-0.20211104100946-3f329a21cad4
github.com/davecgh/go-spew v1.1.1
github.com/cristalhq/hedgedhttp v0.6.1
github.com/docker/docker v20.10.8+incompatible
github.com/docker/go-plugins-helpers v0.0.0-20181025120712-1e6269c305b8
github.com/drone/envsubst v1.0.2
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,8 @@ github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7Do
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/creack/pty v1.1.11 h1:07n33Z8lZxZ2qwegKbObQohDhXDQxiMMz1NOUGYlesw=
github.com/creack/pty v1.1.11/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/cristalhq/hedgedhttp v0.6.1 h1:o3tcl+HwEFrGfNkZbgbQW4N7UNmorKvqhUFLN1rrkdA=
github.com/cristalhq/hedgedhttp v0.6.1/go.mod h1:XkqWU6qVMutbhW68NnzjWrGtH8NUx1UfYqGYtHVKIsI=
github.com/cucumber/godog v0.8.1/go.mod h1:vSh3r/lM+psC1BPXvdkSEuNjmXfpVqrMGYAElF6hxnA=
github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4=
github.com/cyphar/filepath-securejoin v0.2.2/go.mod h1:FpkQEhXnPnOthhzymB7CGsFk2G9VLXONKD9G7QGMM+4=
Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/chunk/aws/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ var Fixtures = []testutils.Fixture{
schemaCfg: schemaConfig,
metrics: newMetrics(nil),
}
object := objectclient.NewClient(&S3ObjectClient{S3: newMockS3()}, nil)
mock := newMockS3()
object := objectclient.NewClient(&S3ObjectClient{S3: mock, hedgedS3: mock}, nil)
return index, object, table, schemaConfig, testutils.CloserFunc(func() error {
table.Stop()
index.Stop()
Expand Down
63 changes: 42 additions & 21 deletions pkg/storage/chunk/aws/s3_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/grafana/dskit/flagext"

"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/hedging"
)

const (
Expand Down Expand Up @@ -78,6 +79,7 @@ type S3Config struct {
SignatureVersion string `yaml:"signature_version"`
SSEConfig cortex_s3.SSEConfig `yaml:"sse"`
BackoffConfig backoff.Config `yaml:"backoff_config"`
Hedging hedging.Config `yaml:"hedging"`

Inject InjectRequestMiddleware `yaml:"-"`
}
Expand Down Expand Up @@ -122,6 +124,7 @@ func (cfg *S3Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.DurationVar(&cfg.BackoffConfig.MinBackoff, prefix+"s3.min-backoff", 100*time.Millisecond, "Minimum backoff time when s3 get Object")
f.DurationVar(&cfg.BackoffConfig.MaxBackoff, prefix+"s3.max-backoff", 3*time.Second, "Maximum backoff time when s3 get Object")
f.IntVar(&cfg.BackoffConfig.MaxRetries, prefix+"s3.max-retries", 5, "Maximum number of times to retry when s3 get Object")
cfg.Hedging.RegisterFlagsWithPrefix(prefix+"s3.", f)
}

// Validate config and returns error on failure
Expand Down Expand Up @@ -162,25 +165,23 @@ type S3ObjectClient struct {
cfg S3Config
bucketNames []string
S3 s3iface.S3API
hedgedS3 s3iface.S3API
sseConfig *SSEParsedConfig
}

// NewS3ObjectClient makes a new S3-backed ObjectClient.
func NewS3ObjectClient(cfg S3Config) (*S3ObjectClient, error) {
s3Config, bucketNames, err := buildS3Config(cfg)
bucketNames, err := buckets(cfg)
if err != nil {
return nil, errors.Wrap(err, "failed to build s3 config")
return nil, err
}

sess, err := session.NewSession(s3Config)
s3Client, err := buildS3Client(cfg, false)
if err != nil {
return nil, errors.Wrap(err, "failed to create new s3 session")
return nil, errors.Wrap(err, "failed to build s3 config")
}

s3Client := s3.New(sess)

if cfg.SignatureVersion == SignatureVersionV2 {
s3Client.Handlers.Sign.Swap(v4.SignRequestHandler.Name, v2SignRequestHandler(cfg))
s3ClientHedging, err := buildS3Client(cfg, true)
if err != nil {
return nil, errors.Wrap(err, "failed to build s3 config")
}

sseCfg, err := buildSSEParsedConfig(cfg)
Expand All @@ -191,6 +192,7 @@ func NewS3ObjectClient(cfg S3Config) (*S3ObjectClient, error) {
client := S3ObjectClient{
cfg: cfg,
S3: s3Client,
hedgedS3: s3ClientHedging,
bucketNames: bucketNames,
sseConfig: sseCfg,
}
Expand Down Expand Up @@ -234,15 +236,15 @@ func v2SignRequestHandler(cfg S3Config) request.NamedHandler {
}
}

func buildS3Config(cfg S3Config) (*aws.Config, []string, error) {
func buildS3Client(cfg S3Config, hedging bool) (*s3.S3, error) {
var s3Config *aws.Config
var err error

// if an s3 url is passed use it to initialize the s3Config and then override with any additional params
if cfg.S3.URL != nil {
s3Config, err = awscommon.ConfigFromURL(cfg.S3.URL)
if err != nil {
return nil, nil, err
return nil, err
}
} else {
s3Config = &aws.Config{}
Expand All @@ -266,7 +268,7 @@ func buildS3Config(cfg S3Config) (*aws.Config, []string, error) {

if cfg.AccessKeyID != "" && cfg.SecretAccessKey == "" ||
cfg.AccessKeyID == "" && cfg.SecretAccessKey != "" {
return nil, nil, errors.New("must supply both an Access Key ID and Secret Access Key or neither")
return nil, errors.New("must supply both an Access Key ID and Secret Access Key or neither")
}

if cfg.AccessKeyID != "" && cfg.SecretAccessKey != "" {
Expand All @@ -282,7 +284,7 @@ func buildS3Config(cfg S3Config) (*aws.Config, []string, error) {
tlsConfig.RootCAs = x509.NewCertPool()
data, err := os.ReadFile(cfg.HTTPConfig.CAFile)
if err != nil {
return nil, nil, err
return nil, err
}
tlsConfig.RootCAs.AppendCertsFromPEM(data)
}
Expand Down Expand Up @@ -310,11 +312,31 @@ func buildS3Config(cfg S3Config) (*aws.Config, []string, error) {
if cfg.Inject != nil {
transport = cfg.Inject(transport)
}

s3Config = s3Config.WithHTTPClient(&http.Client{
httpClient := &http.Client{
Transport: transport,
})
}

if hedging {
httpClient = cfg.Hedging.Client(httpClient)
}

s3Config = s3Config.WithHTTPClient(httpClient)

sess, err := session.NewSession(s3Config)
if err != nil {
return nil, errors.Wrap(err, "failed to create new s3 session")
}

s3Client := s3.New(sess)

if cfg.SignatureVersion == SignatureVersionV2 {
s3Client.Handlers.Sign.Swap(v4.SignRequestHandler.Name, v2SignRequestHandler(cfg))
}

return s3Client, nil
}

func buckets(cfg S3Config) ([]string, error) {
// bucketnames
var bucketNames []string
if cfg.S3.URL != nil {
Expand All @@ -326,10 +348,9 @@ func buildS3Config(cfg S3Config) (*aws.Config, []string, error) {
}

if len(bucketNames) == 0 {
return nil, nil, errors.New("at least one bucket name must be specified")
return nil, errors.New("at least one bucket name must be specified")
}

return s3Config, bucketNames, nil
return bucketNames, nil
}

// Stop fulfills the chunk.ObjectClient interface
Expand Down Expand Up @@ -376,7 +397,7 @@ func (a *S3ObjectClient) GetObject(ctx context.Context, objectKey string) (io.Re
}
err = instrument.CollectedRequest(ctx, "S3.GetObject", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
var requestErr error
resp, requestErr = a.S3.GetObjectWithContext(ctx, &s3.GetObjectInput{
resp, requestErr = a.hedgedS3.GetObjectWithContext(ctx, &s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(objectKey),
})
Expand Down
Loading