OCPEDGE-1165: fix: Ensure no racy CSI plugin registration, better startup behavior for vgmanager, failing test summary artifact #642

Merged
merged 11 commits on Jun 26, 2024
13 changes: 12 additions & 1 deletion Makefile
@@ -292,10 +292,21 @@ SUBSCRIPTION_CHANNEL ?= alpha
# Handles only AWS as of now.
DISK_INSTALL ?= false

ifdef ARTIFACT_DIR
SUMMARY_FILE = $(ARTIFACT_DIR)/lvms-e2e-summary.yaml
endif

.PHONY: e2e
e2e: ginkgo ## Build and run e2e tests.
cd test/e2e && $(GINKGO) build
cd test/e2e && ./e2e.test --lvm-catalog-image=$(CATALOG_IMG) --lvm-subscription-channel=$(SUBSCRIPTION_CHANNEL) --lvm-operator-install=$(LVM_OPERATOR_INSTALL) --lvm-operator-uninstall=$(LVM_OPERATOR_UNINSTALL) --disk-install=$(DISK_INSTALL) -ginkgo.v
cd test/e2e && ./e2e.test \
--lvm-catalog-image=$(CATALOG_IMG) \
--lvm-subscription-channel=$(SUBSCRIPTION_CHANNEL) \
--lvm-operator-install=$(LVM_OPERATOR_INSTALL) \
--lvm-operator-uninstall=$(LVM_OPERATOR_UNINSTALL) \
--disk-install=$(DISK_INSTALL) \
--summary-file=$(SUMMARY_FILE) \
-ginkgo.v

performance-stress-test: ## Build and run stress tests. Requires a fully set up LVMS installation. If the run fails with a missing-token error, you are likely logged in via OIDC rather than token authentication; a token login is required for the performance test.
oc apply -f ./config/samples/lvm_v1alpha1_lvmcluster.yaml -n openshift-storage
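The new --summary-file flag is only populated when ARTIFACT_DIR is set (as it is in CI), so local runs pass an empty value and the test binary has to treat the summary as optional. A minimal sketch of how the e2e suite could consume the flag via a Ginkgo ReportAfterSuite hook; the flag name matches the Makefile change, but the helper names and YAML shape below are illustrative and not taken from this PR:

package e2e

import (
	"flag"
	"os"

	. "github.com/onsi/ginkgo/v2"
	"github.com/onsi/ginkgo/v2/types"
	"sigs.k8s.io/yaml"
)

// --summary-file stays empty unless the Makefile derives it from ARTIFACT_DIR.
var summaryFile = flag.String("summary-file", "", "optional path for a YAML summary of failed specs")

var _ = ReportAfterSuite("write failure summary artifact", func(report Report) {
	if *summaryFile == "" {
		return // no ARTIFACT_DIR, nothing to write
	}
	type failedSpec struct {
		Name    string `json:"name"`
		Failure string `json:"failure"`
	}
	var failures []failedSpec
	for _, spec := range report.SpecReports {
		if spec.State.Is(types.SpecStateFailureStates) {
			failures = append(failures, failedSpec{Name: spec.FullText(), Failure: spec.Failure.Message})
		}
	}
	if out, err := yaml.Marshal(failures); err == nil {
		_ = os.WriteFile(*summaryFile, out, 0o644)
	}
})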
7 changes: 6 additions & 1 deletion api/v1alpha1/lvmcluster_test.go
@@ -105,7 +105,12 @@ var _ = Describe("webhook acceptance tests", func() {
Expect(statusError.Status().Message).To(ContainSubstring(ErrDuplicateLVMCluster.Error()))

Expect(k8sClient.Delete(ctx, resource)).To(Succeed())
})
},
// The newly created LVMCluster may not be visible to the webhook yet when the duplicate is submitted, so the spec is retried a few times.
// This works around the informer cache not being up to date at the time the webhook is called,
// and is cheaper than polling on a fixed interval until the cache catches up.
FlakeAttempts(3),
)

It("namespace cannot be looked up via ENV", func(ctx SpecContext) {
generatedName := generateUniqueNameForTestCase(ctx)
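FlakeAttempts is a Ginkgo v2 decorator: a failing spec is re-run up to the given number of attempts and only counts as failed if every attempt fails; earlier failures are reported as flakes. For context, a minimal standalone illustration of how the decorator attaches to a spec — not code from this PR:

package v1alpha1_test

import (
	. "github.com/onsi/ginkgo/v2"
)

var _ = Describe("webhook acceptance tests", func() {
	It("rejects a duplicate LVMCluster", func(ctx SpecContext) {
		// create the first LVMCluster, then expect the duplicate to be
		// rejected by the validating webhook; if the webhook's informer
		// cache has not caught up yet, this attempt fails and is retried.
	},
		// retried up to 3 times before the spec is reported as failed
		FlakeAttempts(3),
	)
})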
58 changes: 41 additions & 17 deletions cmd/vgmanager/vgmanager.go
@@ -48,15 +48,14 @@ import (
topoLVMD "github.com/topolvm/topolvm/pkg/lvmd"
"github.com/topolvm/topolvm/pkg/runners"
"google.golang.org/grpc"
registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/manager"

"k8s.io/apimachinery/pkg/runtime"
// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
_ "k8s.io/client-go/plugin/pkg/client/auth"
registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"

ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -74,6 +73,8 @@ const (
)

var ErrConfigModified = errors.New("lvmd config file is modified")
var ErrNoDeviceClassesAvailable = errors.New("no device classes configured in lvmd.yaml, cannot start up correctly")
var ErrCSIPluginNotYetRegistered = errors.New("CSI plugin not yet registered")

type Options struct {
Scheme *runtime.Scheme
@@ -151,6 +152,12 @@ func run(cmd *cobra.Command, _ []string, opts *Options) error {
return fmt.Errorf("unable to start manager: %w", err)
}

registrationServer := internalCSI.NewRegistrationServer(
cancelWithCause,
constants.TopolvmCSIDriverName,
registrationPath(),
[]string{"1.0.0"},
)
lvmdConfig := &lvmd.Config{}
if err := loadConfFile(ctx, lvmdConfig, lvmd.DefaultFileConfigPath); err != nil {
opts.SetupLog.Error(err, "lvmd config could not be loaded, starting without topolvm components and attempting bootstrap")
@@ -169,31 +176,27 @@ func run(cmd *cobra.Command, _ []string, opts *Options) error {
if err := os.MkdirAll(topolvm.DeviceDirectory, 0755); err != nil {
return err
}
grpcServer := grpc.NewServer(grpc.UnaryInterceptor(ErrorLoggingInterceptor),
grpc.SharedWriteBuffer(true),
grpc.MaxConcurrentStreams(1),
grpc.NumStreamWorkers(1))
csiGrpcServer := newGRPCServer()
identityServer := driver.NewIdentityServer(func() (bool, error) {
return true, nil
})
csi.RegisterIdentityServer(grpcServer, identityServer)

registrationServer := internalCSI.NewRegistrationServer(
constants.TopolvmCSIDriverName, registrationPath(), []string{"1.0.0"})
registerapi.RegisterRegistrationServer(grpcServer, registrationServer)
if err = mgr.Add(runners.NewGRPCRunner(grpcServer, pluginRegistrationSocketPath(), false)); err != nil {
return fmt.Errorf("could not add grpc runner for registration server: %w", err)
}
csi.RegisterIdentityServer(csiGrpcServer, identityServer)

nodeServer, err := driver.NewNodeServer(nodeName, vgclnt, lvclnt, mgr) // adjusted signature
if err != nil {
return fmt.Errorf("could not setup topolvm node server: %w", err)
}
csi.RegisterNodeServer(grpcServer, nodeServer)
err = mgr.Add(runners.NewGRPCRunner(grpcServer, constants.DefaultCSISocket, false))
csi.RegisterNodeServer(csiGrpcServer, nodeServer)
err = mgr.Add(internalCSI.NewGRPCRunner(csiGrpcServer, constants.DefaultCSISocket, false))
if err != nil {
return fmt.Errorf("could not add grpc runner for node server: %w", err)
}

registrationGrpcServer := newGRPCServer()
registerapi.RegisterRegistrationServer(registrationGrpcServer, registrationServer)
if err = mgr.Add(internalCSI.NewGRPCRunner(registrationGrpcServer, pluginRegistrationSocketPath(), false)); err != nil {
return fmt.Errorf("could not add grpc runner for registration server: %w", err)
}
}

if err = (&vgmanager.Reconciler{
@@ -216,7 +219,18 @@ func run(cmd *cobra.Command, _ []string, opts *Options) error {
return fmt.Errorf("unable to set up ready check: %w", err)
}

if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
if err := mgr.AddHealthzCheck("healthz", func(req *http.Request) error {
if len(lvmdConfig.DeviceClasses) == 0 {
log.FromContext(req.Context()).Error(ErrNoDeviceClassesAvailable, "not healthy",
"available_device_classes", lvmdConfig.DeviceClasses)
return ErrNoDeviceClassesAvailable
}
if !registrationServer.Registered() {
log.FromContext(req.Context()).Error(ErrCSIPluginNotYetRegistered, "not healthy")
return ErrCSIPluginNotYetRegistered
}
return nil
}); err != nil {
return fmt.Errorf("unable to set up health check: %w", err)
}

@@ -237,6 +251,9 @@ func run(cmd *cobra.Command, _ []string, opts *Options) error {
if errors.Is(context.Cause(ctx), ErrConfigModified) {
opts.SetupLog.Info("restarting controller due to modified configuration")
return run(cmd, nil, opts)
} else if errors.Is(context.Cause(ctx), internalCSI.ErrPluginRegistrationFailed) {
opts.SetupLog.Error(context.Cause(ctx), "restarting due to failed plugin registration")
return run(cmd, nil, opts)
} else if err := ctx.Err(); err != nil {
opts.SetupLog.Error(err, "exiting abnormally")
os.Exit(1)
@@ -335,3 +352,10 @@ func readyCheck(mgr manager.Manager) healthz.Checker {
return nil
}
}

func newGRPCServer() *grpc.Server {
return grpc.NewServer(grpc.UnaryInterceptor(ErrorLoggingInterceptor),
grpc.SharedWriteBuffer(true),
grpc.MaxConcurrentStreams(2),
grpc.NumStreamWorkers(2))
}
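The healthz check above only reports healthy once registrationServer.Registered() returns true, and a failed registration cancels the manager context with internalCSI.ErrPluginRegistrationFailed, which the restart logic earlier in run() handles by re-running the whole setup. A minimal sketch of what such a registration server looks like, inferred from how vgmanager.go uses it; the real implementation lives in internal/csi and may differ in detail:

package csi

import (
	"context"
	"errors"
	"fmt"
	"sync/atomic"

	registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
)

var ErrPluginRegistrationFailed = errors.New("CSI plugin registration failed")

type registrationServer struct {
	cancel     context.CancelCauseFunc
	driverName string
	endpoint   string
	versions   []string
	registered atomic.Bool
}

func NewRegistrationServer(cancel context.CancelCauseFunc, driverName, endpoint string, versions []string) *registrationServer {
	return &registrationServer{cancel: cancel, driverName: driverName, endpoint: endpoint, versions: versions}
}

// GetInfo is called by kubelet's plugin watcher to discover the CSI plugin.
func (s *registrationServer) GetInfo(ctx context.Context, req *registerapi.InfoRequest) (*registerapi.PluginInfo, error) {
	return &registerapi.PluginInfo{
		Type:              registerapi.CSIPlugin,
		Name:              s.driverName,
		Endpoint:          s.endpoint,
		SupportedVersions: s.versions,
	}, nil
}

// NotifyRegistrationStatus reports whether kubelet accepted the plugin; a
// failure cancels the manager context so run() can restart cleanly.
func (s *registrationServer) NotifyRegistrationStatus(ctx context.Context, status *registerapi.RegistrationStatus) (*registerapi.RegistrationStatusResponse, error) {
	if !status.PluginRegistered {
		s.cancel(fmt.Errorf("%w: %s", ErrPluginRegistrationFailed, status.Error))
		return &registerapi.RegistrationStatusResponse{}, nil
	}
	s.registered.Store(true)
	return &registerapi.RegistrationStatusResponse{}, nil
}

// Registered is what the vgmanager healthz check polls.
func (s *registrationServer) Registered() bool { return s.registered.Load() }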
16 changes: 13 additions & 3 deletions internal/controllers/lvmcluster/resource/topolvm_csi_driver.go
@@ -24,6 +24,7 @@ import (
"github.com/openshift/lvm-operator/internal/controllers/constants"
"github.com/openshift/lvm-operator/internal/controllers/labels"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -81,12 +82,21 @@ func (c csiDriver) EnsureDeleted(r Reconciler, ctx context.Context, _ *lvmv1alpha1.LVMCluster) error {
return fmt.Errorf("the CSIDriver %s is still present, waiting for deletion", constants.TopolvmCSIDriverName)
}

if err := r.Delete(ctx, csiDriverResource); err != nil {
if err := r.Delete(ctx, csiDriverResource); errors.IsNotFound(err) {
return nil
} else if err != nil {
return fmt.Errorf("failed to delete topolvm csi driver %s: %w", csiDriverResource.GetName(), err)
}
logger.Info("initiated topolvm csi driver deletion", "TopolvmCSIDriverName", csiDriverResource.Name)

return nil
logger.Info("initiated CSIDriver deletion", "TopolvmCSIDriverName", csiDriverResource.Name)

if err := r.Get(ctx, name, csiDriverResource); errors.IsNotFound(err) {
return nil
} else if err != nil {
return fmt.Errorf("failed to verify deletion of topolvm csi driver %s: %w", csiDriverResource.GetName(), err)
} else {
return fmt.Errorf("topolvm csi driver %s still has to be removed", csiDriverResource.Name)
}
}

func getCSIDriverResource() *storagev1.CSIDriver {
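The change turns EnsureDeleted into a delete-and-verify step: a NotFound from Delete means the CSIDriver is already gone, and the follow-up Get confirms the object has actually disappeared, so the reconciler keeps returning an error (and requeueing) until removal is complete. The same pattern reduced to its essentials with a controller-runtime client — a generic sketch, not code from this PR:

package resource

import (
	"context"
	"fmt"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// ensureDeleted initiates deletion and only reports success once the object
// can no longer be fetched.
func ensureDeleted(ctx context.Context, c client.Client, obj client.Object) error {
	if err := c.Delete(ctx, obj); apierrors.IsNotFound(err) {
		return nil // already gone
	} else if err != nil {
		return fmt.Errorf("failed to delete %s: %w", obj.GetName(), err)
	}

	// Delete only initiates removal; verify the object is really gone so the
	// caller retries until it is.
	if err := c.Get(ctx, client.ObjectKeyFromObject(obj), obj); apierrors.IsNotFound(err) {
		return nil
	} else if err != nil {
		return fmt.Errorf("failed to verify deletion of %s: %w", obj.GetName(), err)
	}
	return fmt.Errorf("%s still has to be removed", obj.GetName())
}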
48 changes: 45 additions & 3 deletions internal/controllers/lvmcluster/resource/vgmanager.go
@@ -23,8 +23,11 @@ import (
lvmv1alpha1 "github.com/openshift/lvm-operator/api/v1alpha1"
"github.com/openshift/lvm-operator/internal/cluster"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
cutil "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/log"
)
@@ -53,7 +56,7 @@ func (v vgManager) EnsureCreated(r Reconciler, ctx context.Context, lvmCluster *lvmv1alpha1.LVMCluster) error {
logger := log.FromContext(ctx).WithValues("resourceManager", v.GetName())

// get desired daemonset spec
dsTemplate := newVGManagerDaemonset(
dsTemplate := templateVGManagerDaemonset(
lvmCluster,
v.clusterType,
r.GetNamespace(),
@@ -131,8 +134,47 @@ func (v vgManager) EnsureCreated(r Reconciler, ctx context.Context, lvmCluster *lvmv1alpha1.LVMCluster) error {
return nil
}

// ensureDeleted is a noop. Deletion will be handled by ownerref
func (v vgManager) EnsureDeleted(_ Reconciler, _ context.Context, _ *lvmv1alpha1.LVMCluster) error {
// EnsureDeleted makes sure that the driver is removed from the cluster and the daemonset is gone.
// Deletion will be triggered again even though we also have an owner reference
func (v vgManager) EnsureDeleted(r Reconciler, ctx context.Context, lvmCluster *lvmv1alpha1.LVMCluster) error {
logger := log.FromContext(ctx).WithValues("resourceManager", v.GetName())

// delete the daemonset
ds := templateVGManagerDaemonset(
lvmCluster,
v.clusterType,
r.GetNamespace(),
r.GetImageName(),
r.GetVGManagerCommand(),
r.GetLogPassthroughOptions().VGManager.AsArgs(),
)

if err := r.Delete(ctx, &ds); errors.IsNotFound(err) {
return nil
} else if err != nil {
return fmt.Errorf("failed to delete %s daemonset %q: %w", v.GetName(), ds.Name, err)
}

logger.Info("initiated DaemonSet deletion", "DaemonSet", ds.Name)

if err := r.Get(ctx, client.ObjectKeyFromObject(&ds), &ds); err == nil {
return fmt.Errorf("%s daemonset %q still has to be removed", v.GetName(), ds.Name)
} else if !errors.IsNotFound(err) {
return fmt.Errorf("failed to verify deletion of %s daemonset %q: %w", v.GetName(), ds.Name, err)
}

// because we have background deletion, we also have to check the pods
// if there are still pods, we have to wait for them to be removed
// if there are no pods, we can consider the daemonset deleted
podList := &v1.PodList{}
if err := r.List(ctx, podList, client.InNamespace(r.GetNamespace()),
client.MatchingLabels(ds.Spec.Selector.MatchLabels),
); err != nil {
return fmt.Errorf("failed to list pods for DaemonSet %q: %w", ds.Name, err)
} else if len(podList.Items) > 0 {
return fmt.Errorf("DaemonSet %q still has %d pods running", ds.Name, len(podList.Items))
}

return nil
}

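Because the DaemonSet is deleted with the default background propagation, the object can disappear while its pods are still terminating, which is why the pod list is checked explicitly. For comparison, foreground cascading deletion would make the API server keep the DaemonSet around (with a deletionTimestamp) until its dependents are gone — a sketch of that alternative with the same controller-runtime client, not what this PR does:

package resource

import (
	"context"
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

func deleteDaemonSetForeground(ctx context.Context, c client.Client, ds *appsv1.DaemonSet) error {
	err := c.Delete(ctx, ds, client.PropagationPolicy(metav1.DeletePropagationForeground))
	if apierrors.IsNotFound(err) {
		return nil
	}
	if err != nil {
		return fmt.Errorf("failed to delete daemonset %q: %w", ds.Name, err)
	}
	// With foreground propagation the DaemonSet only disappears once all of
	// its pods are removed, so a later NotFound from Get implies the pods are
	// gone as well and no separate pod listing is needed.
	return nil
}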
12 changes: 10 additions & 2 deletions internal/controllers/lvmcluster/resource/vgmanager_daemonset.go
@@ -205,8 +205,8 @@ var (
}
)

// newVGManagerDaemonset returns the desired vgmanager daemonset for a given LVMCluster
func newVGManagerDaemonset(
// templateVGManagerDaemonset returns the desired vgmanager daemonset for a given LVMCluster
func templateVGManagerDaemonset(
lvmCluster *lvmv1alpha1.LVMCluster,
clusterType cluster.Type,
namespace, vgImage string,
@@ -268,6 +268,14 @@ func newVGManagerDaemonset(
ContainerPort: 8081,
Protocol: corev1.ProtocolTCP},
},
StartupProbe: &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
HTTPGet: &corev1.HTTPGetAction{Path: "/healthz",
Port: intstr.FromString(constants.TopolvmNodeContainerHealthzName)}},
FailureThreshold: 10,
InitialDelaySeconds: 0,
TimeoutSeconds: 2,
PeriodSeconds: 2},
LivenessProbe: &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
HTTPGet: &corev1.HTTPGetAction{Path: "/healthz",
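With PeriodSeconds of 2 and a FailureThreshold of 10, the new startup probe gives the container up to 10 × 2 s = 20 s to report the stricter /healthz endpoint as healthy (device classes loaded and the CSI plugin registered with kubelet) before the kubelet restarts it; since liveness probing is suppressed until the startup probe succeeds, the existing liveness probe no longer races with slow plugin registration during startup.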
8 changes: 6 additions & 2 deletions internal/csi/grpc_runner.go
@@ -2,6 +2,7 @@ package csi

import (
"context"
"fmt"
"net"
"os"
"time"
@@ -32,11 +33,11 @@ func (r gRPCServerRunner) Start(ctx context.Context) error {
logger.Info("Starting gRPC server", "sockFile", r.sockFile)
err := os.Remove(r.sockFile)
if err != nil && !os.IsNotExist(err) {
return err
return fmt.Errorf("failed to remove existing socket file before startup: %w", err)
}
lis, err := net.Listen("unix", r.sockFile)
if err != nil {
return err
return fmt.Errorf("failed to listen on %s: %w", r.sockFile, err)
}

go func() {
@@ -60,6 +61,9 @@
r.srv.Stop()
logger.Info("Stopped gRPC server forcibly", "duration", time.Since(start))
}
if err := os.Remove(r.sockFile); err != nil && !os.IsNotExist(err) {
return fmt.Errorf("failed to remove socket file after shutdown: %w", err)
}
return nil
}

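The two cleanup fixes bracket the runner's lifetime: a stale socket is removed before Listen, and the socket is removed again after shutdown so the next start does not trip over it. Putting the hunks together, the runner's overall shape is roughly the sketch below; the graceful-stop timeout and surrounding details are assumed, and internal/csi/grpc_runner.go remains the authoritative version:

package csi

import (
	"context"
	"fmt"
	"net"
	"os"
	"time"

	"google.golang.org/grpc"
	"sigs.k8s.io/controller-runtime/pkg/log"
)

type gRPCServerRunner struct {
	srv      *grpc.Server
	sockFile string
}

func (r gRPCServerRunner) Start(ctx context.Context) error {
	logger := log.FromContext(ctx)
	logger.Info("Starting gRPC server", "sockFile", r.sockFile)

	// A socket file left over from a previous run would make net.Listen fail.
	if err := os.Remove(r.sockFile); err != nil && !os.IsNotExist(err) {
		return fmt.Errorf("failed to remove existing socket file before startup: %w", err)
	}
	lis, err := net.Listen("unix", r.sockFile)
	if err != nil {
		return fmt.Errorf("failed to listen on %s: %w", r.sockFile, err)
	}

	go func() {
		if err := r.srv.Serve(lis); err != nil {
			logger.Error(err, "gRPC server stopped serving")
		}
	}()

	<-ctx.Done()

	// Prefer a graceful stop, but do not hang forever on open streams.
	start := time.Now()
	done := make(chan struct{})
	go func() {
		r.srv.GracefulStop()
		close(done)
	}()
	select {
	case <-done:
		logger.Info("Stopped gRPC server gracefully", "duration", time.Since(start))
	case <-time.After(10 * time.Second):
		r.srv.Stop()
		logger.Info("Stopped gRPC server forcibly", "duration", time.Since(start))
	}

	// Remove the socket on the way out so a clean restart is possible.
	if err := os.Remove(r.sockFile); err != nil && !os.IsNotExist(err) {
		return fmt.Errorf("failed to remove socket file after shutdown: %w", err)
	}
	return nil
}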