Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[tmpnet] Refactor bootstrap monitor kubernetes functions for reuse #3446

Merged
merged 5 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 2 additions & 49 deletions tests/fixture/bootstrapmonitor/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"

"github.com/ava-labs/avalanchego/tests/fixture/tmpnet"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/version"

Expand All @@ -39,54 +40,6 @@ type bootstrapTestDetails struct {
StartTime time.Time `json:"startTime"`
}

// WaitForPodCondition watches the specified pod until the status includes the specified condition.
func WaitForPodCondition(ctx context.Context, clientset *kubernetes.Clientset, namespace string, podName string, conditionType corev1.PodConditionType) error {
	// Accept the pod once the target condition is reported true in its status.
	hasCondition := func(status *corev1.PodStatus) bool {
		for _, c := range status.Conditions {
			if c.Type == conditionType && c.Status == corev1.ConditionTrue {
				return true
			}
		}
		return false
	}
	return waitForPodStatus(ctx, clientset, namespace, podName, hasCondition)
}

// waitForPodStatus watches the specified pod until the status is deemed acceptable by the provided test function.
// Returns an error if the watch cannot be started, the watch ends before an acceptable status is seen, or ctx expires.
func waitForPodStatus(
	ctx context.Context,
	clientset *kubernetes.Clientset,
	namespace string,
	name string,
	acceptable func(*corev1.PodStatus) bool,
) error {
	watch, err := clientset.CoreV1().Pods(namespace).Watch(ctx, metav1.SingleObject(metav1.ObjectMeta{Name: name}))
	if err != nil {
		return fmt.Errorf("failed to initiate watch of pod %s/%s: %w", namespace, name, err)
	}
	// Release the watch connection when this function returns to avoid leaking it.
	defer watch.Stop()

	for {
		select {
		case event, ok := <-watch.ResultChan():
			if !ok {
				// Without this check a closed result channel would yield zero-value
				// events in a tight loop until the context expired.
				return fmt.Errorf("watch of pod %s/%s closed unexpectedly", namespace, name)
			}
			pod, ok := event.Object.(*corev1.Pod)
			if !ok {
				continue
			}

			if acceptable(&pod.Status) {
				return nil
			}
		case <-ctx.Done():
			return fmt.Errorf("timeout waiting for pod readiness: %w", ctx.Err())
		}
	}
}

// setImageDetails updates the pod's owning statefulset with the image of the specified container and associated version details
func setImageDetails(ctx context.Context, log logging.Logger, clientset *kubernetes.Clientset, namespace string, podName string, imageDetails *ImageDetails) error {
// Determine the name of the statefulset to update
Expand Down Expand Up @@ -212,7 +165,7 @@ func getLatestImageDetails(
}
qualifiedPodName := fmt.Sprintf("%s.%s", namespace, createdPod.Name)

err = waitForPodStatus(ctx, clientset, namespace, createdPod.Name, func(status *corev1.PodStatus) bool {
err = tmpnet.WaitForPodStatus(ctx, clientset, namespace, createdPod.Name, func(status *corev1.PodStatus) bool {
return status.Phase == corev1.PodSucceeded || status.Phase == corev1.PodFailed
})
if err != nil {
Expand Down
224 changes: 31 additions & 193 deletions tests/fixture/bootstrapmonitor/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@ import (
"bufio"
"flag"
"fmt"
"io"
"net/http"
"net/url"
"os"
"os/exec"
"path/filepath"
Expand All @@ -18,23 +15,16 @@ import (

"github.com/onsi/ginkgo/v2"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/transport/spdy"
"k8s.io/utils/pointer"

"github.com/ava-labs/avalanchego/api/info"
"github.com/ava-labs/avalanchego/config"
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/tests"
"github.com/ava-labs/avalanchego/tests/fixture/bootstrapmonitor"
"github.com/ava-labs/avalanchego/tests/fixture/e2e"
"github.com/ava-labs/avalanchego/tests/fixture/tmpnet"
"github.com/ava-labs/avalanchego/utils/constants"
"github.com/ava-labs/avalanchego/utils/logging"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -130,7 +120,7 @@ var _ = ginkgo.Describe("[Bootstrap Tester]", func() {
ginkgo.By(fmt.Sprintf("Created namespace %q", namespace))

ginkgo.By("Creating a node to bootstrap from")
nodeStatefulSet := newNodeStatefulSet("avalanchego-node", defaultNodeFlags())
nodeStatefulSet := newNodeStatefulSet("avalanchego-node", defaultPodFlags())
createdNodeStatefulSet, err := clientset.AppsV1().StatefulSets(namespace).Create(tc.DefaultContext(), nodeStatefulSet, metav1.CreateOptions{})
require.NoError(err)
nodePodName := createdNodeStatefulSet.Name + "-0"
Expand Down Expand Up @@ -257,205 +247,53 @@ func buildImage(tc tests.TestContext, imageName string, forceNewHash bool, scrip
require.NoError(err, "Image build failed: %s", output)
}

// newNodeStatefulSet returns a statefulset for an avalanchego node.
func newNodeStatefulSet(name string, flags map[string]string) *appsv1.StatefulSet {
return &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
GenerateName: name + "-",
},
Spec: appsv1.StatefulSetSpec{
Replicas: pointer.Int32(1),
ServiceName: name,
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": name,
},
},
VolumeClaimTemplates: []corev1.PersistentVolumeClaim{
{
ObjectMeta: metav1.ObjectMeta{
Name: volumeName,
},
Spec: corev1.PersistentVolumeClaimSpec{
AccessModes: []corev1.PersistentVolumeAccessMode{
corev1.ReadWriteOnce,
},
Resources: corev1.VolumeResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceStorage: resource.MustParse(volumeSize),
},
},
},
},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": name,
},
Annotations: map[string]string{
// This needs to be present to ensure compatibility with json patch replace
bootstrapmonitor.VersionsAnnotationKey: "",
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: nodeContainerName,
Image: latestAvalanchegoImage,
Ports: []corev1.ContainerPort{
{
Name: "http",
ContainerPort: config.DefaultHTTPPort,
},
{
Name: "staker",
ContainerPort: config.DefaultStakingPort,
},
},
VolumeMounts: []corev1.VolumeMount{
{
Name: volumeName,
MountPath: nodeDataDir,
},
},
ReadinessProbe: &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/ext/health/liveness",
Port: intstr.FromInt(config.DefaultHTTPPort),
},
},
PeriodSeconds: 1,
SuccessThreshold: 1,
},
Env: stringMapToEnvVarSlice(flags),
},
},
},
},
},
}
}
statefulSet := tmpnet.NewNodeStatefulSet(
name,
latestAvalanchegoImage,
nodeContainerName,
volumeName,
volumeSize,
nodeDataDir,
flags,
)

// stringMapToEnvVarSlice converts a string map to a kube EnvVar slice.
func stringMapToEnvVarSlice(mapping map[string]string) []corev1.EnvVar {
envVars := make([]corev1.EnvVar, len(mapping))
var i int
for k, v := range mapping {
envVars[i] = corev1.EnvVar{
Name: config.EnvVarName(config.EnvPrefix, k),
Value: v,
}
i++
// The version annotations key needs to be present to ensure compatibility with json patch replace
if statefulSet.Spec.Template.Annotations == nil {
statefulSet.Spec.Template.Annotations = map[string]string{}
}
return envVars
statefulSet.Spec.Template.Annotations[bootstrapmonitor.VersionsAnnotationKey] = ""

return statefulSet
}

// defaultNodeFlags defines common flags for avalanchego nodes used by this test
func defaultNodeFlags() map[string]string {
return map[string]string{
config.DataDirKey: nodeDataDir,
config.NetworkNameKey: constants.LocalName,
config.SybilProtectionEnabledKey: "false",
config.HealthCheckFreqKey: "500ms", // Ensure rapid detection of a healthy state
config.LogDisplayLevelKey: logging.Debug.String(),
config.LogLevelKey: logging.Debug.String(),
config.HTTPHostKey: "0.0.0.0", // Need to bind to pod IP to ensure kubelet can access the http port for the readiness check
}
// defaultPodFlags returns the tmpnet default avalanchego flags for a node pod
// on the local network, with nodeDataDir as the node's data directory.
func defaultPodFlags() map[string]string {
	return tmpnet.DefaultPodFlags(constants.LocalName, nodeDataDir)
}

// waitForPodCondition waits until the specified pod reports the specified condition
func waitForPodCondition(tc tests.TestContext, clientset *kubernetes.Clientset, namespace string, podName string, conditionType corev1.PodConditionType) {
require.NoError(tc, bootstrapmonitor.WaitForPodCondition(tc.DefaultContext(), clientset, namespace, podName, conditionType))
require.NoError(tc, tmpnet.WaitForPodCondition(tc.DefaultContext(), clientset, namespace, podName, conditionType))
}

// waitForNodeHealthy waits for the node running in the specified pod to report healthy.
func waitForNodeHealthy(tc tests.TestContext, kubeconfig *restclient.Config, namespace string, podName string) ids.NodeID {
require := require.New(tc)

// A forwarded connection enables connectivity without exposing the node external to the kube cluster
ginkgo.By(fmt.Sprintf("Enabling a local forward for pod %s.%s", namespace, podName))
localPort, localPortStopChan, err := enableLocalForwardForPod(kubeconfig, namespace, podName, config.DefaultHTTPPort, ginkgo.GinkgoWriter, ginkgo.GinkgoWriter)
require.NoError(err)
defer close(localPortStopChan)
localNodeURI := fmt.Sprintf("http://127.0.0.1:%d", localPort)

infoClient := info.NewClient(localNodeURI)
bootstrapNodeID, _, err := infoClient.GetNodeID(tc.DefaultContext())
require.NoError(err)

ginkgo.By(fmt.Sprintf("Waiting for pod %s.%s to report a healthy status at %s", namespace, podName, localNodeURI))
require.Eventually(func() bool {
healthReply, err := tmpnet.CheckNodeHealth(tc.DefaultContext(), localNodeURI)
if err != nil {
tc.Outf("Error checking node health: %v\n", err)
return false
}
return healthReply.Healthy
}, e2e.DefaultTimeout, e2e.DefaultPollingInterval)

return bootstrapNodeID
}

// enableLocalForwardForPod enables traffic forwarding from a local port to the specified pod with client-go. The returned
// stop channel should be closed to stop the port forwarding.
func enableLocalForwardForPod(kubeconfig *restclient.Config, namespace string, name string, port int, out, errOut io.Writer) (uint16, chan struct{}, error) {
transport, upgrader, err := spdy.RoundTripperFor(kubeconfig)
if err != nil {
return 0, nil, fmt.Errorf("failed to create round tripper: %w", err)
}

dialer := spdy.NewDialer(
upgrader,
&http.Client{
Transport: transport,
},
http.MethodPost,
&url.URL{
Scheme: "https",
Path: fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/portforward", namespace, name),
Host: strings.TrimPrefix(kubeconfig.Host, "https://"),
},
nodeID, err := tmpnet.WaitForNodeHealthy(
tc.DefaultContext(),
tc.Outf,
kubeconfig,
namespace,
podName,
e2e.DefaultPollingInterval,
ginkgo.GinkgoWriter,
ginkgo.GinkgoWriter,
)
ports := []string{fmt.Sprintf("0:%d", port)}

// Need to specify 127.0.0.1 to ensure that forwarding is only attempted for the ipv4
// address of the pod. By default, kind is deployed with only ipv4, and attempting to
// connect to a pod with ipv6 will fail.
addresses := []string{"127.0.0.1"}

stopChan, readyChan := make(chan struct{}, 1), make(chan struct{}, 1)
forwarder, err := portforward.NewOnAddresses(dialer, addresses, ports, stopChan, readyChan, out, errOut)
if err != nil {
return 0, nil, fmt.Errorf("failed to create forwarder: %w", err)
}

go func() {
if err := forwarder.ForwardPorts(); err != nil {
// TODO(marun) Need better error handling here? Or is ok for test-only usage?
panic(err)
}
}()

<-readyChan // Wait for port forwarding to be ready

// Retrieve the dynamically allocated local port
forwardedPorts, err := forwarder.GetPorts()
if err != nil {
close(stopChan)
return 0, nil, fmt.Errorf("failed to get forwarded ports: %w", err)
}
if len(forwardedPorts) == 0 {
close(stopChan)
return 0, nil, fmt.Errorf("failed to find at least one forwarded port: %w", err)
}
return forwardedPorts[0].Local, stopChan, nil
require.NoError(tc, err)
return nodeID
}

// createBootstrapTester creates a pod that can continuously bootstrap from the specified bootstrap IP+ID.
func createBootstrapTester(tc tests.TestContext, clientset *kubernetes.Clientset, namespace string, bootstrapIP string, bootstrapNodeID ids.NodeID) *appsv1.StatefulSet {
flags := defaultNodeFlags()
flags := defaultPodFlags()
flags[config.BootstrapIPsKey] = fmt.Sprintf("%s:%d", bootstrapIP, config.DefaultStakingPort)
flags[config.BootstrapIDsKey] = bootstrapNodeID.String()

Expand Down
2 changes: 1 addition & 1 deletion tests/fixture/bootstrapmonitor/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func WaitForCompletion(

// Avoid checking node health before it reports initial ready
log.Info("Waiting for pod readiness")
if err := WaitForPodCondition(ctx, clientset, namespace, podName, corev1.PodReady); err != nil {
if err := tmpnet.WaitForPodCondition(ctx, clientset, namespace, podName, corev1.PodReady); err != nil {
return fmt.Errorf("failed to wait for pod condition: %w", err)
}

Expand Down
Loading
Loading