Skip to content

Commit

Permalink
Add target status integration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
TheSpiritXIII committed Oct 4, 2023
1 parent 3955f2a commit f58d7f8
Show file tree
Hide file tree
Showing 39 changed files with 5,873 additions and 65 deletions.
213 changes: 161 additions & 52 deletions e2e/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func TestCollector(t *testing.T) {
t.Run("deployed", tctx.subtest(testCollectorDeployed))
t.Run("self-podmonitoring", tctx.subtest(testCollectorSelfPodMonitoring))
t.Run("self-clusterpodmonitoring", tctx.subtest(testCollectorSelfClusterPodMonitoring))
t.Run("target-status", tctx.subtest(testCollectorTargetStatus))
t.Run("scrape-kubelet", tctx.subtest(testCollectorScrapeKubelet))
}

Expand Down Expand Up @@ -148,6 +149,60 @@ func testCollectorDeployed(ctx context.Context, t *OperatorContext) {
}
}

func selfScrapeEndpointConfig() []monitoringv1.ScrapeEndpoint {
return []monitoringv1.ScrapeEndpoint{
{
Port: intstr.FromString(operator.CollectorPrometheusContainerPortName),
Interval: "5s",
},
{
Port: intstr.FromString(operator.CollectorConfigReloaderContainerPortName),
Interval: "5s",
},
}
}

func checkStatusConditions(status *monitoringv1.PodMonitoringStatus, expected int) error {
if size := len(status.Conditions); size == 0 {
return errors.New("empty conditions")
} else if size != expected {
return fmt.Errorf("expected %d conditions, but got: %d", expected, size)
}

for _, condition := range status.Conditions {
if condition.Type != monitoringv1.ConfigurationCreateSuccess {
return fmt.Errorf("condition is not successful: %s", condition.Type)
}
}
return nil
}

func checkStatusEndpoints(status *monitoringv1.PodMonitoringStatus, expected int) error {
endpointStatuses := status.EndpointStatuses
if size := len(endpointStatuses); size == 0 {
return errors.New("empty endpoint status")
} else if size != expected {
return fmt.Errorf("expected %d endpoint, but got: %d", expected, size)
}

for _, status := range endpointStatuses {
var err error
if status.UnhealthyTargets != 0 {
err = fmt.Errorf("unhealthy targets: %d", status.UnhealthyTargets)
} else if status.CollectorsFraction != "1" {
err = fmt.Errorf("collectors failed: %s", status.CollectorsFraction)
} else if len(status.SampleGroups) == 0 {
err = errors.New("missing sample groups")
} else if len(status.SampleGroups[0].SampleTargets) == 0 {
err = fmt.Errorf("missing sample targets: %d", status.SampleGroups[0].Count)
}
if err != nil {
return fmt.Errorf("unhealthy endpoint status %q: %w", status.Name, err)
}
}
return nil
}

// testCollectorSelfPodMonitoring sets up pod monitoring of the collector itself
// and waits for samples to become available in Cloud Monitoring.
func testCollectorSelfPodMonitoring(ctx context.Context, t *OperatorContext) {
Expand All @@ -165,16 +220,7 @@ func testCollectorSelfPodMonitoring(ctx context.Context, t *OperatorContext) {
operator.LabelAppName: operator.NameCollector,
},
},
Endpoints: []monitoringv1.ScrapeEndpoint{
{
Port: intstr.FromString(operator.CollectorPrometheusContainerPortName),
Interval: "5s",
},
{
Port: intstr.FromString(operator.CollectorConfigReloaderContainerPortName),
Interval: "5s",
},
},
Endpoints: selfScrapeEndpointConfig(),
},
}

Expand All @@ -183,30 +229,33 @@ func testCollectorSelfPodMonitoring(ctx context.Context, t *OperatorContext) {
}
t.Logf("Waiting for PodMonitoring %q to be processed", name)

var resVer = ""
err := wait.Poll(time.Second, 1*time.Minute, func() (bool, error) {
if err := t.Client().Get(ctx, client.ObjectKeyFromObject(pm), pm); err != nil {
resVer := ""
var err error
pollErr := wait.Poll(time.Second, 1*time.Minute, func() (bool, error) {
if err = t.Client().Get(ctx, client.ObjectKeyFromObject(pm), pm); err != nil {
return false, fmt.Errorf("getting PodMonitoring failed: %w", err)
}
// Ensure no status update cycles.
// This is not a perfect check as it's possible the get call returns before the operator
// would sync again, however it can serve as a valuable guardrail in case sporadic test
// failures start happening due to update cycles.
if size := len(pm.Status.Conditions); size == 1 {
if resVer == "" {
resVer = pm.ResourceVersion
return false, nil
}
success := pm.Status.Conditions[0].Type == monitoringv1.ConfigurationCreateSuccess
steadyVer := resVer == pm.ResourceVersion
return success && steadyVer, nil
} else if size > 1 {
return false, fmt.Errorf("status conditions should be of length 1, but got: %d", size)
if resVer != pm.ResourceVersion {
resVer = pm.ResourceVersion
err = errors.New("waiting for resource version to stabilize")
return false, nil
}

if err = checkStatusConditions(&pm.Status, 1); err != nil {
return false, nil
}
return false, nil
return true, nil
})
if err != nil {
t.Errorf("unable to validate PodMonitoring status: %s", err)
if pollErr != nil {
if errors.Is(pollErr, wait.ErrWaitTimeout) && err != nil {
t.Errorf("unable to validate status: %s", err)
} else {
t.Error("unable to validate status due to timeout")
}
}

if !skipGCM {
Expand All @@ -231,16 +280,7 @@ func testCollectorSelfClusterPodMonitoring(ctx context.Context, t *OperatorConte
operator.LabelAppName: operator.NameCollector,
},
},
Endpoints: []monitoringv1.ScrapeEndpoint{
{
Port: intstr.FromString(operator.CollectorPrometheusContainerPortName),
Interval: "5s",
},
{
Port: intstr.FromString(operator.CollectorConfigReloaderContainerPortName),
Interval: "5s",
},
},
Endpoints: selfScrapeEndpointConfig(),
},
}

Expand All @@ -249,30 +289,33 @@ func testCollectorSelfClusterPodMonitoring(ctx context.Context, t *OperatorConte
}
t.Logf("Waiting for ClusterPodMonitoring %q to be processed", name)

var resVer = ""
err := wait.Poll(time.Second, 1*time.Minute, func() (bool, error) {
if err := t.Client().Get(ctx, client.ObjectKeyFromObject(pm), pm); err != nil {
resVer := ""
var err error
pollErr := wait.Poll(time.Second, 1*time.Minute, func() (bool, error) {
if err = t.Client().Get(ctx, client.ObjectKeyFromObject(pm), pm); err != nil {
return false, fmt.Errorf("getting ClusterPodMonitoring failed: %w", err)
}
// Ensure no status update cycles.
// This is not a perfect check as it's possible the get call returns before the operator
// would sync again, however it can serve as a valuable guardrail in case sporadic test
// failures start happening due to update cycles.
if size := len(pm.Status.Conditions); size == 1 {
if resVer == "" {
resVer = pm.ResourceVersion
return false, nil
}
success := pm.Status.Conditions[0].Type == monitoringv1.ConfigurationCreateSuccess
steadyVer := resVer == pm.ResourceVersion
return success && steadyVer, nil
} else if size > 1 {
return false, fmt.Errorf("status conditions should be of length 1, but got: %d", size)
if resVer != pm.ResourceVersion {
resVer = pm.ResourceVersion
err = errors.New("waiting for resource version to stabilize")
return false, nil
}

if err = checkStatusConditions(&pm.Status, 1); err != nil {
return false, nil
}
return false, nil
return true, nil
})
if err != nil {
t.Errorf("unable to validate ClusterPodMonitoring status: %s", err)
if pollErr != nil {
if errors.Is(pollErr, wait.ErrWaitTimeout) && err != nil {
t.Errorf("unable to validate status: %s", err)
} else {
t.Error("unable to validate status due to timeout")
}
}

if !skipGCM {
Expand All @@ -281,6 +324,72 @@ func testCollectorSelfClusterPodMonitoring(ctx context.Context, t *OperatorConte
}
}

// testCollectorTargetStatus sets up pod monitoring of the collector itself and
// checks target status.
func testCollectorTargetStatus(ctx context.Context, t *OperatorContext) {
t.createOperatorConfigFrom(ctx, monitoringv1.OperatorConfig{
Features: monitoringv1.OperatorFeatures{
TargetStatus: monitoringv1.TargetStatusSpec{
Enabled: true,
},
},
})

name := "collector-podmon-target-status"
pm := &monitoringv1.PodMonitoring{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: t.namespace,
},
Spec: monitoringv1.PodMonitoringSpec{
Selector: metav1.LabelSelector{
MatchLabels: map[string]string{
operator.LabelAppName: operator.NameCollector,
},
},
Endpoints: selfScrapeEndpointConfig(),
},
}

if err := t.Client().Create(ctx, pm); err != nil {
t.Fatalf("create collector PodMonitoring: %s", err)
}
t.Logf("Waiting for PodMonitoring %q to be processed", name)

resVer := ""
var err error
pollErr := wait.Poll(time.Second, 2*time.Minute, func() (bool, error) {
if err = t.Client().Get(ctx, client.ObjectKeyFromObject(pm), pm); err != nil {
return false, fmt.Errorf("getting PodMonitoring failed: %w", err)
}

// Ensure no status update cycles.
// This is not a perfect check as it's possible the get call returns before the operator
// would sync again, however it can serve as a valuable guardrail in case sporadic test
// failures start happening due to update cycles.
if resVer != pm.ResourceVersion {
resVer = pm.ResourceVersion
err = errors.New("waiting for resource version to stabilize")
return false, nil
}

if err = checkStatusConditions(&pm.Status, 1); err != nil {
return false, nil
}
if err = checkStatusEndpoints(&pm.Status, len(selfScrapeEndpointConfig())); err != nil {
return false, nil
}
return true, nil
})
if pollErr != nil {
if errors.Is(pollErr, wait.ErrWaitTimeout) && err != nil {
t.Errorf("unable to validate status: %s", err)
} else {
t.Error("unable to validate status due to timeout")
}
}
}

// validateCollectorUpMetrics checks whether the scrape-time up metrics for all collector
// pods can be queried from GCM.
func validateCollectorUpMetrics(ctx context.Context, t *OperatorContext, job string) {
Expand Down
Loading

0 comments on commit f58d7f8

Please sign in to comment.