Skip to content

Commit

Permalink
Merge pull request kubernetes#15934 from hakman/optimize_test_cleanup
Browse files Browse the repository at this point in the history
scale-test: Optimise node dumping for large clusters
  • Loading branch information
k8s-ci-robot authored Sep 20, 2023
2 parents 6dd35e2 + 5c78560 commit 071b285
Showing 1 changed file with 74 additions and 23 deletions.
97 changes: 74 additions & 23 deletions pkg/dump/dumper.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ import (
"k8s.io/klog/v2"
)

const (
// MaxNodesToDump is the maximum number of nodes to dump
MaxNodesToDump = 500
)

// logDumper gets all the nodes from a kubernetes cluster and dumps a well-known set of logs
type logDumper struct {
sshClientFactory sshClientFactory
Expand Down Expand Up @@ -102,44 +107,61 @@ func NewLogDumper(clusterName string, sshConfig *ssh.ClientConfig, keyRing agent
// This allows for dumping log on nodes even if they don't register as a kubernetes
// node, or if a node fails to register, or if the whole cluster fails to start.
func (d *logDumper) DumpAllNodes(ctx context.Context, nodes corev1.NodeList, additionalIPs, additionalPrivateIPs []string) error {
var dumped []*corev1.Node
var special, regular, dumped []*corev1.Node

for i := range nodes.Items {
if ctx.Err() != nil {
log.Printf("stopping dumping nodes: %v", ctx.Err())
return ctx.Err()
}

node := &nodes.Items[i]

ip := ""
privateIP := ""
for _, address := range node.Status.Addresses {
if address.Type == "ExternalIP" {
ip = address.Address
break
} else if address.Type == "InternalIP" {
if privateIP == "" {
privateIP = address.Address
}
}
if _, ok := node.Labels["node-role.kubernetes.io/master"]; ok {
special = append(special, node)
continue
}
if _, ok := node.Labels["node-role.kubernetes.io/control-plane"]; ok {
special = append(special, node)
continue
}
if _, ok := node.Labels["node-role.kubernetes.io/api-server"]; ok {
special = append(special, node)
continue
}

var err error
if ip != "" {
err = d.dumpNode(ctx, node.Name, ip, false)
regular = append(regular, node)
}

for i := range special {
node := special[i]
err := d.dumpRegistered(ctx, node)
if err != nil {
log.Printf("could not dump node %s: %v", node.Name, err)
} else {
err = d.dumpNode(ctx, node.Name, privateIP, true)
dumped = append(dumped, node)
}
}
// If none of the special nodes was successfully dumped, there is no value in dumping the rest
if len(dumped) == 0 {
return fmt.Errorf("could not dump any special node")
}

for i := range regular {
if len(dumped) >= MaxNodesToDump {
log.Printf("stopping dumping nodes: %d nodes dumped", MaxNodesToDump)
return nil
}
node := regular[i]
err := d.dumpRegistered(ctx, node)
if err != nil {
log.Printf("could not dump node %s (%s): %v", node.Name, ip, err)
log.Printf("could not dump node %s: %v", node.Name, err)
} else {
dumped = append(dumped, node)
}
}

notDumped := findInstancesNotDumped(additionalIPs, dumped)
for _, ip := range notDumped {
if len(dumped) >= MaxNodesToDump {
log.Printf("stopping dumping nodes: %d nodes dumped", MaxNodesToDump)
return nil
}
err := d.dumpNotRegistered(ctx, ip, false)
if err != nil {
return err
Expand All @@ -148,6 +170,10 @@ func (d *logDumper) DumpAllNodes(ctx context.Context, nodes corev1.NodeList, add

notDumped = findInstancesNotDumped(additionalPrivateIPs, dumped)
for _, ip := range notDumped {
if len(dumped) >= MaxNodesToDump {
log.Printf("stopping dumping nodes: %d nodes dumped", MaxNodesToDump)
return nil
}
err := d.dumpNotRegistered(ctx, ip, true)
if err != nil {
return err
Expand All @@ -157,6 +183,31 @@ func (d *logDumper) DumpAllNodes(ctx context.Context, nodes corev1.NodeList, add
return nil
}

func (d *logDumper) dumpRegistered(ctx context.Context, node *corev1.Node) error {
if ctx.Err() != nil {
log.Printf("stopping dumping nodes: %v", ctx.Err())
return ctx.Err()
}

var publicIP, privateIP string
for _, address := range node.Status.Addresses {
if address.Type == "ExternalIP" {
publicIP = address.Address
break
} else if address.Type == "InternalIP" {
if privateIP == "" {
privateIP = address.Address
}
}
}

if publicIP != "" {
return d.dumpNode(ctx, node.Name, publicIP, false)
} else {
return d.dumpNode(ctx, node.Name, privateIP, true)
}
}

func (d *logDumper) dumpNotRegistered(ctx context.Context, ip string, useBastion bool) error {
if ctx.Err() != nil {
log.Printf("stopping dumping nodes: %v", ctx.Err())
Expand Down Expand Up @@ -489,7 +540,7 @@ func (f *sshClientFactoryImplementation) Dial(ctx context.Context, host string,
}
addr = net.JoinHostPort(addr, "22")
d := net.Dialer{
Timeout: 15 * time.Second,
Timeout: 5 * time.Second,
}
conn, err := d.DialContext(ctx, "tcp", addr)
if err != nil {
Expand Down

0 comments on commit 071b285

Please sign in to comment.