Skip to content

Commit

Permalink
ring: include unhealthy instances in error instead of logging
Browse files Browse the repository at this point in the history
Signed-off-by: Arve Knudsen <[email protected]>
  • Loading branch information
aknuds1 committed Oct 25, 2021
1 parent fbd75ed commit 947766b
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 60 deletions.
50 changes: 22 additions & 28 deletions ring/replication_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,8 @@ package ring

import (
"fmt"
"strings"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
)

type ReplicationStrategy interface {
Expand All @@ -16,12 +13,10 @@ type ReplicationStrategy interface {
Filter(instances []InstanceDesc, op Operation, replicationFactor int, heartbeatTimeout time.Duration, zoneAwarenessEnabled bool) (healthy []InstanceDesc, maxFailures int, err error)
}

type defaultReplicationStrategy struct {
logger log.Logger
}
type defaultReplicationStrategy struct{}

func NewDefaultReplicationStrategy(logger log.Logger) ReplicationStrategy {
return &defaultReplicationStrategy{logger: logger}
func NewDefaultReplicationStrategy() ReplicationStrategy {
return &defaultReplicationStrategy{}
}

// Filter decides, given the set of instances eligible for a key,
Expand All @@ -44,12 +39,12 @@ func (s *defaultReplicationStrategy) Filter(instances []InstanceDesc, op Operati
// Skip those that have not heartbeated in a while. NB these are still
// included in the calculation of minSuccess, so if too many failed instances
// will cause the whole write to fail.
var skipped []string
var unhealthy []string
for i := 0; i < len(instances); {
if instances[i].IsHealthy(op, heartbeatTimeout, now) {
i++
} else {
skipped = append(skipped, instances[i].Addr)
unhealthy = append(unhealthy, instances[i].Addr)
instances = append(instances[:i], instances[i+1:]...)
}
}
Expand All @@ -58,17 +53,15 @@ func (s *defaultReplicationStrategy) Filter(instances []InstanceDesc, op Operati
// after filtering out dead ones, don't even bother trying.
if len(instances) < minSuccess {
var err error
var unhealthyStr string
if len(unhealthy) > 0 {
unhealthyStr = fmt.Sprintf(" - unhealthy instances: %s", strings.Join(unhealthy, ","))
}

if zoneAwarenessEnabled {
level.Error(s.logger).Log("msg",
fmt.Sprintf("at least %d live replicas required across different availability zones, could only find %d",
minSuccess, len(instances)), "unhealthy", skipped)
err = fmt.Errorf("at least %d live replicas required across different availability zones, could only find %d", minSuccess, len(instances))
err = fmt.Errorf("at least %d live replicas required across different availability zones, could only find %d%s", minSuccess, len(instances), unhealthyStr)
} else {
level.Error(s.logger).Log("msg",
fmt.Sprintf("at least %d live replicas required, could only find %d",
minSuccess, len(instances)), "unhealthy", skipped)
err = fmt.Errorf("at least %d live replicas required, could only find %d", minSuccess, len(instances))
err = fmt.Errorf("at least %d live replicas required, could only find %d%s", minSuccess, len(instances), unhealthyStr)
}

return nil, 0, err
Expand All @@ -77,31 +70,32 @@ func (s *defaultReplicationStrategy) Filter(instances []InstanceDesc, op Operati
return instances, len(instances) - minSuccess, nil
}

type ignoreUnhealthyInstancesReplicationStrategy struct {
logger log.Logger
}
type ignoreUnhealthyInstancesReplicationStrategy struct{}

func NewIgnoreUnhealthyInstancesReplicationStrategy(logger log.Logger) ReplicationStrategy {
return &ignoreUnhealthyInstancesReplicationStrategy{logger: logger}
func NewIgnoreUnhealthyInstancesReplicationStrategy() ReplicationStrategy {
return &ignoreUnhealthyInstancesReplicationStrategy{}
}

func (r *ignoreUnhealthyInstancesReplicationStrategy) Filter(instances []InstanceDesc, op Operation, _ int, heartbeatTimeout time.Duration, _ bool) (healthy []InstanceDesc, maxFailures int, err error) {
now := time.Now()
// Filter out unhealthy instances.
var skipped []string
var unhealthy []string
for i := 0; i < len(instances); {
if instances[i].IsHealthy(op, heartbeatTimeout, now) {
i++
} else {
skipped = append(skipped, instances[i].Addr)
unhealthy = append(unhealthy, instances[i].Addr)
instances = append(instances[:i], instances[i+1:]...)
}
}

// We need at least 1 healthy instance no matter what is the replication factor set to.
if len(instances) == 0 {
level.Error(r.logger).Log("msg", "failed to find any healthy ring replicas", "unhealthy", skipped)
return nil, 0, errors.New("at least 1 healthy replica required, could only find 0")
var unhealthyStr string
if len(unhealthy) > 0 {
unhealthyStr = fmt.Sprintf(" - unhealthy instances: %s", strings.Join(unhealthy, ","))
}
return nil, 0, fmt.Errorf("at least 1 healthy replica required, could only find 0%s", unhealthyStr)
}

return instances, len(instances) - 1, nil
Expand Down
17 changes: 8 additions & 9 deletions ring/replication_strategy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"testing"
"time"

"github.com/go-kit/log"
"github.com/stretchr/testify/assert"
)

Expand All @@ -25,7 +24,7 @@ func TestRingReplicationStrategy(t *testing.T) {
{
replicationFactor: 1,
deadIngesters: 1,
expectedError: "at least 1 live replicas required, could only find 0",
expectedError: "at least 1 live replicas required, could only find 0 - unhealthy instances: dead1",
},

// Ensure it works for RF=3 and 2 ingesters.
Expand Down Expand Up @@ -53,7 +52,7 @@ func TestRingReplicationStrategy(t *testing.T) {
replicationFactor: 3,
liveIngesters: 1,
deadIngesters: 2,
expectedError: "at least 2 live replicas required, could only find 1",
expectedError: "at least 2 live replicas required, could only find 1 - unhealthy instances: dead1,dead2",
},

// Ensure it works when adding / removing nodes.
Expand All @@ -76,7 +75,7 @@ func TestRingReplicationStrategy(t *testing.T) {
replicationFactor: 3,
liveIngesters: 2,
deadIngesters: 2,
expectedError: "at least 3 live replicas required, could only find 2",
expectedError: "at least 3 live replicas required, could only find 2 - unhealthy instances: dead1,dead2",
},
} {
ingesters := []InstanceDesc{}
Expand All @@ -86,11 +85,11 @@ func TestRingReplicationStrategy(t *testing.T) {
})
}
for i := 0; i < tc.deadIngesters; i++ {
ingesters = append(ingesters, InstanceDesc{})
ingesters = append(ingesters, InstanceDesc{Addr: fmt.Sprintf("dead%d", i+1)})
}

t.Run(fmt.Sprintf("[%d]", i), func(t *testing.T) {
strategy := NewDefaultReplicationStrategy(log.NewNopLogger())
strategy := NewDefaultReplicationStrategy()
liveIngesters, maxFailure, err := strategy.Filter(ingesters, Read, tc.replicationFactor, 100*time.Second, false)
if tc.expectedError == "" {
assert.NoError(t, err)
Expand Down Expand Up @@ -138,7 +137,7 @@ func TestIgnoreUnhealthyInstancesReplicationStrategy(t *testing.T) {
liveIngesters: 0,
deadIngesters: 3,
expectedMaxFailure: 0,
expectedError: "at least 1 healthy replica required, could only find 0",
expectedError: "at least 1 healthy replica required, could only find 0 - unhealthy instances: dead1,dead2,dead3",
},
} {
ingesters := []InstanceDesc{}
Expand All @@ -148,11 +147,11 @@ func TestIgnoreUnhealthyInstancesReplicationStrategy(t *testing.T) {
})
}
for i := 0; i < tc.deadIngesters; i++ {
ingesters = append(ingesters, InstanceDesc{})
ingesters = append(ingesters, InstanceDesc{Addr: fmt.Sprintf("dead%d", i+1)})
}

t.Run(tc.name, func(t *testing.T) {
strategy := NewIgnoreUnhealthyInstancesReplicationStrategy(log.NewNopLogger())
strategy := NewIgnoreUnhealthyInstancesReplicationStrategy()
liveIngesters, maxFailure, err := strategy.Filter(ingesters, Read, 3, 100*time.Second, false)
if tc.expectedError == "" {
assert.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion ring/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func New(cfg Config, name, key string, logger log.Logger, reg prometheus.Registe
return nil, err
}

return NewWithStoreClientAndStrategy(cfg, name, key, store, NewDefaultReplicationStrategy(logger), reg, logger)
return NewWithStoreClientAndStrategy(cfg, name, key, store, NewDefaultReplicationStrategy(), reg, logger)
}

func NewWithStoreClientAndStrategy(cfg Config, name, key string, store kv.Client, strategy ReplicationStrategy, reg prometheus.Registerer, logger log.Logger) (*Ring, error) {
Expand Down
36 changes: 18 additions & 18 deletions ring/ring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func benchmarkBatch(b *testing.B, numInstances, numKeys int) {
r := Ring{
cfg: cfg,
ringDesc: desc,
strategy: NewDefaultReplicationStrategy(log.NewNopLogger()),
strategy: NewDefaultReplicationStrategy(),
}

ctx := context.Background()
Expand Down Expand Up @@ -96,7 +96,7 @@ func TestDoBatchZeroInstances(t *testing.T) {
r := Ring{
cfg: Config{},
ringDesc: desc,
strategy: NewDefaultReplicationStrategy(log.NewNopLogger()),
strategy: NewDefaultReplicationStrategy(),
}
require.Error(t, DoBatch(ctx, Write, &r, keys, callback, cleanup))
}
Expand Down Expand Up @@ -187,7 +187,7 @@ func TestRing_Get_ZoneAwarenessWithIngesterLeaving(t *testing.T) {
ringTokensByZone: r.getTokensByZone(),
ringInstanceByToken: r.getTokensInfo(),
ringZones: getZones(r.getTokensByZone()),
strategy: NewDefaultReplicationStrategy(log.NewNopLogger()),
strategy: NewDefaultReplicationStrategy(),
}

_, bufHosts, bufZones := MakeBuffersForGet()
Expand Down Expand Up @@ -279,7 +279,7 @@ func TestRing_Get_ZoneAwareness(t *testing.T) {
ringTokensByZone: r.getTokensByZone(),
ringInstanceByToken: r.getTokensInfo(),
ringZones: getZones(r.getTokensByZone()),
strategy: NewDefaultReplicationStrategy(log.NewNopLogger()),
strategy: NewDefaultReplicationStrategy(),
}

instances := make([]InstanceDesc, 0, len(r.GetIngesters()))
Expand Down Expand Up @@ -373,7 +373,7 @@ func TestRing_GetAllHealthy(t *testing.T) {
ringTokensByZone: ringDesc.getTokensByZone(),
ringInstanceByToken: ringDesc.getTokensInfo(),
ringZones: getZones(ringDesc.getTokensByZone()),
strategy: NewDefaultReplicationStrategy(log.NewNopLogger()),
strategy: NewDefaultReplicationStrategy(),
}

set, err := ring.GetAllHealthy(Read)
Expand Down Expand Up @@ -503,7 +503,7 @@ func TestRing_GetReplicationSetForOperation(t *testing.T) {
ringTokensByZone: ringDesc.getTokensByZone(),
ringInstanceByToken: ringDesc.getTokensInfo(),
ringZones: getZones(ringDesc.getTokensByZone()),
strategy: NewDefaultReplicationStrategy(log.NewNopLogger()),
strategy: NewDefaultReplicationStrategy(),
}

set, err := ring.GetReplicationSetForOperation(Read)
Expand Down Expand Up @@ -821,7 +821,7 @@ func TestRing_GetReplicationSetForOperation_WithZoneAwarenessEnabled(t *testing.
ringTokensByZone: ringDesc.getTokensByZone(),
ringInstanceByToken: ringDesc.getTokensInfo(),
ringZones: getZones(ringDesc.getTokensByZone()),
strategy: NewDefaultReplicationStrategy(log.NewNopLogger()),
strategy: NewDefaultReplicationStrategy(),
}

// Check the replication set has the correct settings
Expand Down Expand Up @@ -957,7 +957,7 @@ func TestRing_ShuffleShard(t *testing.T) {
ringTokensByZone: ringDesc.getTokensByZone(),
ringInstanceByToken: ringDesc.getTokensInfo(),
ringZones: getZones(ringDesc.getTokensByZone()),
strategy: NewDefaultReplicationStrategy(log.NewNopLogger()),
strategy: NewDefaultReplicationStrategy(),
}

shardRing := ring.ShuffleShard("tenant-id", testData.shardSize)
Expand Down Expand Up @@ -1009,7 +1009,7 @@ func TestRing_ShuffleShard_Stability(t *testing.T) {
ringTokensByZone: ringDesc.getTokensByZone(),
ringInstanceByToken: ringDesc.getTokensInfo(),
ringZones: getZones(ringDesc.getTokensByZone()),
strategy: NewDefaultReplicationStrategy(log.NewNopLogger()),
strategy: NewDefaultReplicationStrategy(),
}

for i := 1; i <= numTenants; i++ {
Expand Down Expand Up @@ -1077,7 +1077,7 @@ func TestRing_ShuffleShard_Shuffling(t *testing.T) {
ringTokensByZone: ringDesc.getTokensByZone(),
ringInstanceByToken: ringDesc.getTokensInfo(),
ringZones: getZones(ringDesc.getTokensByZone()),
strategy: NewDefaultReplicationStrategy(log.NewNopLogger()),
strategy: NewDefaultReplicationStrategy(),
}

// Compute the shard for each tenant.
Expand Down Expand Up @@ -1176,7 +1176,7 @@ func TestRing_ShuffleShard_Consistency(t *testing.T) {
ringTokensByZone: ringDesc.getTokensByZone(),
ringInstanceByToken: ringDesc.getTokensInfo(),
ringZones: getZones(ringDesc.getTokensByZone()),
strategy: NewDefaultReplicationStrategy(log.NewNopLogger()),
strategy: NewDefaultReplicationStrategy(),
}

// Compute the initial shard for each tenant.
Expand Down Expand Up @@ -1240,7 +1240,7 @@ func TestRing_ShuffleShard_ConsistencyOnShardSizeChanged(t *testing.T) {
ringTokensByZone: ringDesc.getTokensByZone(),
ringInstanceByToken: ringDesc.getTokensInfo(),
ringZones: getZones(ringDesc.getTokensByZone()),
strategy: NewDefaultReplicationStrategy(log.NewNopLogger()),
strategy: NewDefaultReplicationStrategy(),
}

// Get the replication set with shard size = 3.
Expand Down Expand Up @@ -1317,7 +1317,7 @@ func TestRing_ShuffleShard_ConsistencyOnZonesChanged(t *testing.T) {
ringTokensByZone: ringDesc.getTokensByZone(),
ringInstanceByToken: ringDesc.getTokensInfo(),
ringZones: getZones(ringDesc.getTokensByZone()),
strategy: NewDefaultReplicationStrategy(log.NewNopLogger()),
strategy: NewDefaultReplicationStrategy(),
}

// Get the replication set with shard size = 2.
Expand Down Expand Up @@ -1576,7 +1576,7 @@ func TestRing_ShuffleShardWithLookback(t *testing.T) {
ringTokensByZone: ringDesc.getTokensByZone(),
ringInstanceByToken: ringDesc.getTokensInfo(),
ringZones: getZones(ringDesc.getTokensByZone()),
strategy: NewDefaultReplicationStrategy(log.NewNopLogger()),
strategy: NewDefaultReplicationStrategy(),
}

// Replay the events on the timeline.
Expand Down Expand Up @@ -1641,7 +1641,7 @@ func TestRing_ShuffleShardWithLookback_CorrectnessWithFuzzy(t *testing.T) {
ringTokensByZone: ringDesc.getTokensByZone(),
ringInstanceByToken: ringDesc.getTokensInfo(),
ringZones: getZones(ringDesc.getTokensByZone()),
strategy: NewDefaultReplicationStrategy(log.NewNopLogger()),
strategy: NewDefaultReplicationStrategy(),
}

// The simulation starts with the minimum shard size. Random events can later increase it.
Expand Down Expand Up @@ -1794,7 +1794,7 @@ func benchmarkShuffleSharding(b *testing.B, numInstances, numZones, numTokens, s
ringInstanceByToken: ringDesc.getTokensInfo(),
ringZones: getZones(ringDesc.getTokensByZone()),
shuffledSubringCache: map[subringCacheKey]*Ring{},
strategy: NewDefaultReplicationStrategy(log.NewNopLogger()),
strategy: NewDefaultReplicationStrategy(),
lastTopologyChange: time.Now(),
}

Expand Down Expand Up @@ -1822,7 +1822,7 @@ func BenchmarkRing_Get(b *testing.B) {
ringInstanceByToken: ringDesc.getTokensInfo(),
ringZones: getZones(ringDesc.getTokensByZone()),
shuffledSubringCache: map[subringCacheKey]*Ring{},
strategy: NewDefaultReplicationStrategy(log.NewNopLogger()),
strategy: NewDefaultReplicationStrategy(),
lastTopologyChange: time.Now(),
}

Expand Down Expand Up @@ -1850,7 +1850,7 @@ func TestRing_Get_NoMemoryAllocations(t *testing.T) {
ringInstanceByToken: ringDesc.getTokensInfo(),
ringZones: getZones(ringDesc.getTokensByZone()),
shuffledSubringCache: map[subringCacheKey]*Ring{},
strategy: NewDefaultReplicationStrategy(log.NewNopLogger()),
strategy: NewDefaultReplicationStrategy(),
lastTopologyChange: time.Now(),
}

Expand Down
7 changes: 3 additions & 4 deletions ring/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"testing"
"time"

"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -121,7 +120,7 @@ func TestWaitRingStabilityShouldReturnAsSoonAsMinStabilityIsReachedOnNoChanges(t
ringTokensByZone: ringDesc.getTokensByZone(),
ringInstanceByToken: ringDesc.getTokensInfo(),
ringZones: getZones(ringDesc.getTokensByZone()),
strategy: NewDefaultReplicationStrategy(log.NewNopLogger()),
strategy: NewDefaultReplicationStrategy(),
}

startTime := time.Now()
Expand Down Expand Up @@ -156,7 +155,7 @@ func TestWaitRingStabilityShouldReturnOnceMinStabilityHasBeenReached(t *testing.
ringTokensByZone: ringDesc.getTokensByZone(),
ringInstanceByToken: ringDesc.getTokensInfo(),
ringZones: getZones(ringDesc.getTokensByZone()),
strategy: NewDefaultReplicationStrategy(log.NewNopLogger()),
strategy: NewDefaultReplicationStrategy(),
}

// Add 1 new instance after some time.
Expand Down Expand Up @@ -207,7 +206,7 @@ func TestWaitRingStabilityShouldReturnErrorIfMaxWaitingIsReached(t *testing.T) {
ringTokensByZone: ringDesc.getTokensByZone(),
ringInstanceByToken: ringDesc.getTokensInfo(),
ringZones: getZones(ringDesc.getTokensByZone()),
strategy: NewDefaultReplicationStrategy(log.NewNopLogger()),
strategy: NewDefaultReplicationStrategy(),
}

// Keep changing the ring.
Expand Down

0 comments on commit 947766b

Please sign in to comment.