Merge pull request #642 from jakobmoellerdev/better-liveness
OCPEDGE-1165: fix: Ensure no racy CSI plugin registration, better startup behavior for vgmanager, failing test summary artifact
openshift-merge-bot[bot] authored Jun 26, 2024
2 parents 319dc63 + 65137a4 commit 748fb1b
Showing 14 changed files with 398 additions and 50 deletions.
13 changes: 12 additions & 1 deletion Makefile
@@ -292,10 +292,21 @@ SUBSCRIPTION_CHANNEL ?= alpha
# Handles only AWS as of now.
DISK_INSTALL ?= false

ifdef ARTIFACT_DIR
SUMMARY_FILE = $(ARTIFACT_DIR)/lvms-e2e-summary.yaml
endif

.PHONY: e2e
e2e: ginkgo ## Build and run e2e tests.
cd test/e2e && $(GINKGO) build
cd test/e2e && ./e2e.test --lvm-catalog-image=$(CATALOG_IMG) --lvm-subscription-channel=$(SUBSCRIPTION_CHANNEL) --lvm-operator-install=$(LVM_OPERATOR_INSTALL) --lvm-operator-uninstall=$(LVM_OPERATOR_UNINSTALL) --disk-install=$(DISK_INSTALL) -ginkgo.v
cd test/e2e && ./e2e.test \
--lvm-catalog-image=$(CATALOG_IMG) \
--lvm-subscription-channel=$(SUBSCRIPTION_CHANNEL) \
--lvm-operator-install=$(LVM_OPERATOR_INSTALL) \
--lvm-operator-uninstall=$(LVM_OPERATOR_UNINSTALL) \
--disk-install=$(DISK_INSTALL) \
--summary-file=$(SUMMARY_FILE) \
-ginkgo.v

performance-stress-test: ## Build and run stress tests. Requires a fully set up LVMS installation. If you receive a missing-token error while running, you are likely logged in via OIDC rather than token authentication; a token login is required to run the performance test.
oc apply -f ./config/samples/lvm_v1alpha1_lvmcluster.yaml -n openshift-storage
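For orientation, a minimal sketch of how the new --summary-file flag could be consumed inside the e2e suite. Only the flag name and the Makefile wiring above are taken from this change; the flag variable, the report shape, and the use of Ginkgo's ReportAfterSuite with sigs.k8s.io/yaml are assumptions, not the actual test/e2e code:

package e2e

import (
	"flag"
	"os"

	. "github.com/onsi/ginkgo/v2"
	"github.com/onsi/ginkgo/v2/types"
	"sigs.k8s.io/yaml"
)

// summaryFile is a hypothetical mirror of the --summary-file flag passed by the Makefile.
var summaryFile = flag.String("summary-file", "", "path to write a YAML summary of failed specs")

// ReportAfterSuite runs once after all specs and receives the aggregated report,
// which makes it a natural place to emit a CI artifact (e.g. into $ARTIFACT_DIR).
var _ = ReportAfterSuite("write failure summary", func(report Report) {
	if *summaryFile == "" {
		return
	}
	type failedSpec struct {
		Name    string `json:"name"`
		Message string `json:"message"`
	}
	var failures []failedSpec
	for _, spec := range report.SpecReports {
		if spec.State == types.SpecStateFailed {
			failures = append(failures, failedSpec{
				Name:    spec.FullText(),
				Message: spec.Failure.Message,
			})
		}
	}
	out, err := yaml.Marshal(failures)
	if err != nil {
		return
	}
	_ = os.WriteFile(*summaryFile, out, 0o644)
})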
7 changes: 6 additions & 1 deletion api/v1alpha1/lvmcluster_test.go
@@ -105,7 +105,12 @@ var _ = Describe("webhook acceptance tests", func() {
Expect(statusError.Status().Message).To(ContainSubstring(ErrDuplicateLVMCluster.Error()))

Expect(k8sClient.Delete(ctx, resource)).To(Succeed())
})
},
// It can happen that the creation of the LVMCluster is not yet visible to the webhook, so we retry a few times.
// This is a workaround for the fact that the informer cache is not yet updated when the webhook is called.
// This is faster / more efficient than waiting for the informer cache to be updated with a set interval.
FlakeAttempts(3),
)

It("namespace cannot be looked up via ENV", func(ctx SpecContext) {
generatedName := generateUniqueNameForTestCase(ctx)
65 changes: 48 additions & 17 deletions cmd/vgmanager/vgmanager.go
@@ -48,15 +48,14 @@ import (
topoLVMD "github.com/topolvm/topolvm/pkg/lvmd"
"github.com/topolvm/topolvm/pkg/runners"
"google.golang.org/grpc"
registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/manager"

"k8s.io/apimachinery/pkg/runtime"
// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
_ "k8s.io/client-go/plugin/pkg/client/auth"
registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"

ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -74,6 +73,8 @@ const (
)

var ErrConfigModified = errors.New("lvmd config file is modified")
var ErrNoDeviceClassesAvailable = errors.New("no device classes configured in lvmd.yaml, cannot start up correctly")
var ErrCSIPluginNotYetRegistered = errors.New("CSI plugin not yet registered")

type Options struct {
Scheme *runtime.Scheme
@@ -151,6 +152,12 @@ func run(cmd *cobra.Command, _ []string, opts *Options) error {
return fmt.Errorf("unable to start manager: %w", err)
}

registrationServer := internalCSI.NewRegistrationServer(
cancelWithCause,
constants.TopolvmCSIDriverName,
registrationPath(),
[]string{"1.0.0"},
)
lvmdConfig := &lvmd.Config{}
if err := loadConfFile(ctx, lvmdConfig, lvmd.DefaultFileConfigPath); err != nil {
opts.SetupLog.Error(err, "lvmd config could not be loaded, starting without topolvm components and attempting bootstrap")
@@ -169,31 +176,27 @@ func run(cmd *cobra.Command, _ []string, opts *Options) error {
if err := os.MkdirAll(topolvm.DeviceDirectory, 0755); err != nil {
return err
}
grpcServer := grpc.NewServer(grpc.UnaryInterceptor(ErrorLoggingInterceptor),
grpc.SharedWriteBuffer(true),
grpc.MaxConcurrentStreams(1),
grpc.NumStreamWorkers(1))
csiGrpcServer := newGRPCServer()
identityServer := driver.NewIdentityServer(func() (bool, error) {
return true, nil
})
csi.RegisterIdentityServer(grpcServer, identityServer)

registrationServer := internalCSI.NewRegistrationServer(
constants.TopolvmCSIDriverName, registrationPath(), []string{"1.0.0"})
registerapi.RegisterRegistrationServer(grpcServer, registrationServer)
if err = mgr.Add(runners.NewGRPCRunner(grpcServer, pluginRegistrationSocketPath(), false)); err != nil {
return fmt.Errorf("could not add grpc runner for registration server: %w", err)
}
csi.RegisterIdentityServer(csiGrpcServer, identityServer)

nodeServer, err := driver.NewNodeServer(nodeName, vgclnt, lvclnt, mgr) // adjusted signature
if err != nil {
return fmt.Errorf("could not setup topolvm node server: %w", err)
}
csi.RegisterNodeServer(grpcServer, nodeServer)
err = mgr.Add(runners.NewGRPCRunner(grpcServer, constants.DefaultCSISocket, false))
csi.RegisterNodeServer(csiGrpcServer, nodeServer)
err = mgr.Add(internalCSI.NewGRPCRunner(csiGrpcServer, constants.DefaultCSISocket, false))
if err != nil {
return fmt.Errorf("could not add grpc runner for node server: %w", err)
}

registrationGrpcServer := newGRPCServer()
registerapi.RegisterRegistrationServer(registrationGrpcServer, registrationServer)
if err = mgr.Add(internalCSI.NewGRPCRunner(registrationGrpcServer, pluginRegistrationSocketPath(), false)); err != nil {
return fmt.Errorf("could not add grpc runner for registration server: %w", err)
}
}

if err = (&vgmanager.Reconciler{
@@ -216,7 +219,17 @@ func run(cmd *cobra.Command, _ []string, opts *Options) error {
return fmt.Errorf("unable to set up ready check: %w", err)
}

if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
if err := mgr.AddHealthzCheck("healthz", func(req *http.Request) error {
if len(lvmdConfig.DeviceClasses) == 0 {
log.FromContext(req.Context()).Error(ErrNoDeviceClassesAvailable, "not healthy")
return ErrNoDeviceClassesAvailable
}
if !registrationServer.Registered() {
log.FromContext(req.Context()).Error(ErrCSIPluginNotYetRegistered, "not healthy")
return ErrCSIPluginNotYetRegistered
}
return nil
}); err != nil {
return fmt.Errorf("unable to set up health check: %w", err)
}

@@ -237,6 +250,9 @@ func run(cmd *cobra.Command, _ []string, opts *Options) error {
if errors.Is(context.Cause(ctx), ErrConfigModified) {
opts.SetupLog.Info("restarting controller due to modified configuration")
return run(cmd, nil, opts)
} else if errors.Is(context.Cause(ctx), internalCSI.ErrPluginRegistrationFailed) {
opts.SetupLog.Error(context.Cause(ctx), "restarting due to failed plugin registration")
return run(cmd, nil, opts)
} else if err := ctx.Err(); err != nil {
opts.SetupLog.Error(err, "exiting abnormally")
os.Exit(1)
@@ -335,3 +351,18 @@ func readyCheck(mgr manager.Manager) healthz.Checker {
return nil
}
}

// newGRPCServer returns a new grpc.Server with the following settings:
// UnaryInterceptor: ErrorLoggingInterceptor, to log errors on grpc calls
// SharedWriteBuffer: true, to share the write buffer between all connections, saving memory
// MaxConcurrentStreams: 2, one per logical CPU (vgmanager is optimized for 1 hyperthreaded core)
// NumStreamWorkers: 2, one per logical CPU (vgmanager is optimized for 1 hyperthreaded core)
// We could technically use 1 worker / 1 stream, but that would make the goroutines
// switch threads more often, which is less efficient.
func newGRPCServer() *grpc.Server {
return grpc.NewServer(grpc.UnaryInterceptor(ErrorLoggingInterceptor),
grpc.SharedWriteBuffer(true),
grpc.MaxConcurrentStreams(2),
grpc.NumStreamWorkers(2),
)
}
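The registration server wired up above lives in internal/csi and is not part of this diff. A minimal sketch of the shape implied by the calls in run(): a constructor taking a context.CancelCauseFunc, a Registered() accessor consulted by the healthz check, and ErrPluginRegistrationFailed as the cancellation cause that triggers a restart of run(). Everything beyond the names visible in the diff is an assumption:

package csi

import (
	"context"
	"errors"
	"sync/atomic"

	registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
)

// ErrPluginRegistrationFailed is used as the cancellation cause when kubelet reports a
// failed registration; run() in cmd/vgmanager reacts to it by restarting itself.
var ErrPluginRegistrationFailed = errors.New("CSI plugin registration failed")

// RegistrationServer implements the kubelet plugin-registration API and remembers
// whether kubelet has confirmed the registration.
type RegistrationServer struct {
	cancel     context.CancelCauseFunc
	driverName string
	endpoint   string
	versions   []string
	registered atomic.Bool
}

func NewRegistrationServer(cancel context.CancelCauseFunc, driverName, endpoint string, versions []string) *RegistrationServer {
	return &RegistrationServer{cancel: cancel, driverName: driverName, endpoint: endpoint, versions: versions}
}

// Registered reports whether kubelet has acknowledged the plugin; the vgmanager healthz
// check stays unhealthy until this returns true.
func (s *RegistrationServer) Registered() bool { return s.registered.Load() }

func (s *RegistrationServer) GetInfo(_ context.Context, _ *registerapi.InfoRequest) (*registerapi.PluginInfo, error) {
	return &registerapi.PluginInfo{
		Type:              registerapi.CSIPlugin,
		Name:              s.driverName,
		Endpoint:          s.endpoint,
		SupportedVersions: s.versions,
	}, nil
}

func (s *RegistrationServer) NotifyRegistrationStatus(_ context.Context, status *registerapi.RegistrationStatus) (*registerapi.RegistrationStatusResponse, error) {
	s.registered.Store(status.PluginRegistered)
	if !status.PluginRegistered {
		// Cancel the run() context with a dedicated cause so the caller can restart cleanly.
		s.cancel(ErrPluginRegistrationFailed)
	}
	return &registerapi.RegistrationStatusResponse{}, nil
}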
16 changes: 13 additions & 3 deletions internal/controllers/lvmcluster/resource/topolvm_csi_driver.go
@@ -24,6 +24,7 @@ import (
"github.com/openshift/lvm-operator/internal/controllers/constants"
"github.com/openshift/lvm-operator/internal/controllers/labels"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -81,12 +82,21 @@ func (c csiDriver) EnsureDeleted(r Reconciler, ctx context.Context, _ *lvmv1alph
return fmt.Errorf("the CSIDriver %s is still present, waiting for deletion", constants.TopolvmCSIDriverName)
}

if err := r.Delete(ctx, csiDriverResource); err != nil {
if err := r.Delete(ctx, csiDriverResource); errors.IsNotFound(err) {
return nil
} else if err != nil {
return fmt.Errorf("failed to delete topolvm csi driver %s: %w", csiDriverResource.GetName(), err)
}
logger.Info("initiated topolvm csi driver deletion", "TopolvmCSIDriverName", csiDriverResource.Name)

return nil
logger.Info("initiated CSIDriver deletion", "TopolvmCSIDriverName", csiDriverResource.Name)

if err := r.Get(ctx, name, csiDriverResource); errors.IsNotFound(err) {
return nil
} else if err != nil {
return fmt.Errorf("failed to verify deletion of topolvm csi driver %s: %w", csiDriverResource.GetName(), err)
} else {
return fmt.Errorf("topolvm csi driver %s still has to be removed", csiDriverResource.Name)
}
}

func getCSIDriverResource() *storagev1.CSIDriver {
48 changes: 45 additions & 3 deletions internal/controllers/lvmcluster/resource/vgmanager.go
@@ -23,8 +23,11 @@ import (
lvmv1alpha1 "github.com/openshift/lvm-operator/api/v1alpha1"
"github.com/openshift/lvm-operator/internal/cluster"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
cutil "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/log"
)
@@ -53,7 +56,7 @@ func (v vgManager) EnsureCreated(r Reconciler, ctx context.Context, lvmCluster *
logger := log.FromContext(ctx).WithValues("resourceManager", v.GetName())

// get desired daemonset spec
dsTemplate := newVGManagerDaemonset(
dsTemplate := templateVGManagerDaemonset(
lvmCluster,
v.clusterType,
r.GetNamespace(),
@@ -131,8 +134,47 @@ func (v vgManager) EnsureCreated(r Reconciler, ctx context.Context, lvmCluster *
return nil
}

// ensureDeleted is a noop. Deletion will be handled by ownerref
func (v vgManager) EnsureDeleted(_ Reconciler, _ context.Context, _ *lvmv1alpha1.LVMCluster) error {
// EnsureDeleted makes sure that the driver is removed from the cluster and the daemonset is gone.
// Deletion is triggered explicitly even though the owner reference would eventually clean up the DaemonSet as well
func (v vgManager) EnsureDeleted(r Reconciler, ctx context.Context, lvmCluster *lvmv1alpha1.LVMCluster) error {
logger := log.FromContext(ctx).WithValues("resourceManager", v.GetName())

// delete the daemonset
ds := templateVGManagerDaemonset(
lvmCluster,
v.clusterType,
r.GetNamespace(),
r.GetImageName(),
r.GetVGManagerCommand(),
r.GetLogPassthroughOptions().VGManager.AsArgs(),
)

if err := r.Delete(ctx, &ds); errors.IsNotFound(err) {
return nil
} else if err != nil {
return fmt.Errorf("failed to delete %s daemonset %q: %w", v.GetName(), ds.Name, err)
}

logger.Info("initiated DaemonSet deletion", "DaemonSet", ds.Name)

if err := r.Get(ctx, client.ObjectKeyFromObject(&ds), &ds); err == nil {
return fmt.Errorf("%s daemonset %q still has to be removed", v.GetName(), ds.Name)
} else if !errors.IsNotFound(err) {
return fmt.Errorf("failed to verify deletion of %s daemonset %q: %w", v.GetName(), ds.Name, err)
}

// because we have background deletion, we also have to check the pods
// if there are still pods, we have to wait for them to be removed
// if there are no pods, we can consider the daemonset deleted
podList := &v1.PodList{}
if err := r.List(ctx, podList, client.InNamespace(r.GetNamespace()),
client.MatchingLabels(ds.Spec.Selector.MatchLabels),
); err != nil {
return fmt.Errorf("failed to list pods for DaemonSet %q: %w", ds.Name, err)
} else if len(podList.Items) > 0 {
return fmt.Errorf("DaemonSet %q still has %d pods running", ds.Name, len(podList.Items))
}

return nil
}

12 changes: 10 additions & 2 deletions internal/controllers/lvmcluster/resource/vgmanager_daemonset.go
@@ -205,8 +205,8 @@ var (
}
)

// newVGManagerDaemonset returns the desired vgmanager daemonset for a given LVMCluster
func newVGManagerDaemonset(
// templateVGManagerDaemonset returns the desired vgmanager daemonset for a given LVMCluster
func templateVGManagerDaemonset(
lvmCluster *lvmv1alpha1.LVMCluster,
clusterType cluster.Type,
namespace, vgImage string,
@@ -268,6 +268,14 @@ func newVGManagerDaemonset(
ContainerPort: 8081,
Protocol: corev1.ProtocolTCP},
},
StartupProbe: &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
HTTPGet: &corev1.HTTPGetAction{Path: "/healthz",
Port: intstr.FromString(constants.TopolvmNodeContainerHealthzName)}},
FailureThreshold: 10,
InitialDelaySeconds: 0,
TimeoutSeconds: 2,
PeriodSeconds: 2},
LivenessProbe: &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
HTTPGet: &corev1.HTTPGetAction{Path: "/healthz",
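For reference on the numbers above: the startup probe tolerates up to FailureThreshold × PeriodSeconds = 10 × 2 s = 20 s of failing /healthz responses (roughly the window needed for lvmd.yaml loading and CSI plugin registration) before kubelet restarts the container, and the liveness probe only takes over once the startup probe has succeeded.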
8 changes: 6 additions & 2 deletions internal/csi/grpc_runner.go
@@ -2,6 +2,7 @@ package csi

import (
"context"
"fmt"
"net"
"os"
"time"
@@ -32,11 +33,11 @@ func (r gRPCServerRunner) Start(ctx context.Context) error {
logger.Info("Starting gRPC server", "sockFile", r.sockFile)
err := os.Remove(r.sockFile)
if err != nil && !os.IsNotExist(err) {
return err
return fmt.Errorf("failed to remove existing socket file before startup: %w", err)
}
lis, err := net.Listen("unix", r.sockFile)
if err != nil {
return err
return fmt.Errorf("failed to listen on %s: %w", r.sockFile, err)
}

go func() {
@@ -60,6 +61,9 @@ func (r gRPCServerRunner) Start(ctx context.Context) error {
r.srv.Stop()
logger.Info("Stopped gRPC server forcibly", "duration", time.Since(start))
}
if err := os.Remove(r.sockFile); err != nil && !os.IsNotExist(err) {
return fmt.Errorf("failed to remove socket file after shutdown: %w", err)
}
return nil
}

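A minimal usage sketch of the runner changed above, assuming (as the vgmanager diff suggests) that internalCSI.NewGRPCRunner(srv, socketPath, needLeaderElection) wraps a *grpc.Server as a controller-runtime Runnable; the socket path and the logging here are illustrative only:

package main

import (
	"os"

	"google.golang.org/grpc"
	ctrl "sigs.k8s.io/controller-runtime"

	internalCSI "github.com/openshift/lvm-operator/internal/csi"
)

func main() {
	setupLog := ctrl.Log.WithName("setup")

	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})
	if err != nil {
		setupLog.Error(err, "unable to create manager")
		os.Exit(1)
	}

	// A dedicated server per socket, mirroring the split in cmd/vgmanager/vgmanager.go.
	srv := grpc.NewServer(grpc.SharedWriteBuffer(true))

	// false: the runnable does not need leader election; it runs on every node.
	if err := mgr.Add(internalCSI.NewGRPCRunner(srv, "/run/example/csi.sock", false)); err != nil {
		setupLog.Error(err, "unable to add gRPC runner")
		os.Exit(1)
	}

	// When the signal-handler context is cancelled, Start() above gracefully stops the
	// server (falling back to a forced stop) and removes the socket file again.
	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
		setupLog.Error(err, "manager exited")
		os.Exit(1)
	}
}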