Skip to content

Commit

Permalink
remove watches and use clientset to get a secret that is not cached
Browse files Browse the repository at this point in the history
Signed-off-by: jooho lee <[email protected]>
  • Loading branch information
Jooho committed Jun 22, 2024
1 parent 1de88b0 commit 13b5def
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 68 deletions.
66 changes: 2 additions & 64 deletions controllers/inferenceservice_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,10 @@ import (
apierrs "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"

// "sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

Expand Down Expand Up @@ -109,33 +108,6 @@ func (r *OpenshiftInferenceServiceReconciler) Reconcile(ctx context.Context, req
return ctrl.Result{}, err
}

var certSecretPredicate = predicate.Funcs{
UpdateFunc: func(e event.UpdateEvent) bool {
return hasInferenceServiceOwner(e.ObjectNew)
},
CreateFunc: func(e event.CreateEvent) bool {
return hasInferenceServiceOwner(e.Object)
},
DeleteFunc: func(e event.DeleteEvent) bool {
return hasInferenceServiceOwner(e.Object)
},
GenericFunc: func(e event.GenericEvent) bool {
return hasInferenceServiceOwner(e.Object)
},
}

// check if the src secret has ownerReferences for InferenceService
func hasInferenceServiceOwner(obj client.Object) bool {

ownerReferences := obj.GetOwnerReferences()
for _, ownerReference := range ownerReferences {
if ownerReference.Kind == "Service" {
return true
}
}
return false
}

// SetupWithManager sets up the controller with the Manager.
func (r *OpenshiftInferenceServiceReconciler) SetupWithManager(mgr ctrl.Manager) error {
builder := ctrl.NewControllerManagedBy(mgr).
Expand Down Expand Up @@ -179,42 +151,8 @@ func (r *OpenshiftInferenceServiceReconciler) SetupWithManager(mgr ctrl.Manager)
})
}
return reconcileRequests
})).
Watches(&corev1.Secret{},
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, o client.Object) []reconcile.Request {
r.log.Info("Reconcile event triggered by Secret: " + o.GetName())
isvc := &kservev1beta1.InferenceService{}
err := r.client.Get(ctx, types.NamespacedName{Name: o.GetName(), Namespace: o.GetNamespace()}, isvc)
if err != nil {
if apierrs.IsNotFound(err) {
return []reconcile.Request{}
}
r.log.Error(err, "Error getting the inferenceService", "name", o.GetName())
return []reconcile.Request{}
}

return []reconcile.Request{
{NamespacedName: types.NamespacedName{Name: o.GetName(), Namespace: o.GetNamespace()}},
}
}))

// Watches(&corev1.Secret{},
// handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, o client.Object) []reconcile.Request {
// r.log.Info("Reconcile event triggered by Secret: " + o.GetName())
// isvc := &kservev1beta1.InferenceService{}
// err := r.client.Get(ctx, types.NamespacedName{Name: o.GetName(), Namespace: o.GetNamespace()}, isvc)
// if err != nil {
// if apierrs.IsNotFound(err) {
// return []reconcile.Request{}
// }
// r.log.Error(err, "Error getting the inferenceService", "name", o.GetName())
// return []reconcile.Request{}
// }

// return []reconcile.Request{
// {NamespacedName: types.NamespacedName{Name: o.GetName(), Namespace: o.GetNamespace()}},
// }
// }), builder.WithPredicates(certSecretPredicate))

kserveWithMeshEnabled, kserveWithMeshEnabledErr := utils.VerifyIfComponentIsEnabled(context.Background(), mgr.GetClient(), utils.KServeWithServiceMeshComponent)
if kserveWithMeshEnabledErr != nil {
r.log.V(1).Error(kserveWithMeshEnabledErr, "could not determine if kserve have service mesh enabled")
Expand Down
43 changes: 39 additions & 4 deletions controllers/reconcilers/kserve_isvc_gateway_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package reconcilers
import (
"context"
"fmt"
"net/url"
"reflect"

"github.com/go-logr/logr"
Expand All @@ -31,25 +32,42 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/client"

istiov1beta1 "istio.io/api/networking/v1beta1"
istioclientv1beta1 "istio.io/client-go/pkg/apis/networking/v1beta1"
"k8s.io/client-go/rest"
)

var _ SubResourceReconciler = (*KserveGatewayReconciler)(nil)
var meshNamespace string

type KserveGatewayReconciler struct {
client client.Client
clientset *kubernetes.Clientset
secretHandler resources.SecretHandler
gatewayHandler resources.GatewayHandler
deltaProcessor processors.DeltaProcessor
}

func NewKserveGatewayReconciler(client client.Client) *KserveGatewayReconciler {
config, err := rest.InClusterConfig()
if err != nil {
config, err = rest.InClusterConfig()
if err != nil {
panic(err.Error())
}
}

clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err.Error())
}

return &KserveGatewayReconciler{
client: client,
clientset: clientset,
secretHandler: resources.NewSecretHandler(client),
gatewayHandler: resources.NewGatewayHandler(client),
deltaProcessor: processors.NewDeltaProcessor(),
Expand All @@ -68,7 +86,8 @@ func (r *KserveGatewayReconciler) Reconcile(ctx context.Context, log logr.Logger
}

// return if serving cert secret in the source namespace is not created
srcCertSecret, err := r.secretHandler.Get(ctx, types.NamespacedName{Name: isvc.Name, Namespace: isvc.Namespace})
// srcCertSecret, err := r.secretHandler.Get(ctx, types.NamespacedName{Name: isvc.Name, Namespace: isvc.Namespace})
srcCertSecret, err := r.clientset.CoreV1().Secrets(isvc.Namespace).Get(ctx, isvc.Name, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
log.V(1).Info(fmt.Sprintf("Waiting for the creation of the serving certificate Secret(%s) in %s namespace", isvc.Name, isvc.Namespace))
Expand All @@ -78,11 +97,12 @@ func (r *KserveGatewayReconciler) Reconcile(ctx context.Context, log logr.Logger
}

// Copy src secret to destination namespace when there is not the synced secret.
copiedCertSecret, err := r.secretHandler.Get(ctx, types.NamespacedName{Name: fmt.Sprintf("%s-%s", isvc.Name, isvc.Namespace), Namespace: meshNamespace})
// copiedCertSecret, err := r.secretHandler.Get(ctx, types.NamespacedName{Name: fmt.Sprintf("%s-%s", isvc.Name, isvc.Namespace), Namespace: meshNamespace})
copiedCertSecret, err := r.clientset.CoreV1().Secrets(meshNamespace).Get(ctx, fmt.Sprintf("%s-%s", isvc.Name, isvc.Namespace), metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
if err := r.copyServingCertSecretFromIsvcNamespace(ctx, srcCertSecret, nil); err != nil {
log.V(1).Error(err, fmt.Sprintf("Failed to copy the serving certificate Secret(%s) for InferenceService in %s namespace", srcCertSecret.Name, meshNamespace))
log.V(1).Error(err, fmt.Sprintf("Failed to copy the serving certificate Secret(%s) to %s namespace", srcCertSecret.Name, meshNamespace))
return err
}
}
Expand Down Expand Up @@ -121,7 +141,10 @@ func (r *KserveGatewayReconciler) Reconcile(ctx context.Context, log logr.Logger
}

func (r *KserveGatewayReconciler) getDesiredResource(isvc *kservev1beta1.InferenceService) (*istioclientv1beta1.Gateway, error) {
hostname := isvc.Status.Address.URL.String()
hostname,err := getURLWithoutScheme(isvc)
if err != nil{
return nil, err
}

desiredGateway := &istioclientv1beta1.Gateway{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -226,6 +249,9 @@ func (r *KserveGatewayReconciler) copyServingCertSecretFromIsvcNamespace(ctx con
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-%s", sourceSecret.Name, sourceSecret.Namespace),
Namespace: meshNamespace,
Labels: map[string]string{
"opendatahub.io/managed": "true",
},
},
Data: sourceSecret.Data,
Type: sourceSecret.Type,
Expand Down Expand Up @@ -255,3 +281,12 @@ func (r *KserveGatewayReconciler) deleteServingCertSecretInIstioNamespace(ctx co
}
return nil
}

func getURLWithoutScheme(isvc *kservev1beta1.InferenceService) (string, error) {
parsedURL, err := url.Parse(isvc.Status.Address.URL.String())
if err != nil {
return "", err
}

return parsedURL.Host + parsedURL.Path, nil
}

0 comments on commit 13b5def

Please sign in to comment.