Skip to content

Commit

Permalink
fix: add ConfigurationError reason to Available condition by checking…
Browse files Browse the repository at this point in the history
… mlmd log, fixes RHOAIENG-11328
  • Loading branch information
dhirajsb committed Oct 24, 2024
1 parent e0ead0b commit 5798007
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 0 deletions.
8 changes: 8 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
security "istio.io/client-go/pkg/apis/security/v1beta1"
authentication "k8s.io/api/authentication/v1"
"k8s.io/client-go/discovery"
"k8s.io/client-go/kubernetes"
"os"
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"

Expand Down Expand Up @@ -178,6 +179,12 @@ func main() {
}
setupLog.Info("cluster config", "isOpenShift", isOpenShift, "hasAuthorino", hasAuthorino, "hasIstio", hasIstio)

clientset, err := kubernetes.NewForConfig(mgrRestConfig)
if err != nil {
setupLog.Error(err, "error getting kubernetes clientset")
os.Exit(1)
}

enableWebhooks := os.Getenv(config.EnableWebhooks) != "false"
createAuthResources := os.Getenv(config.CreateAuthResources) != "false"
defaultDomain := os.Getenv(config.DefaultDomain)
Expand All @@ -198,6 +205,7 @@ func main() {

if err = (&controller.ModelRegistryReconciler{
Client: client,
ClientSet: clientset,
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("modelregistry-controller"),
Log: ctrl.Log.WithName("controller"),
Expand Down
2 changes: 2 additions & 0 deletions internal/controller/modelregistry_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
errors2 "errors"
"fmt"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/kubernetes"
"strings"
"text/template"

Expand Down Expand Up @@ -62,6 +63,7 @@ const (
// ModelRegistryReconciler reconciles a ModelRegistry object
type ModelRegistryReconciler struct {
client.Client
ClientSet *kubernetes.Clientset
Scheme *runtime.Scheme
Recorder record.EventRecorder
Log logr.Logger
Expand Down
50 changes: 50 additions & 0 deletions internal/controller/modelregistry_controller_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,22 @@ limitations under the License.
package controller

import (
"bytes"
"context"
"fmt"
"github.com/go-logr/logr"
authorino "github.com/kuadrant/authorino/api/v1beta2"
modelregistryv1alpha1 "github.com/opendatahub-io/model-registry-operator/api/v1alpha1"
routev1 "github.com/openshift/api/route/v1"
"io"
"istio.io/client-go/pkg/apis/networking/v1beta1"
v1beta12 "istio.io/client-go/pkg/apis/security/v1beta1"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"regexp"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
klog "sigs.k8s.io/controller-runtime/pkg/log"
Expand All @@ -55,12 +58,21 @@ const (
ReasonDeploymentUpdating = "UpdatingDeployment"
ReasonDeploymentAvailable = "DeploymentAvailable"
ReasonDeploymentUnavailable = "DeploymentUnavailable"
ReasonConfigurationError = "ConfigurationError"

ReasonResourcesCreated = "CreatedResources"
ReasonResourcesAvailable = "ResourcesAvailable"
ReasonResourcesUnavailable = "ResourcesUnavailable"

grpcContainerName = "grpc-container"
)

// errRegexp matches the log entry produced by the CHECK_EQ macro used by the
// mlmd container when a status check against absl::OkStatus() fails.
//
// Capture group 1 is the non-OK status text that follows "OK vs. ", up to the
// first closing parenthesis. Capture group 2 is the remainder of that log line
// after the closing parenthesis and a single space.
//
// For more details on Abseil logging and the CHECK_EQ macro see [Abseil documentation].
//
// [Abseil documentation]: https://abseil.io/docs/cpp/guides/logging#CHECK
var errRegexp = regexp.MustCompile(`Check failed: absl::OkStatus\(\) == status \(OK vs\. ([^)]+)\) (.*)`)

func (r *ModelRegistryReconciler) setRegistryStatus(ctx context.Context, req ctrl.Request, operationResult OperationResult) (bool, error) {
log := klog.FromContext(ctx)

Expand Down Expand Up @@ -197,6 +209,21 @@ func (r *ModelRegistryReconciler) CheckPodStatus(ctx context.Context, req ctrl.R
failedContainers := make(map[string]string)
for _, s := range p.Status.ContainerStatuses {
if !s.Ready {
// look for MLMD container errors
if s.Name == grpcContainerName {
// check container log for MLMD errors
dbError, err := r.getGrpcContainerDBerror(ctx, p)
if err != nil {
// log K8s error
r.Log.Error(err, "failed to get grpc container error")
}
if dbError != nil {
// MLMD errors take priority
reason = ReasonConfigurationError
message = fmt.Sprintf("Metadata database configuration error: %s", dbError)
return available, reason, message
}
}
if s.State.Waiting != nil {
failedContainers[s.Name] = fmt.Sprintf("{waiting: {reason: %s, message: %s}}", s.State.Waiting.Reason, s.State.Waiting.Message)
} else if s.State.Terminated != nil {
Expand Down Expand Up @@ -242,6 +269,29 @@ func (r *ModelRegistryReconciler) CheckPodStatus(ctx context.Context, req ctrl.R
return available, reason, message
}

// getGrpcContainerDBerror reads the log of the grpc container of pod and looks
// for an entry matching errRegexp.
//
// The first return value is the database error rebuilt from the first matching
// log entry, formatted as "<message>: <status>"; it is nil when nothing in the
// log matches. The second return value is the error encountered while opening
// or reading the log stream, wrapped with the container, namespace and pod
// name so the caller's log line identifies the pod; whenever it is non-nil the
// first return value is nil.
func (r *ModelRegistryReconciler) getGrpcContainerDBerror(ctx context.Context, pod corev1.Pod) (error, error) {
	logOptions := &corev1.PodLogOptions{Container: grpcContainerName}
	podLogs, err := r.ClientSet.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, logOptions).Stream(ctx)
	if err != nil {
		return nil, fmt.Errorf("streaming %s log of pod %s/%s: %w", grpcContainerName, pod.Namespace, pod.Name, err)
	}
	defer podLogs.Close()

	// NOTE(review): the complete container log is buffered in memory; consider
	// bounding it via PodLogOptions (TailLines / LimitBytes) if logs can grow large.
	var buf bytes.Buffer
	if _, err := io.Copy(&buf, podLogs); err != nil {
		return nil, fmt.Errorf("reading %s log of pod %s/%s: %w", grpcContainerName, pod.Namespace, pod.Name, err)
	}

	// Match on the buffered bytes directly to avoid copying the whole log into a string.
	submatch := errRegexp.FindSubmatch(buf.Bytes())
	if submatch == nil {
		return nil, nil
	}
	// submatch[1] is the status text, submatch[2] the trailing message.
	return fmt.Errorf("%s: %s", submatch[2], submatch[1]), nil
}

func (r *ModelRegistryReconciler) SetIstioAndGatewayConditions(ctx context.Context, req ctrl.Request,
modelRegistry *modelregistryv1alpha1.ModelRegistry,
status metav1.ConditionStatus, reason string, message string) (metav1.ConditionStatus, string, string) {
Expand Down

0 comments on commit 5798007

Please sign in to comment.