From 0d82b0256f3c8f73042a262640205d761aad4cd6 Mon Sep 17 00:00:00 2001
From: morimoto-cybozu
Date: Thu, 20 Jun 2024 13:10:26 +0000
Subject: [PATCH] Delay repair of an out-of-cluster unreachable machine

Signed-off-by: morimoto-cybozu
---
 CHANGELOG.md                     |  2 +
 docs/sabakan-triggered-repair.md |  6 ++-
 sabakan/integrate.go             |  2 +-
 sabakan/repairer.go              | 40 ++++++++++++--------
 sabakan/repairer_test.go         | 64 +++++++++++++++++++++++++-------
 5 files changed, 84 insertions(+), 30 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index d3291ee0..b7523226 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,8 @@ This project employs a versioning scheme described in [RELEASE.md](RELEASE.md#ve
 
 ## [Unreleased]
 
+- Delay repair of an out-of-cluster unreachable machine in [#744](https://github.com/cybozu-go/cke/pull/744)
+
 ## [1.28.4]
 
 ### Changed
diff --git a/docs/sabakan-triggered-repair.md b/docs/sabakan-triggered-repair.md
index 8f2bce6b..6816069e 100644
--- a/docs/sabakan-triggered-repair.md
+++ b/docs/sabakan-triggered-repair.md
@@ -86,7 +86,11 @@ As stated above, CKE considers all persisting queue entries as "recent" for simp
 
 A machine may become "UNREACHABLE" very quickly even if it is [being rebooted in a planned manner](reboot.md).
 CKE should wait for a while before starting repair operations for a rebooting machine.
-A user can [configure the wait time](ckecli.md#ckecli-constraints-set-name-value) as a [constraint `wait-seconds-to-repair-rebooting`](constraints.md)
+A user can [configure the wait time](ckecli.md#ckecli-constraints-set-name-value) as a [constraint `wait-seconds-to-repair-rebooting`](constraints.md).
+
+CKE does not manage the reboot operations of out-of-cluster machines.
+It cannot distinguish a planned reboot of an out-of-cluster machine from a crash.
+To avoid filling the repair queue with unnecessary entries, CKE also waits for a while before repairing an out-of-cluster machine.
 
[sabakan]: https://github.com/cybozu-go/sabakan [schema]: https://github.com/cybozu-go/sabakan/blob/main/gql/graph/schema.graphqls diff --git a/sabakan/integrate.go b/sabakan/integrate.go index 9ff7cf7a..b8586af7 100644 --- a/sabakan/integrate.go +++ b/sabakan/integrate.go @@ -185,7 +185,7 @@ func (ig integrator) runRepairer(ctx context.Context, clusterStatus *cke.Cluster return err } - entries := Repairer(machines, clusterStatus.RepairQueue.Entries, rebootEntries, constraints, time.Now().UTC()) + entries := Repairer(machines, clusterStatus.RepairQueue.Entries, rebootEntries, clusterStatus.NodeStatuses, constraints) for _, entry := range entries { err := st.RegisterRepairsEntry(ctx, entry) diff --git a/sabakan/repairer.go b/sabakan/repairer.go index 687df772..247132dc 100644 --- a/sabakan/repairer.go +++ b/sabakan/repairer.go @@ -2,24 +2,22 @@ package sabakan import ( "strings" - "time" "github.com/cybozu-go/cke" "github.com/cybozu-go/log" ) -func Repairer(machines []Machine, repairEntries []*cke.RepairQueueEntry, rebootEntries []*cke.RebootQueueEntry, constraints *cke.Constraints, now time.Time) []*cke.RepairQueueEntry { +func Repairer(machines []Machine, repairEntries []*cke.RepairQueueEntry, rebootEntries []*cke.RebootQueueEntry, nodeStatuses map[string]*cke.NodeStatus, constraints *cke.Constraints) []*cke.RepairQueueEntry { recent := make(map[string]bool) for _, entry := range repairEntries { // entry.Operation is ignored when checking duplication recent[entry.Address] = true } - rebootLimit := now.Add(time.Duration(-constraints.RepairRebootingSeconds) * time.Second) - rebootingSince := make(map[string]time.Time) + rebooting := make(map[string]bool) for _, entry := range rebootEntries { if entry.Status == cke.RebootStatusRebooting { - rebootingSince[entry.Node] = entry.LastTransitionTime // entry.Node denotes IP address + rebooting[entry.Node] = true // entry.Node denotes IP address } } @@ -32,21 +30,33 @@ func Repairer(machines []Machine, repairEntries []*cke.RepairQueueEntry, rebootE continue } - if recent[machine.Spec.IPv4[0]] { + serial := machine.Spec.Serial + address := machine.Spec.IPv4[0] + + if recent[address] { log.Warn("ignore recently-repaired non-healthy machine", map[string]interface{}{ - "serial": machine.Spec.Serial, - "address": machine.Spec.IPv4[0], + "serial": serial, + "address": address, }) continue } - since, ok := rebootingSince[machine.Spec.IPv4[0]] - if ok && since.After(rebootLimit) && machine.Status.State == StateUnreachable { - log.Info("ignore rebooting unreachable machine", map[string]interface{}{ - "serial": machine.Spec.Serial, - "address": machine.Spec.IPv4[0], - }) - continue + if machine.Status.State == StateUnreachable && machine.Status.Duration < float64(constraints.RepairRebootingSeconds) { + if rebooting[address] { + log.Info("ignore rebooting unreachable machine", map[string]interface{}{ + "serial": serial, + "address": address, + }) + continue + } + + if _, ok := nodeStatuses[address]; !ok { + log.Info("ignore out-of-cluster unreachable machine", map[string]interface{}{ + "serial": serial, + "address": address, + }) + continue + } } newMachines = append(newMachines, machine) diff --git a/sabakan/repairer_test.go b/sabakan/repairer_test.go index a52a4b10..ec4f5280 100644 --- a/sabakan/repairer_test.go +++ b/sabakan/repairer_test.go @@ -2,7 +2,6 @@ package sabakan import ( "testing" - "time" "github.com/cybozu-go/cke" "github.com/google/go-cmp/cmp" @@ -17,10 +16,10 @@ func TestRepairer(t *testing.T) 
{ machines := []Machine{ {Spec: MachineSpec{Serial: "0000"}, Status: MachineStatus{State: StateUnhealthy}}, - {Spec: MachineSpec{Serial: "1111", IPv4: []string{"1.1.1.1"}, BMC: BMC{Type: "type1"}}, Status: MachineStatus{State: StateUnreachable}}, - {Spec: MachineSpec{Serial: "2222", IPv4: []string{"2.2.2.2"}, BMC: BMC{Type: "type2"}}, Status: MachineStatus{State: StateUnhealthy}}, - {Spec: MachineSpec{Serial: "3333", IPv4: []string{"3.3.3.3"}, BMC: BMC{Type: "type3"}}, Status: MachineStatus{State: StateUnreachable}}, - {Spec: MachineSpec{Serial: "4444", IPv4: []string{"4.4.4.4"}, BMC: BMC{Type: "type4"}}, Status: MachineStatus{State: StateUnreachable}}, + {Spec: MachineSpec{Serial: "1111", IPv4: []string{"1.1.1.1"}, BMC: BMC{Type: "type1"}}, Status: MachineStatus{State: StateUnreachable, Duration: 30}}, + {Spec: MachineSpec{Serial: "2222", IPv4: []string{"2.2.2.2"}, BMC: BMC{Type: "type2"}}, Status: MachineStatus{State: StateUnhealthy, Duration: 30}}, + {Spec: MachineSpec{Serial: "3333", IPv4: []string{"3.3.3.3"}, BMC: BMC{Type: "type3"}}, Status: MachineStatus{State: StateUnreachable, Duration: 3000}}, + {Spec: MachineSpec{Serial: "4444", IPv4: []string{"4.4.4.4"}, BMC: BMC{Type: "type4"}}, Status: MachineStatus{State: StateUnreachable, Duration: 30}}, } entries := []*cke.RepairQueueEntry{ @@ -31,15 +30,19 @@ func TestRepairer(t *testing.T) { cke.NewRepairQueueEntry("unreachable", "type4", "4.4.4.4"), } - now := time.Now().UTC() - recent := now.Add(-30 * time.Second) - stale := now.Add(-3000 * time.Second) rebootEntries := []*cke.RebootQueueEntry{ nil, - {Node: "1.1.1.1", Status: cke.RebootStatusRebooting, LastTransitionTime: recent}, - {Node: "2.2.2.2", Status: cke.RebootStatusRebooting, LastTransitionTime: recent}, - {Node: "3.3.3.3", Status: cke.RebootStatusRebooting, LastTransitionTime: stale}, - {Node: "4.4.4.4", Status: cke.RebootStatusDraining, LastTransitionTime: recent}, + {Node: "1.1.1.1", Status: cke.RebootStatusRebooting}, + {Node: "2.2.2.2", Status: cke.RebootStatusRebooting}, + {Node: "3.3.3.3", Status: cke.RebootStatusRebooting}, + {Node: "4.4.4.4", Status: cke.RebootStatusDraining}, + } + + nodeStatuses := map[string]*cke.NodeStatus{ + "1.1.1.1": nil, + "2.2.2.2": nil, + "3.3.3.3": nil, + "4.4.4.4": nil, } tests := []struct { @@ -47,6 +50,7 @@ func TestRepairer(t *testing.T) { failedMachines []Machine queuedEntries []*cke.RepairQueueEntry rebootEntries []*cke.RebootQueueEntry + nodeStatuses map[string]*cke.NodeStatus expectedEntries []*cke.RepairQueueEntry }{ { @@ -54,6 +58,7 @@ func TestRepairer(t *testing.T) { failedMachines: []Machine{}, queuedEntries: []*cke.RepairQueueEntry{entries[2]}, rebootEntries: nil, + nodeStatuses: nodeStatuses, expectedEntries: []*cke.RepairQueueEntry{}, }, { @@ -61,6 +66,7 @@ func TestRepairer(t *testing.T) { failedMachines: []Machine{machines[1]}, queuedEntries: []*cke.RepairQueueEntry{entries[2]}, rebootEntries: nil, + nodeStatuses: nodeStatuses, expectedEntries: []*cke.RepairQueueEntry{entries[1]}, }, { @@ -68,6 +74,7 @@ func TestRepairer(t *testing.T) { failedMachines: []Machine{machines[0], machines[1]}, queuedEntries: []*cke.RepairQueueEntry{entries[2]}, rebootEntries: nil, + nodeStatuses: nodeStatuses, expectedEntries: []*cke.RepairQueueEntry{entries[1]}, }, { @@ -75,6 +82,7 @@ func TestRepairer(t *testing.T) { failedMachines: []Machine{machines[1], machines[2], machines[3]}, queuedEntries: []*cke.RepairQueueEntry{entries[2]}, rebootEntries: nil, + nodeStatuses: nodeStatuses, expectedEntries: []*cke.RepairQueueEntry{entries[1], 
entries[3]}, }, { @@ -82,6 +90,7 @@ func TestRepairer(t *testing.T) { failedMachines: []Machine{machines[1], machines[2], machines[3]}, queuedEntries: []*cke.RepairQueueEntry{cke.NewRepairQueueEntry("unreachable", "type2", "2.2.2.2")}, rebootEntries: nil, + nodeStatuses: nodeStatuses, expectedEntries: []*cke.RepairQueueEntry{entries[1], entries[3]}, }, { @@ -89,6 +98,7 @@ func TestRepairer(t *testing.T) { failedMachines: []Machine{machines[1], machines[2], machines[3]}, queuedEntries: []*cke.RepairQueueEntry{entries[2], entries[4]}, rebootEntries: nil, + nodeStatuses: nodeStatuses, expectedEntries: []*cke.RepairQueueEntry{}, }, { @@ -96,6 +106,7 @@ func TestRepairer(t *testing.T) { failedMachines: []Machine{machines[1]}, queuedEntries: []*cke.RepairQueueEntry{}, rebootEntries: []*cke.RebootQueueEntry{rebootEntries[1]}, + nodeStatuses: nodeStatuses, expectedEntries: []*cke.RepairQueueEntry{}, }, { @@ -103,6 +114,7 @@ func TestRepairer(t *testing.T) { failedMachines: []Machine{machines[2]}, queuedEntries: []*cke.RepairQueueEntry{}, rebootEntries: []*cke.RebootQueueEntry{rebootEntries[2]}, + nodeStatuses: nodeStatuses, expectedEntries: []*cke.RepairQueueEntry{entries[2]}, }, { @@ -110,6 +122,7 @@ func TestRepairer(t *testing.T) { failedMachines: []Machine{machines[3]}, queuedEntries: []*cke.RepairQueueEntry{}, rebootEntries: []*cke.RebootQueueEntry{rebootEntries[3]}, + nodeStatuses: nodeStatuses, expectedEntries: []*cke.RepairQueueEntry{entries[3]}, }, { @@ -117,8 +130,33 @@ func TestRepairer(t *testing.T) { failedMachines: []Machine{machines[4]}, queuedEntries: []*cke.RepairQueueEntry{}, rebootEntries: []*cke.RebootQueueEntry{rebootEntries[4]}, + nodeStatuses: nodeStatuses, expectedEntries: []*cke.RepairQueueEntry{entries[4]}, }, + { + name: "IgnoreOutOfClusterUnreachableMachine", + failedMachines: []Machine{machines[1]}, + queuedEntries: []*cke.RepairQueueEntry{}, + rebootEntries: []*cke.RebootQueueEntry{rebootEntries[1]}, + nodeStatuses: nil, + expectedEntries: []*cke.RepairQueueEntry{}, + }, + { + name: "OutOfClusterButUnhealthy", + failedMachines: []Machine{machines[2]}, + queuedEntries: []*cke.RepairQueueEntry{}, + rebootEntries: []*cke.RebootQueueEntry{rebootEntries[2]}, + nodeStatuses: nil, + expectedEntries: []*cke.RepairQueueEntry{entries[2]}, + }, + { + name: "OutOfClusterButStale", + failedMachines: []Machine{machines[3]}, + queuedEntries: []*cke.RepairQueueEntry{}, + rebootEntries: []*cke.RebootQueueEntry{rebootEntries[3]}, + nodeStatuses: nil, + expectedEntries: []*cke.RepairQueueEntry{entries[3]}, + }, } for _, tt := range tests { @@ -126,7 +164,7 @@ func TestRepairer(t *testing.T) { t.Run(tt.name, func(t *testing.T) { t.Parallel() - entries := Repairer(tt.failedMachines, tt.queuedEntries, tt.rebootEntries, constraints, now) + entries := Repairer(tt.failedMachines, tt.queuedEntries, tt.rebootEntries, tt.nodeStatuses, constraints) if !cmp.Equal(entries, tt.expectedEntries, cmpopts.EquateEmpty()) { t.Errorf("!cmp.Equal(entries, tt.newEntries), actual: %v, expected: %v", entries, tt.expectedEntries) }
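
The gating condition added in `sabakan/repairer.go` can be summarized as follows: an unreachable machine whose failure duration is still below `constraints.RepairRebootingSeconds` (the `wait-seconds-to-repair-rebooting` constraint) is skipped when it is either being rebooted by CKE or not part of the cluster (its address is missing from the node statuses). The standalone Go sketch below illustrates that decision. It is illustrative only: the `machineInfo` struct, the `shouldEnqueueRepair` function, and the constraint value of 300 seconds are hypothetical simplifications and are not part of CKE or this patch.

```go
// Minimal sketch of the repair-delay decision introduced by this patch.
// machineInfo and shouldEnqueueRepair are hypothetical simplifications,
// not the real CKE/sabakan types.
package main

import "fmt"

type machineInfo struct {
	address     string
	unreachable bool    // sabakan state is "unreachable" (as opposed to "unhealthy")
	duration    float64 // seconds since the machine entered its current state
	rebooting   bool    // present in the reboot queue with status "rebooting"
	inCluster   bool    // address appears in the cluster's node statuses
}

// shouldEnqueueRepair reports whether a failed machine should be added to the
// repair queue, given the wait-seconds-to-repair-rebooting constraint.
func shouldEnqueueRepair(m machineInfo, waitSeconds int) bool {
	// Unhealthy (but reachable) machines are repaired immediately.
	// Unreachable machines are delayed while the failure is still recent:
	//   - in-cluster machines are delayed only while CKE itself is rebooting them;
	//   - out-of-cluster machines are always delayed, because CKE cannot tell
	//     a reboot from a crash for machines it does not manage.
	if m.unreachable && m.duration < float64(waitSeconds) {
		if m.rebooting {
			return false
		}
		if !m.inCluster {
			return false
		}
	}
	return true
}

func main() {
	const waitSeconds = 300 // assumed constraint value for illustration
	cases := []machineInfo{
		{address: "1.1.1.1", unreachable: true, duration: 30, rebooting: true, inCluster: true},     // rebooting in cluster: wait
		{address: "2.2.2.2", unreachable: true, duration: 30, rebooting: false, inCluster: false},   // out of cluster, recent: wait
		{address: "3.3.3.3", unreachable: true, duration: 3000, rebooting: false, inCluster: false}, // stale failure: repair
		{address: "4.4.4.4", unreachable: false, duration: 30, rebooting: false, inCluster: false},  // unhealthy, not unreachable: repair
	}
	for _, m := range cases {
		fmt.Printf("%s -> enqueue repair: %v\n", m.address, shouldEnqueueRepair(m, waitSeconds))
	}
}
```

Note that unhealthy machines are unaffected by the new delay, which matches the `OutOfClusterButUnhealthy` and `OutOfClusterButStale` test cases added in `sabakan/repairer_test.go`.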