diff --git a/connectivity/check/deployment.go b/connectivity/check/deployment.go
index 1a7f908342..3614dd0ade 100644
--- a/connectivity/check/deployment.go
+++ b/connectivity/check/deployment.go
@@ -18,6 +18,7 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"strconv"
 	"time"
 
 	"github.com/cilium/cilium-cli/defaults"
@@ -445,6 +446,17 @@ func (ct *ConnectivityTest) validateDeployment(ctx context.Context) error {
 		}
 	}
 
+	if ct.params.MultiCluster == "" {
+		for _, ciliumPod := range ct.ciliumPods {
+			hostIP := ciliumPod.Pod.Status.HostIP
+			for _, s := range ct.echoServices {
+				if err := ct.waitForNodePorts(ctx, hostIP, s); err != nil {
+					return err
+				}
+			}
+		}
+	}
+
 	for _, client := range ct.clients.clients() {
 		externalWorkloads, err := client.ListCiliumExternalWorkloads(ctx, metav1.ListOptions{})
 		if err != nil {
@@ -617,6 +629,52 @@ func (ct *ConnectivityTest) waitForService(ctx context.Context, service Service)
 	}
 }
 
+// waitForNodePorts waits until every NodePort of service can be reached on
+// nodeIP. Each port is probed by running "nc -w 3 -z" from the client pod
+// returned by ct.RandomClientPod(); ports whose NodePort is 0 are skipped.
+//
+// All ports share a single deadline of ct.params.serviceReadyTimeout(). An
+// error is returned when no client pod is available, or when ctx is done
+// before a port becomes reachable, in which case the error wraps ctx.Err().
+func (ct *ConnectivityTest) waitForNodePorts(ctx context.Context, nodeIP string, service Service) error {
+	pod := ct.RandomClientPod()
+	if pod == nil {
+		return fmt.Errorf("no client pod available")
+	}
+	ctx, cancel := context.WithTimeout(ctx, ct.params.serviceReadyTimeout())
+	defer cancel()
+
+	for _, port := range service.Service.Spec.Ports {
+		nodePort := port.NodePort
+		if nodePort == 0 {
+			continue
+		}
+		ct.Logf("⌛ [%s] Waiting for NodePort %s:%d (%s) to become ready...",
+			ct.client.ClusterName(), nodeIP, nodePort, service.Name())
+		for {
+			// Warning: ExecInPodWithStderr ignores ctx. Don't pass it here so we don't
+			// falsely expect the function to be able to be cancelled.
+			_, e, err := ct.client.ExecInPodWithStderr(context.TODO(),
+				pod.Pod.Namespace, pod.Pod.Name, pod.Pod.Labels["name"],
+				[]string{"nc", "-w", "3", "-z", nodeIP, strconv.Itoa(int(nodePort))})
+			if err == nil {
+				break
+			}
+
+			ct.Debugf("Error waiting for NodePort %s:%d (%s): %s: %s", nodeIP, nodePort, service.Name(), err, e.String())
+
+			// Retry once per second until the shared deadline expires or the
+			// caller cancels ctx.
+			select {
+			case <-ctx.Done():
+				return fmt.Errorf("timeout reached waiting for NodePort %s:%d (%s): %w", nodeIP, nodePort, service.Name(), ctx.Err())
+			case <-time.After(time.Second):
+			}
+		}
+	}
+	return nil
+}
+
 func (ct *ConnectivityTest) waitForCiliumEndpoint(ctx context.Context, client *k8s.Client, namespace, name string) error {
 	ct.Logf("⌛ [%s] Waiting for CiliumEndpoint for pod %s/%s to appear...", client.ClusterName(), namespace, name)
 	for {