Allow setting ring heartbeat timeout to zero to disable timeout check.
This change allows the various ring heartbeat timeouts to be configured to
zero, as a means of disabling the timeout check. It is expected to be used
together with a separate enhancement that allows disabling heartbeats. When
the heartbeat timeout is disabled, instances will always appear as healthy
in the ring.

Signed-off-by: Steve Simpson <[email protected]>
stevesg committed Jul 6, 2021
1 parent 95fedaa commit e0bd75e
Showing 13 changed files with 78 additions and 19 deletions.
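
To sketch the new behaviour (an editor's illustration based on the flags listed in the CHANGELOG entry below, not part of the commit itself): setting any of the affected timeouts to zero disables the heartbeat timeout check, e.g.

    -distributor.ring.heartbeat-timeout=0

or via the equivalent YAML key documented in the sections below:

    # 0 = never (timeout disabled)
    heartbeat_timeout: 0

With the check disabled, a stale heartbeat no longer marks an instance unhealthy, so instances always appear as healthy in the ring.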
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,13 @@
 * [CHANGE] Querier / ruler: Change `-querier.max-fetched-chunks-per-query` configuration to limit the maximum number of chunks that can be fetched in a single query. The number of chunks fetched by ingesters AND long-term storage combined should not exceed the value configured on `-querier.max-fetched-chunks-per-query`. #4260
 * [ENHANCEMENT] Add timeout for waiting on compactor to become ACTIVE in the ring. #4262
 * [ENHANCEMENT] Reduce memory used by streaming queries, particularly in ruler. #4341
+* [ENHANCEMENT] Ring: allow experimentally disabling heartbeat timeouts by setting the relevant configuration value to zero. Applies to the following: #4342
+  * `-distributor.ring.heartbeat-timeout`
+  * `-ring.heartbeat-timeout`
+  * `-ruler.ring.heartbeat-timeout`
+  * `-alertmanager.sharding-ring.heartbeat-timeout`
+  * `-compactor.ring.heartbeat-timeout`
+  * `-store-gateway.sharding-ring.heartbeat-timeout`
 * [BUGFIX] HA Tracker: when cleaning up obsolete elected replicas from KV store, tracker didn't update the number of clusters per user correctly. #4336
 
 ## 1.10.0-rc.0 / 2021-06-28
2 changes: 1 addition & 1 deletion docs/blocks-storage/compactor.md
@@ -214,7 +214,7 @@ compactor:
   [heartbeat_period: <duration> | default = 5s]
 
   # The heartbeat timeout after which compactors are considered unhealthy
-  # within the ring.
+  # within the ring. 0 = never (timeout disabled).
   # CLI flag: -compactor.ring.heartbeat-timeout
   [heartbeat_timeout: <duration> | default = 1m]
 
4 changes: 2 additions & 2 deletions docs/blocks-storage/store-gateway.md
@@ -237,8 +237,8 @@ store_gateway:
   [heartbeat_period: <duration> | default = 15s]
 
   # The heartbeat timeout after which store gateways are considered unhealthy
-  # within the ring. This option needs to be set both on the store-gateway and
-  # querier when running in microservices mode.
+  # within the ring. 0 = never (timeout disabled). This option needs to be set
+  # both on the store-gateway and querier when running in microservices mode.
   # CLI flag: -store-gateway.sharding-ring.heartbeat-timeout
   [heartbeat_timeout: <duration> | default = 1m]
 
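
As the note above says, this option must be set on both components when running in microservices mode. A hypothetical pair of invocations (assuming Cortex's standard single-binary -target flag; exact deployment flags will vary):

    cortex -target=store-gateway -store-gateway.sharding-ring.heartbeat-timeout=0
    cortex -target=querier -store-gateway.sharding-ring.heartbeat-timeout=0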
13 changes: 7 additions & 6 deletions docs/configuration/config-file-reference.md
@@ -568,7 +568,7 @@ ring:
   [heartbeat_period: <duration> | default = 5s]
 
   # The heartbeat timeout after which distributors are considered unhealthy
-  # within the ring.
+  # within the ring. 0 = never (timeout disabled).
   # CLI flag: -distributor.ring.heartbeat-timeout
   [heartbeat_timeout: <duration> | default = 1m]
@@ -662,6 +662,7 @@ lifecycler:
   [mirror_timeout: <duration> | default = 2s]
 
   # The heartbeat timeout after which ingesters are skipped for reads/writes.
+  # 0 = never (timeout disabled).
   # CLI flag: -ring.heartbeat-timeout
   [heartbeat_timeout: <duration> | default = 1m]
@@ -1585,7 +1586,7 @@ ring:
   [heartbeat_period: <duration> | default = 5s]
 
   # The heartbeat timeout after which rulers are considered unhealthy within the
-  # ring.
+  # ring. 0 = never (timeout disabled).
   # CLI flag: -ruler.ring.heartbeat-timeout
   [heartbeat_timeout: <duration> | default = 1m]
@@ -1906,7 +1907,7 @@ sharding_ring:
   [heartbeat_period: <duration> | default = 15s]
 
   # The heartbeat timeout after which alertmanagers are considered unhealthy
-  # within the ring.
+  # within the ring. 0 = never (timeout disabled).
   # CLI flag: -alertmanager.sharding-ring.heartbeat-timeout
   [heartbeat_timeout: <duration> | default = 1m]
@@ -5179,7 +5180,7 @@ sharding_ring:
   [heartbeat_period: <duration> | default = 5s]
 
   # The heartbeat timeout after which compactors are considered unhealthy within
-  # the ring.
+  # the ring. 0 = never (timeout disabled).
   # CLI flag: -compactor.ring.heartbeat-timeout
   [heartbeat_timeout: <duration> | default = 1m]
@@ -5257,8 +5258,8 @@ sharding_ring:
   [heartbeat_period: <duration> | default = 15s]
 
   # The heartbeat timeout after which store gateways are considered unhealthy
-  # within the ring. This option needs to be set both on the store-gateway and
-  # querier when running in microservices mode.
+  # within the ring. 0 = never (timeout disabled). This option needs to be set
+  # both on the store-gateway and querier when running in microservices mode.
   # CLI flag: -store-gateway.sharding-ring.heartbeat-timeout
   [heartbeat_timeout: <duration> | default = 1m]
2 changes: 1 addition & 1 deletion pkg/alertmanager/alertmanager_ring.go
@@ -77,7 +77,7 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) {
 	// Ring flags
 	cfg.KVStore.RegisterFlagsWithPrefix(rfprefix, "alertmanagers/", f)
 	f.DurationVar(&cfg.HeartbeatPeriod, rfprefix+"heartbeat-period", 15*time.Second, "Period at which to heartbeat to the ring.")
-	f.DurationVar(&cfg.HeartbeatTimeout, rfprefix+"heartbeat-timeout", time.Minute, "The heartbeat timeout after which alertmanagers are considered unhealthy within the ring.")
+	f.DurationVar(&cfg.HeartbeatTimeout, rfprefix+"heartbeat-timeout", time.Minute, "The heartbeat timeout after which alertmanagers are considered unhealthy within the ring. 0 = never (timeout disabled).")
 	f.IntVar(&cfg.ReplicationFactor, rfprefix+"replication-factor", 3, "The replication factor to use when sharding the alertmanager.")
 	f.BoolVar(&cfg.ZoneAwarenessEnabled, rfprefix+"zone-awareness-enabled", false, "True to enable zone-awareness and replicate alerts across different availability zones.")
 
2 changes: 1 addition & 1 deletion pkg/compactor/compactor_ring.go
@@ -51,7 +51,7 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) {
 	// Ring flags
 	cfg.KVStore.RegisterFlagsWithPrefix("compactor.ring.", "collectors/", f)
 	f.DurationVar(&cfg.HeartbeatPeriod, "compactor.ring.heartbeat-period", 5*time.Second, "Period at which to heartbeat to the ring.")
-	f.DurationVar(&cfg.HeartbeatTimeout, "compactor.ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which compactors are considered unhealthy within the ring.")
+	f.DurationVar(&cfg.HeartbeatTimeout, "compactor.ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which compactors are considered unhealthy within the ring. 0 = never (timeout disabled).")
 
 	// Wait stability flags.
 	f.DurationVar(&cfg.WaitStabilityMinDuration, "compactor.ring.wait-stability-min-duration", time.Minute, "Minimum time to wait for ring stability at startup. 0 to disable.")
2 changes: 1 addition & 1 deletion pkg/distributor/distributor_ring.go
@@ -43,7 +43,7 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) {
 	// Ring flags
 	cfg.KVStore.RegisterFlagsWithPrefix("distributor.ring.", "collectors/", f)
 	f.DurationVar(&cfg.HeartbeatPeriod, "distributor.ring.heartbeat-period", 5*time.Second, "Period at which to heartbeat to the ring.")
-	f.DurationVar(&cfg.HeartbeatTimeout, "distributor.ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which distributors are considered unhealthy within the ring.")
+	f.DurationVar(&cfg.HeartbeatTimeout, "distributor.ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which distributors are considered unhealthy within the ring. 0 = never (timeout disabled).")
 
 	// Instance flags
 	cfg.InstanceInterfaceNames = []string{"eth0", "en0"}
13 changes: 11 additions & 2 deletions pkg/ring/model.go
@@ -101,7 +101,7 @@ func (d *Desc) FindIngestersByState(state InstanceState) []InstanceDesc {
 func (d *Desc) Ready(now time.Time, heartbeatTimeout time.Duration) error {
 	numTokens := 0
 	for id, ingester := range d.Ingesters {
-		if now.Sub(time.Unix(ingester.Timestamp, 0)) > heartbeatTimeout {
+		if !ingester.IsHeartbeatHealthy(heartbeatTimeout, now) {
 			return fmt.Errorf("instance %s past heartbeat timeout", id)
 		} else if ingester.State != ACTIVE {
 			return fmt.Errorf("instance %s in state %v", id, ingester.State)
@@ -136,7 +136,16 @@ func (i *InstanceDesc) GetRegisteredAt() time.Time {
 func (i *InstanceDesc) IsHealthy(op Operation, heartbeatTimeout time.Duration, now time.Time) bool {
 	healthy := op.IsInstanceInStateHealthy(i.State)
 
-	return healthy && now.Unix()-i.Timestamp <= heartbeatTimeout.Milliseconds()/1000
+	return healthy && i.IsHeartbeatHealthy(heartbeatTimeout, now)
+}
+
+// IsHeartbeatHealthy returns whether the heartbeat timestamp for the ingester is within the
+// specified timeout period. A timeout of zero disables the check, and the heartbeat is ignored.
+func (i *InstanceDesc) IsHeartbeatHealthy(heartbeatTimeout time.Duration, now time.Time) bool {
+	if heartbeatTimeout == 0 {
+		return true
+	}
+	return now.Sub(time.Unix(i.Timestamp, 0)) <= heartbeatTimeout
 }
 
 // Merge merges other ring into this one. Returns sub-ring that represents the change,
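
A brief usage sketch of the new helper (editor's illustration, not part of the diff), assuming the InstanceDesc type and ACTIVE constant from pkg/ring above; the variable names are hypothetical:

    now := time.Now()
    stale := InstanceDesc{State: ACTIVE, Timestamp: now.Add(-5 * time.Minute).Unix()}

    healthy := stale.IsHeartbeatHealthy(time.Minute, now) // false: heartbeat older than the 1m timeout
    always := stale.IsHeartbeatHealthy(0, now)            // true: a zero timeout disables the check

Note that both Ready and IsHealthy now route through this helper, so a zero timeout disables the heartbeat check consistently for readiness and for read/write health.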
8 changes: 8 additions & 0 deletions pkg/ring/model_test.go
@@ -136,10 +136,18 @@ func TestDesc_Ready(t *testing.T) {
 		t.Fatal("expected ready, got", err)
 	}
 
+	if err := r.Ready(now, 0); err != nil {
+		t.Fatal("expected ready, got", err)
+	}
+
 	if err := r.Ready(now.Add(5*time.Minute), 10*time.Second); err == nil {
 		t.Fatal("expected !ready (no heartbeat from active ingester), but got no error")
 	}
 
+	if err := r.Ready(now.Add(5*time.Minute), 0); err != nil {
+		t.Fatal("expected ready (no heartbeat but timeout disabled), got", err)
+	}
+
 	r = &Desc{
 		Ingesters: map[string]InstanceDesc{
 			"ing1": {
2 changes: 1 addition & 1 deletion pkg/ring/ring.go
@@ -147,7 +147,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
 	cfg.KVStore.RegisterFlagsWithPrefix(prefix, "collectors/", f)
 
-	f.DurationVar(&cfg.HeartbeatTimeout, prefix+"ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which ingesters are skipped for reads/writes.")
+	f.DurationVar(&cfg.HeartbeatTimeout, prefix+"ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which ingesters are skipped for reads/writes. 0 = never (timeout disabled).")
 	f.IntVar(&cfg.ReplicationFactor, prefix+"distributor.replication-factor", 3, "The number of ingesters to write to and read from.")
 	f.BoolVar(&cfg.ZoneAwarenessEnabled, prefix+"distributor.zone-awareness-enabled", false, "True to enable the zone-awareness and replicate ingested samples across different availability zones.")
 }
38 changes: 36 additions & 2 deletions pkg/ring/ring_test.go
@@ -390,11 +390,11 @@ func TestRing_GetAllHealthy(t *testing.T) {
 }
 
 func TestRing_GetReplicationSetForOperation(t *testing.T) {
-	const heartbeatTimeout = time.Minute
 	now := time.Now()
 
 	tests := map[string]struct {
 		ringInstances         map[string]InstanceDesc
+		ringHeartbeatTimeout  time.Duration
 		ringReplicationFactor int
 		expectedErrForRead    error
 		expectedSetForRead    []string
@@ -405,6 +405,7 @@
 	}{
 		"should return error on empty ring": {
 			ringInstances:         nil,
+			ringHeartbeatTimeout:  time.Minute,
 			ringReplicationFactor: 1,
 			expectedErrForRead:    ErrEmptyRing,
 			expectedErrForWrite:   ErrEmptyRing,
@@ -418,11 +419,41 @@
 				"instance-4": {Addr: "127.0.0.4", State: ACTIVE, Timestamp: now.Add(-30 * time.Second).Unix(), Tokens: GenerateTokens(128, nil)},
 				"instance-5": {Addr: "127.0.0.5", State: ACTIVE, Timestamp: now.Add(-40 * time.Second).Unix(), Tokens: GenerateTokens(128, nil)},
 			},
+			ringHeartbeatTimeout:  time.Minute,
 			ringReplicationFactor: 1,
 			expectedSetForRead:      []string{"127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4", "127.0.0.5"},
 			expectedSetForWrite:     []string{"127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4", "127.0.0.5"},
 			expectedSetForReporting: []string{"127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4", "127.0.0.5"},
 		},
+		"should succeed on instances with old timestamps but heartbeat timeout disabled": {
+			ringInstances: map[string]InstanceDesc{
+				"instance-1": {Addr: "127.0.0.1", State: ACTIVE, Timestamp: now.Add(-2 * time.Minute).Unix(), Tokens: GenerateTokens(128, nil)},
+				"instance-2": {Addr: "127.0.0.2", State: ACTIVE, Timestamp: now.Add(-2 * time.Minute).Unix(), Tokens: GenerateTokens(128, nil)},
+				"instance-3": {Addr: "127.0.0.3", State: ACTIVE, Timestamp: now.Add(-2 * time.Minute).Unix(), Tokens: GenerateTokens(128, nil)},
+				"instance-4": {Addr: "127.0.0.4", State: ACTIVE, Timestamp: now.Add(-2 * time.Minute).Unix(), Tokens: GenerateTokens(128, nil)},
+				"instance-5": {Addr: "127.0.0.5", State: ACTIVE, Timestamp: now.Add(-2 * time.Minute).Unix(), Tokens: GenerateTokens(128, nil)},
+			},
+			ringHeartbeatTimeout:  0,
+			ringReplicationFactor: 1,
+			expectedSetForRead:      []string{"127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4", "127.0.0.5"},
+			expectedSetForWrite:     []string{"127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4", "127.0.0.5"},
+			expectedSetForReporting: []string{"127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4", "127.0.0.5"},
+		},
+		"should succeed on instances with zero timestamp but heartbeat timeout disabled": {
+			ringInstances: map[string]InstanceDesc{
+				"instance-1": {Addr: "127.0.0.1", State: ACTIVE, Timestamp: 0, Tokens: GenerateTokens(128, nil)},
+				"instance-2": {Addr: "127.0.0.2", State: ACTIVE, Timestamp: 0, Tokens: GenerateTokens(128, nil)},
+				"instance-3": {Addr: "127.0.0.3", State: ACTIVE, Timestamp: 0, Tokens: GenerateTokens(128, nil)},
+				"instance-4": {Addr: "127.0.0.4", State: ACTIVE, Timestamp: 0, Tokens: GenerateTokens(128, nil)},
+				"instance-5": {Addr: "127.0.0.5", State: ACTIVE, Timestamp: 0, Tokens: GenerateTokens(128, nil)},
+			},
+			ringHeartbeatTimeout:  0,
+			ringReplicationFactor: 1,
+			expectedSetForRead:      []string{"127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4", "127.0.0.5"},
+			expectedSetForWrite:     []string{"127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4", "127.0.0.5"},
+			expectedSetForReporting: []string{"127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4", "127.0.0.5"},
+		},
+
 		"should fail on 1 unhealthy instance and RF=1": {
 			ringInstances: map[string]InstanceDesc{
 				"instance-1": {Addr: "127.0.0.1", State: ACTIVE, Timestamp: now.Unix(), Tokens: GenerateTokens(128, nil)},
@@ -431,6 +462,7 @@
 				"instance-4": {Addr: "127.0.0.4", State: ACTIVE, Timestamp: now.Add(-30 * time.Second).Unix(), Tokens: GenerateTokens(128, nil)},
 				"instance-5": {Addr: "127.0.0.5", State: ACTIVE, Timestamp: now.Add(-2 * time.Minute).Unix(), Tokens: GenerateTokens(128, nil)},
 			},
+			ringHeartbeatTimeout:  time.Minute,
 			ringReplicationFactor: 1,
 			expectedErrForRead:    ErrTooManyUnhealthyInstances,
 			expectedErrForWrite:   ErrTooManyUnhealthyInstances,
@@ -444,6 +476,7 @@
 				"instance-4": {Addr: "127.0.0.4", State: ACTIVE, Timestamp: now.Add(-30 * time.Second).Unix(), Tokens: GenerateTokens(128, nil)},
 				"instance-5": {Addr: "127.0.0.5", State: ACTIVE, Timestamp: now.Add(-2 * time.Minute).Unix(), Tokens: GenerateTokens(128, nil)},
 			},
+			ringHeartbeatTimeout:  time.Minute,
 			ringReplicationFactor: 3,
 			expectedSetForRead:    []string{"127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4"},
 			expectedSetForWrite:   []string{"127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4"},
@@ -457,6 +490,7 @@
 				"instance-4": {Addr: "127.0.0.4", State: ACTIVE, Timestamp: now.Add(-2 * time.Minute).Unix(), Tokens: GenerateTokens(128, nil)},
 				"instance-5": {Addr: "127.0.0.5", State: ACTIVE, Timestamp: now.Add(-2 * time.Minute).Unix(), Tokens: GenerateTokens(128, nil)},
 			},
+			ringHeartbeatTimeout:  time.Minute,
 			ringReplicationFactor: 3,
 			expectedErrForRead:    ErrTooManyUnhealthyInstances,
 			expectedErrForWrite:   ErrTooManyUnhealthyInstances,
@@ -474,7 +508,7 @@
 
 			ring := Ring{
 				cfg: Config{
-					HeartbeatTimeout:  heartbeatTimeout,
+					HeartbeatTimeout:  testData.ringHeartbeatTimeout,
 					ReplicationFactor: testData.ringReplicationFactor,
 				},
 				ringDesc: ringDesc,
2 changes: 1 addition & 1 deletion pkg/ruler/ruler_ring.go
@@ -57,7 +57,7 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) {
 	// Ring flags
 	cfg.KVStore.RegisterFlagsWithPrefix("ruler.ring.", "rulers/", f)
 	f.DurationVar(&cfg.HeartbeatPeriod, "ruler.ring.heartbeat-period", 5*time.Second, "Period at which to heartbeat to the ring.")
-	f.DurationVar(&cfg.HeartbeatTimeout, "ruler.ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which rulers are considered unhealthy within the ring.")
+	f.DurationVar(&cfg.HeartbeatTimeout, "ruler.ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which rulers are considered unhealthy within the ring. 0 = never (timeout disabled).")
 
 	// Instance flags
 	cfg.InstanceInterfaceNames = []string{"eth0", "en0"}
2 changes: 1 addition & 1 deletion pkg/storegateway/gateway_ring.go
@@ -95,7 +95,7 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) {
 	// Ring flags
 	cfg.KVStore.RegisterFlagsWithPrefix(ringFlagsPrefix, "collectors/", f)
 	f.DurationVar(&cfg.HeartbeatPeriod, ringFlagsPrefix+"heartbeat-period", 15*time.Second, "Period at which to heartbeat to the ring.")
-	f.DurationVar(&cfg.HeartbeatTimeout, ringFlagsPrefix+"heartbeat-timeout", time.Minute, "The heartbeat timeout after which store gateways are considered unhealthy within the ring."+sharedOptionWithQuerier)
+	f.DurationVar(&cfg.HeartbeatTimeout, ringFlagsPrefix+"heartbeat-timeout", time.Minute, "The heartbeat timeout after which store gateways are considered unhealthy within the ring. 0 = never (timeout disabled)."+sharedOptionWithQuerier)
 	f.IntVar(&cfg.ReplicationFactor, ringFlagsPrefix+"replication-factor", 3, "The replication factor to use when sharding blocks."+sharedOptionWithQuerier)
 	f.StringVar(&cfg.TokensFilePath, ringFlagsPrefix+"tokens-file-path", "", "File path where tokens are stored. If empty, tokens are not stored at shutdown and restored at startup.")
 	f.BoolVar(&cfg.ZoneAwarenessEnabled, ringFlagsPrefix+"zone-awareness-enabled", false, "True to enable zone-awareness and replicate blocks across different availability zones.")
