From f04d69e546f448c95958ed496bcb9cbd51cf5cfa Mon Sep 17 00:00:00 2001 From: Dan Moran Date: Mon, 4 Oct 2021 13:44:55 -0400 Subject: [PATCH 1/3] feat: add `--storage-write-timeout` flag to set write request timeouts --- cmd/influxd/launcher/cmd.go | 5 +++++ storage/config.go | 5 ++++- storage/engine.go | 1 + v1/coordinator/config.go | 7 ------- v1/coordinator/points_writer.go | 3 +++ 5 files changed, 13 insertions(+), 8 deletions(-) diff --git a/cmd/influxd/launcher/cmd.go b/cmd/influxd/launcher/cmd.go index ea87816cbf9..33281d81c16 100644 --- a/cmd/influxd/launcher/cmd.go +++ b/cmd/influxd/launcher/cmd.go @@ -456,6 +456,11 @@ func (o *InfluxdOpts) BindCliOpts() []cli.Opt { }, // storage configuration + { + DestP: &o.StorageConfig.WriteTimeout, + Flag: "storage-write-timeout", + Desc: "The max amount of time the engine will spend completing a write request before cancelling with a timeout.", + }, { DestP: &o.StorageConfig.Data.WALFsyncDelay, Flag: "storage-wal-fsync-delay", diff --git a/storage/config.go b/storage/config.go index d8e24db2090..fe718a8e56a 100644 --- a/storage/config.go +++ b/storage/config.go @@ -1,6 +1,8 @@ package storage import ( + "time" + "github.com/influxdata/influxdb/v2/tsdb" "github.com/influxdata/influxdb/v2/v1/services/precreator" "github.com/influxdata/influxdb/v2/v1/services/retention" @@ -8,7 +10,8 @@ import ( // Config holds the configuration for an Engine. type Config struct { - Data tsdb.Config + Data tsdb.Config + WriteTimeout time.Duration RetentionService retention.Config PrecreatorConfig precreator.Config diff --git a/storage/engine.go b/storage/engine.go index 7f75d3a8dd6..9829bbe0ccc 100644 --- a/storage/engine.go +++ b/storage/engine.go @@ -128,6 +128,7 @@ func NewEngine(path string, c Config, options ...Option) *Engine { e.tsdbStore.EngineOptions.IndexVersion = c.Data.Index pw := coordinator.NewPointsWriter() + pw.WriteTimeout = c.WriteTimeout pw.TSDBStore = e.tsdbStore pw.MetaClient = e.metaClient e.pointsWriter = pw diff --git a/v1/coordinator/config.go b/v1/coordinator/config.go index dcaa2f52fe6..6a83f898781 100644 --- a/v1/coordinator/config.go +++ b/v1/coordinator/config.go @@ -3,15 +3,10 @@ package coordinator import ( - "time" - "github.com/influxdata/influxdb/v2/toml" ) const ( - // DefaultWriteTimeout is the default timeout for a complete write to succeed. - DefaultWriteTimeout = 10 * time.Second - // DefaultMaxConcurrentQueries is the maximum number of running queries. // A value of zero will make the maximum query limit unlimited. DefaultMaxConcurrentQueries = 0 @@ -27,7 +22,6 @@ const ( // Config represents the configuration for the coordinator service. type Config struct { - WriteTimeout toml.Duration `toml:"write-timeout"` MaxConcurrentQueries int `toml:"max-concurrent-queries"` LogQueriesAfter toml.Duration `toml:"log-queries-after"` MaxSelectPointN int `toml:"max-select-point"` @@ -38,7 +32,6 @@ type Config struct { // NewConfig returns an instance of Config with defaults. func NewConfig() Config { return Config{ - WriteTimeout: toml.Duration(DefaultWriteTimeout), MaxConcurrentQueries: DefaultMaxConcurrentQueries, MaxSelectPointN: DefaultMaxSelectPointN, MaxSelectSeriesN: DefaultMaxSelectSeriesN, diff --git a/v1/coordinator/points_writer.go b/v1/coordinator/points_writer.go index 5e207c436e5..133d2fde0c8 100644 --- a/v1/coordinator/points_writer.go +++ b/v1/coordinator/points_writer.go @@ -29,6 +29,9 @@ const ( statSubWriteDrop = "subWriteDrop" ) +// DefaultWriteTimeout is the default timeout for a complete write to succeed. +const DefaultWriteTimeout = 10 * time.Second + var ( // ErrTimeout is returned when a write times out. ErrTimeout = errors.New("timeout") From 39ff82ee3761c48e8285aea9a3f39c4593089020 Mon Sep 17 00:00:00 2001 From: Dan Moran Date: Mon, 4 Oct 2021 14:04:30 -0400 Subject: [PATCH 2/3] chore: delete now-broken test --- v1/coordinator/config_test.go | 24 ------------------------ 1 file changed, 24 deletions(-) delete mode 100644 v1/coordinator/config_test.go diff --git a/v1/coordinator/config_test.go b/v1/coordinator/config_test.go deleted file mode 100644 index 61ec591336c..00000000000 --- a/v1/coordinator/config_test.go +++ /dev/null @@ -1,24 +0,0 @@ -package coordinator_test - -import ( - "testing" - "time" - - "github.com/BurntSushi/toml" - "github.com/influxdata/influxdb/v2/v1/coordinator" -) - -func TestConfig_Parse(t *testing.T) { - // Parse configuration. - var c coordinator.Config - if _, err := toml.Decode(` -write-timeout = "20s" -`, &c); err != nil { - t.Fatal(err) - } - - // Validate configuration. - if time.Duration(c.WriteTimeout) != 20*time.Second { - t.Fatalf("unexpected write timeout s: %s", c.WriteTimeout) - } -} From 192a1d4128989bf8d52ba4d7ea4f204a492eda49 Mon Sep 17 00:00:00 2001 From: Dan Moran Date: Mon, 4 Oct 2021 14:32:59 -0400 Subject: [PATCH 3/3] fix: move where default is set to fix tests & CLI help text --- cmd/influxd/launcher/cmd.go | 7 ++++--- storage/config.go | 4 ++++ storage/engine.go | 3 +-- v1/coordinator/points_writer.go | 7 ++----- v1/coordinator/points_writer_test.go | 10 +++++----- 5 files changed, 16 insertions(+), 15 deletions(-) diff --git a/cmd/influxd/launcher/cmd.go b/cmd/influxd/launcher/cmd.go index 33281d81c16..be73b481ed6 100644 --- a/cmd/influxd/launcher/cmd.go +++ b/cmd/influxd/launcher/cmd.go @@ -457,9 +457,10 @@ func (o *InfluxdOpts) BindCliOpts() []cli.Opt { // storage configuration { - DestP: &o.StorageConfig.WriteTimeout, - Flag: "storage-write-timeout", - Desc: "The max amount of time the engine will spend completing a write request before cancelling with a timeout.", + DestP: &o.StorageConfig.WriteTimeout, + Flag: "storage-write-timeout", + Default: o.StorageConfig.WriteTimeout, + Desc: "The max amount of time the engine will spend completing a write request before cancelling with a timeout.", }, { DestP: &o.StorageConfig.Data.WALFsyncDelay, diff --git a/storage/config.go b/storage/config.go index fe718a8e56a..23320e4fcd9 100644 --- a/storage/config.go +++ b/storage/config.go @@ -8,6 +8,9 @@ import ( "github.com/influxdata/influxdb/v2/v1/services/retention" ) +// DefaultWriteTimeout is the default timeout for a complete write to succeed. +const DefaultWriteTimeout = 10 * time.Second + // Config holds the configuration for an Engine. type Config struct { Data tsdb.Config @@ -21,6 +24,7 @@ type Config struct { func NewConfig() Config { return Config{ Data: tsdb.NewConfig(), + WriteTimeout: DefaultWriteTimeout, RetentionService: retention.NewConfig(), PrecreatorConfig: precreator.NewConfig(), } diff --git a/storage/engine.go b/storage/engine.go index 9829bbe0ccc..4e7e627a76d 100644 --- a/storage/engine.go +++ b/storage/engine.go @@ -127,8 +127,7 @@ func NewEngine(path string, c Config, options ...Option) *Engine { e.tsdbStore.EngineOptions.EngineVersion = c.Data.Engine e.tsdbStore.EngineOptions.IndexVersion = c.Data.Index - pw := coordinator.NewPointsWriter() - pw.WriteTimeout = c.WriteTimeout + pw := coordinator.NewPointsWriter(c.WriteTimeout) pw.TSDBStore = e.tsdbStore pw.MetaClient = e.metaClient e.pointsWriter = pw diff --git a/v1/coordinator/points_writer.go b/v1/coordinator/points_writer.go index 133d2fde0c8..7a3ec777db4 100644 --- a/v1/coordinator/points_writer.go +++ b/v1/coordinator/points_writer.go @@ -29,9 +29,6 @@ const ( statSubWriteDrop = "subWriteDrop" ) -// DefaultWriteTimeout is the default timeout for a complete write to succeed. -const DefaultWriteTimeout = 10 * time.Second - var ( // ErrTimeout is returned when a write times out. ErrTimeout = errors.New("timeout") @@ -88,10 +85,10 @@ func (w *WritePointsRequest) AddPoint(name string, value interface{}, timestamp } // NewPointsWriter returns a new instance of PointsWriter for a node. -func NewPointsWriter() *PointsWriter { +func NewPointsWriter(writeTimeout time.Duration) *PointsWriter { return &PointsWriter{ closing: make(chan struct{}), - WriteTimeout: DefaultWriteTimeout, + WriteTimeout: writeTimeout, Logger: zap.NewNop(), stats: &WriteStatistics{}, } diff --git a/v1/coordinator/points_writer_test.go b/v1/coordinator/points_writer_test.go index e26b672a267..5129deca5f5 100644 --- a/v1/coordinator/points_writer_test.go +++ b/v1/coordinator/points_writer_test.go @@ -83,7 +83,7 @@ func TestPointsWriter_MapShards_AlterShardDuration(t *testing.T) { return &sg, nil } - c := coordinator.NewPointsWriter() + c := coordinator.NewPointsWriter(time.Second) c.MetaClient = ms pr := &coordinator.WritePointsRequest{ @@ -161,7 +161,7 @@ func TestPointsWriter_MapShards_Multiple(t *testing.T) { panic("should not get here") } - c := coordinator.NewPointsWriter() + c := coordinator.NewPointsWriter(time.Second) c.MetaClient = ms defer c.Close() pr := &coordinator.WritePointsRequest{ @@ -217,7 +217,7 @@ func TestPointsWriter_MapShards_Invalid(t *testing.T) { return &rp.ShardGroups[0], nil } - c := coordinator.NewPointsWriter() + c := coordinator.NewPointsWriter(time.Second) c.MetaClient = ms defer c.Close() pr := &coordinator.WritePointsRequest{ @@ -338,7 +338,7 @@ func TestPointsWriter_WritePoints(t *testing.T) { return subPoints } - c := coordinator.NewPointsWriter() + c := coordinator.NewPointsWriter(time.Second) c.MetaClient = ms c.TSDBStore = store c.AddWriteSubscriber(sub.Points()) @@ -414,7 +414,7 @@ func TestPointsWriter_WritePoints_Dropped(t *testing.T) { return subPoints } - c := coordinator.NewPointsWriter() + c := coordinator.NewPointsWriter(time.Second) c.MetaClient = ms c.TSDBStore = store c.AddWriteSubscriber(sub.Points())