diff --git a/go-chaos/internal/helper_test.go b/go-chaos/internal/helper_test.go
index efe6c60a2..51d392aea 100644
--- a/go-chaos/internal/helper_test.go
+++ b/go-chaos/internal/helper_test.go
@@ -79,11 +79,40 @@ func (c K8Client) CreatePodWithLabelsAndName(t *testing.T, selector *metav1.Labe
 	require.NoError(t, err)
 }
 
+func (c K8Client) CreateReadyPodWithLabelsAndName(t *testing.T, selector *metav1.LabelSelector, podName string) {
+	_, err := c.Clientset.CoreV1().Pods(c.GetCurrentNamespace()).Create(context.TODO(), &v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{Labels: selector.MatchLabels, Name: podName},
+		Spec: v1.PodSpec{},
+	}, metav1.CreateOptions{})
+
+	require.NoError(t, err)
+}
+
+func (c K8Client) CreateBrokerPodsWithStatus(t *testing.T, selector *metav1.LabelSelector, podName string, podPhase v1.PodPhase, readyStatus bool) {
+	_, err := c.Clientset.CoreV1().Pods(c.GetCurrentNamespace()).Create(context.TODO(), &v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{Labels: selector.MatchLabels, Name: podName},
+		Spec: v1.PodSpec{},
+		Status: v1.PodStatus{
+			Phase: podPhase,
+			ContainerStatuses: []v1.ContainerStatus{
+				{
+					Ready: readyStatus,
+				},
+			},
+		},
+	}, metav1.CreateOptions{})
+
+	require.NoError(t, err)
+}
+
 func (c K8Client) CreateDeploymentWithLabelsAndName(t *testing.T, selector *metav1.LabelSelector, name string) {
 	_, err := c.Clientset.AppsV1().Deployments(c.GetCurrentNamespace()).Create(context.TODO(), &v12.Deployment{
 		ObjectMeta: metav1.ObjectMeta{Labels: selector.MatchLabels, Name: name},
 		Spec: v12.DeploymentSpec{},
-		Status: v12.DeploymentStatus{},
+		Status: v12.DeploymentStatus{
+			UnavailableReplicas: 0,
+			AvailableReplicas: 1,
+		},
 	}, metav1.CreateOptions{})
 
 	require.NoError(t, err)
diff --git a/go-chaos/internal/pods.go b/go-chaos/internal/pods.go
index d115080d8..a2eaf6901 100644
--- a/go-chaos/internal/pods.go
+++ b/go-chaos/internal/pods.go
@@ -121,32 +121,33 @@ func (c K8Client) RestartPodWithGracePeriod(podName string, gracePeriodSec *int6
 }
 
 func (c K8Client) AwaitReadiness() error {
-	retries := 0
-	maxRetries := 300 // 5 * 60s
-	for {
-		if retries >= maxRetries {
-			return errors.New("Awaited readiness of pods in namespace %s, but timed out after 30s")
-		}
-		if retries > 0 {
-			time.Sleep(1 * time.Second)
-		}
+	return c.AwaitReadinessWithTimeout(5 * time.Minute)
+}
 
-		brokersAreRunning, err := c.checkIfBrokersAreRunning()
-		if err != nil {
-			return err
-		}
+func (c K8Client) AwaitReadinessWithTimeout(timeout time.Duration) error {
+	timedOut := time.After(timeout)
+	ticker := time.Tick(1 * time.Second)
 
-		gatewaysAreRunning, err := c.checkIfGatewaysAreRunning()
-		if err != nil {
-			return err
-		}
+	// Keep checking until we're timed out
+	for {
+		select {
+		case <-timedOut:
+			return errors.New(fmt.Sprintf("Awaited readiness of pods in namespace %v, but timed out after %v", c.GetCurrentNamespace(), timeout))
+		case <-ticker:
+			brokersAreRunning, err := c.checkIfBrokersAreRunning()
+			if err != nil {
+				LogVerbose("Failed to check broker status. Will retry. %v", err)
+			}
+			gatewaysAreRunning, err := c.checkIfGatewaysAreRunning()
+			if err != nil {
+				LogVerbose("Failed to check gateway status. Will retry. %v", err)
+			}
 
-		if brokersAreRunning && gatewaysAreRunning {
-			break
+			if brokersAreRunning && gatewaysAreRunning {
+				return nil
+			}
 		}
-		retries++
 	}
-	return nil
 }
 
 func (c K8Client) checkIfBrokersAreRunning() (bool, error) {
@@ -161,8 +162,8 @@ func (c K8Client) checkIfBrokersAreRunning() (bool, error) {
 
 	allRunning := true
 	for _, pod := range pods.Items {
-		if !pod.Status.ContainerStatuses[0].Ready { // assuming there is only one container
-			LogVerbose("Pod %s is in phase %s, but not ready. Wait for some seconds.", pod.Name, pod.Status.Phase)
+		if pod.Status.Phase != v1.PodRunning || !pod.Status.ContainerStatuses[0].Ready { // assuming there is only one container
+			LogVerbose("Pod %s is in phase %s, and not ready. Wait for some seconds.", pod.Name, pod.Status.Phase)
 			allRunning = false
 			break
 		}
@@ -198,7 +199,7 @@ func (c K8Client) AwaitPodReadiness(podName string, timeout time.Duration) error
 		pod, err := c.Clientset.CoreV1().Pods(c.GetCurrentNamespace()).Get(context.TODO(), podName, metav1.GetOptions{})
 		if err != nil {
 			LogVerbose("Failed to get pod %s. Will retry", pod.Name)
-		} else if pod.Status.ContainerStatuses[0].Ready { // assuming there is only one container
+		} else if pod.Status.Phase == v1.PodRunning && pod.Status.ContainerStatuses[0].Ready { // assuming there is only one container
 			return nil
 		} else {
 			LogVerbose("Pod %s is in phase %s, but not ready. Wait for some seconds", pod.Name, pod.Status.Phase)
diff --git a/go-chaos/internal/pods_test.go b/go-chaos/internal/pods_test.go
index 8d8d1d5dd..e6fd5d929 100644
--- a/go-chaos/internal/pods_test.go
+++ b/go-chaos/internal/pods_test.go
@@ -15,7 +15,9 @@
 package internal
 
 import (
+	v1 "k8s.io/api/core/v1"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -265,3 +267,66 @@ func Test_GetEmbeddedGateway(t *testing.T) {
 	require.NotEmpty(t, names)
 	assert.Equal(t, "broker", names[0], "Expected to retrieve broker")
 }
+
+func Test_ShouldReturnTrueWhenBrokersAreReady(t *testing.T) {
+	// given
+	k8Client := CreateFakeClient()
+
+	// broker
+	selector, err := metav1.ParseToLabelSelector(getSelfManagedBrokerLabels())
+	require.NoError(t, err)
+	k8Client.CreateBrokerPodsWithStatus(t, selector, "broker1", v1.PodRunning, true)
+	k8Client.CreateBrokerPodsWithStatus(t, selector, "broker2", v1.PodRunning, true)
+
+	gatewaySelector, err := metav1.ParseToLabelSelector(getSelfManagedGatewayLabels())
+	require.NoError(t, err)
+	k8Client.CreateDeploymentWithLabelsAndName(t, gatewaySelector, "gateway")
+
+	// when
+	err = k8Client.AwaitReadinessWithTimeout(2 * time.Second)
+
+	// then
+	require.NoError(t, err)
+}
+
+func Test_ShouldReturnErrorWhenAtleastOneBrokerIsNotReady(t *testing.T) {
+	// given
+	k8Client := CreateFakeClient()
+
+	// broker
+	selector, err := metav1.ParseToLabelSelector(getSelfManagedBrokerLabels())
+	require.NoError(t, err)
+	k8Client.CreateBrokerPodsWithStatus(t, selector, "broker1", v1.PodRunning, true)
+	k8Client.CreateBrokerPodsWithStatus(t, selector, "broker2", v1.PodRunning, false)
+
+	gatewaySelector, err := metav1.ParseToLabelSelector(getSelfManagedGatewayLabels())
+	require.NoError(t, err)
+	k8Client.CreateDeploymentWithLabelsAndName(t, gatewaySelector, "gateway")
+
+	// when
+	err = k8Client.AwaitReadinessWithTimeout(2 * time.Second)
+
+	// then
+	require.Error(t, err)
+}
+
+func Test_ShouldReturnErrorWhenAtleastOneBrokerIsNotRunning(t *testing.T) {
+	// given
+	k8Client := CreateFakeClient()
+
+	// broker
+	selector, err := metav1.ParseToLabelSelector(getSelfManagedBrokerLabels())
+	require.NoError(t, err)
+	k8Client.CreateBrokerPodsWithStatus(t, selector, "broker1", v1.PodRunning, true)
+	k8Client.CreateBrokerPodsWithStatus(t, selector, "broker2", v1.PodPending, true)
+
+	gatewaySelector, err := metav1.ParseToLabelSelector(getSelfManagedGatewayLabels())
+	require.NoError(t, err)
+	k8Client.CreateDeploymentWithLabelsAndName(t, gatewaySelector, "gateway")
+
+	// when
+	err = k8Client.AwaitReadinessWithTimeout(2 * time.Second)
+
+	// then
+	require.Error(t, err)
+}
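For context on the Go idiom introduced above: the new AwaitReadinessWithTimeout swaps the old retry counter for a select over a single time.After channel (the overall deadline) and a one-second ticker (the poll interval). Below is a minimal, self-contained sketch of that pattern; awaitWithTimeout, check, and the concrete durations are illustrative names for this note only, not part of the change.

package main

import (
	"fmt"
	"time"
)

// awaitWithTimeout polls check once per interval until it reports true or the
// overall timeout elapses. time.After is called once, outside the loop, so the
// deadline covers the whole wait instead of resetting on every iteration.
func awaitWithTimeout(check func() bool, interval, timeout time.Duration) error {
	timedOut := time.After(timeout)
	ticker := time.Tick(interval)

	for {
		select {
		case <-timedOut:
			return fmt.Errorf("timed out after %v", timeout)
		case <-ticker:
			if check() {
				return nil
			}
		}
	}
}

func main() {
	start := time.Now()
	// Illustrative readiness check: reports true once two seconds have passed.
	ready := func() bool { return time.Since(start) > 2*time.Second }

	err := awaitWithTimeout(ready, 1*time.Second, 5*time.Second)
	fmt.Println(err) // <nil>: the check succeeds before the 5s deadline
}

Because the deadline channel is created once before the loop, it measures the total wait while the ticker only paces the checks; that is why the tests above can drive the same code path with a short 2-second timeout against the fake client.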