Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add --storage-write-timeout flag to set write request timeouts #22617

Merged
merged 3 commits into from
Oct 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cmd/influxd/launcher/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,12 @@ func (o *InfluxdOpts) BindCliOpts() []cli.Opt {
},

// storage configuration
{
DestP: &o.StorageConfig.WriteTimeout,
Flag: "storage-write-timeout",
Default: o.StorageConfig.WriteTimeout,
Desc: "The max amount of time the engine will spend completing a write request before cancelling with a timeout.",
},
{
DestP: &o.StorageConfig.Data.WALFsyncDelay,
Flag: "storage-wal-fsync-delay",
Expand Down
9 changes: 8 additions & 1 deletion storage/config.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
package storage

import (
"time"

"github.com/influxdata/influxdb/v2/tsdb"
"github.com/influxdata/influxdb/v2/v1/services/precreator"
"github.com/influxdata/influxdb/v2/v1/services/retention"
)

// DefaultWriteTimeout is the default timeout for a complete write to succeed.
const DefaultWriteTimeout = 10 * time.Second

// Config holds the configuration for an Engine.
type Config struct {
Data tsdb.Config
Data tsdb.Config
WriteTimeout time.Duration

RetentionService retention.Config
PrecreatorConfig precreator.Config
Expand All @@ -18,6 +24,7 @@ type Config struct {
func NewConfig() Config {
return Config{
Data: tsdb.NewConfig(),
WriteTimeout: DefaultWriteTimeout,
RetentionService: retention.NewConfig(),
PrecreatorConfig: precreator.NewConfig(),
}
Expand Down
2 changes: 1 addition & 1 deletion storage/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func NewEngine(path string, c Config, options ...Option) *Engine {
e.tsdbStore.EngineOptions.EngineVersion = c.Data.Engine
e.tsdbStore.EngineOptions.IndexVersion = c.Data.Index

pw := coordinator.NewPointsWriter()
pw := coordinator.NewPointsWriter(c.WriteTimeout)
pw.TSDBStore = e.tsdbStore
pw.MetaClient = e.metaClient
e.pointsWriter = pw
Expand Down
7 changes: 0 additions & 7 deletions v1/coordinator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,10 @@
package coordinator

import (
"time"

"github.com/influxdata/influxdb/v2/toml"
)

const (
// DefaultWriteTimeout is the default timeout for a complete write to succeed.
DefaultWriteTimeout = 10 * time.Second

// DefaultMaxConcurrentQueries is the maximum number of running queries.
// A value of zero will make the maximum query limit unlimited.
DefaultMaxConcurrentQueries = 0
Expand All @@ -27,7 +22,6 @@ const (

// Config represents the configuration for the coordinator service.
type Config struct {
WriteTimeout toml.Duration `toml:"write-timeout"`
MaxConcurrentQueries int `toml:"max-concurrent-queries"`
LogQueriesAfter toml.Duration `toml:"log-queries-after"`
MaxSelectPointN int `toml:"max-select-point"`
Expand All @@ -38,7 +32,6 @@ type Config struct {
// NewConfig returns an instance of Config with defaults.
func NewConfig() Config {
return Config{
WriteTimeout: toml.Duration(DefaultWriteTimeout),
MaxConcurrentQueries: DefaultMaxConcurrentQueries,
MaxSelectPointN: DefaultMaxSelectPointN,
MaxSelectSeriesN: DefaultMaxSelectSeriesN,
Expand Down
24 changes: 0 additions & 24 deletions v1/coordinator/config_test.go

This file was deleted.

4 changes: 2 additions & 2 deletions v1/coordinator/points_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,10 @@ func (w *WritePointsRequest) AddPoint(name string, value interface{}, timestamp
}

// NewPointsWriter returns a new instance of PointsWriter for a node.
func NewPointsWriter() *PointsWriter {
func NewPointsWriter(writeTimeout time.Duration) *PointsWriter {
return &PointsWriter{
closing: make(chan struct{}),
WriteTimeout: DefaultWriteTimeout,
WriteTimeout: writeTimeout,
Logger: zap.NewNop(),
stats: &WriteStatistics{},
}
Expand Down
10 changes: 5 additions & 5 deletions v1/coordinator/points_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestPointsWriter_MapShards_AlterShardDuration(t *testing.T) {
return &sg, nil
}

c := coordinator.NewPointsWriter()
c := coordinator.NewPointsWriter(time.Second)
c.MetaClient = ms

pr := &coordinator.WritePointsRequest{
Expand Down Expand Up @@ -161,7 +161,7 @@ func TestPointsWriter_MapShards_Multiple(t *testing.T) {
panic("should not get here")
}

c := coordinator.NewPointsWriter()
c := coordinator.NewPointsWriter(time.Second)
c.MetaClient = ms
defer c.Close()
pr := &coordinator.WritePointsRequest{
Expand Down Expand Up @@ -217,7 +217,7 @@ func TestPointsWriter_MapShards_Invalid(t *testing.T) {
return &rp.ShardGroups[0], nil
}

c := coordinator.NewPointsWriter()
c := coordinator.NewPointsWriter(time.Second)
c.MetaClient = ms
defer c.Close()
pr := &coordinator.WritePointsRequest{
Expand Down Expand Up @@ -338,7 +338,7 @@ func TestPointsWriter_WritePoints(t *testing.T) {
return subPoints
}

c := coordinator.NewPointsWriter()
c := coordinator.NewPointsWriter(time.Second)
c.MetaClient = ms
c.TSDBStore = store
c.AddWriteSubscriber(sub.Points())
Expand Down Expand Up @@ -414,7 +414,7 @@ func TestPointsWriter_WritePoints_Dropped(t *testing.T) {
return subPoints
}

c := coordinator.NewPointsWriter()
c := coordinator.NewPointsWriter(time.Second)
c.MetaClient = ms
c.TSDBStore = store
c.AddWriteSubscriber(sub.Points())
Expand Down