From b97da6ca3a08e3f0fc35552dd7f0bd3b59083f35 Mon Sep 17 00:00:00 2001 From: Daniel Dowler <12484302+dandawg@users.noreply.github.com> Date: Wed, 11 Dec 2024 16:43:35 -0700 Subject: [PATCH] feat: Add online/offline replica support (#4812) * added replica support to online/offline store services Signed-off-by: dandawg <12484302+dandawg@users.noreply.github.com> * Removed unneaded if statement Co-authored-by: Tommy Hughes IV Signed-off-by: dandawg <12484302+dandawg@users.noreply.github.com> * fixed missing bracket Signed-off-by: dandawg <12484302+dandawg@users.noreply.github.com> * added doc comments describing replicas Signed-off-by: dandawg <12484302+dandawg@users.noreply.github.com> * Replicas doc wording change Signed-off-by: dandawg <12484302+dandawg@users.noreply.github.com> --------- Signed-off-by: dandawg <12484302+dandawg@users.noreply.github.com> Co-authored-by: Tommy Hughes IV --- .../api/v1alpha1/featurestore_types.go | 20 ++- .../api/v1alpha1/zz_generated.deepcopy.go | 25 +++- .../crd/bases/feast.dev_featurestores.yaml | 24 ++++ infra/feast-operator/dist/install.yaml | 24 ++++ .../featurestore_controller_db_store_test.go | 3 +- .../featurestore_controller_ephemeral_test.go | 3 +- ...restore_controller_kubernetes_auth_test.go | 3 +- ...eaturestore_controller_objectstore_test.go | 3 +- .../featurestore_controller_oidc_auth_test.go | 3 +- .../featurestore_controller_pvc_test.go | 3 +- .../featurestore_controller_test.go | 132 ++++++++++++++++-- .../internal/controller/services/services.go | 17 ++- .../internal/controller/services/util.go | 11 +- 13 files changed, 244 insertions(+), 27 deletions(-) diff --git a/infra/feast-operator/api/v1alpha1/featurestore_types.go b/infra/feast-operator/api/v1alpha1/featurestore_types.go index 454c909f7d..635912a1b5 100644 --- a/infra/feast-operator/api/v1alpha1/featurestore_types.go +++ b/infra/feast-operator/api/v1alpha1/featurestore_types.go @@ -74,9 +74,9 @@ type FeatureStoreServices struct { // OfflineStore configures the deployed offline store service type OfflineStore struct { - ServiceConfigs `json:",inline"` - Persistence *OfflineStorePersistence `json:"persistence,omitempty"` - TLS *OfflineTlsConfigs `json:"tls,omitempty"` + StoreServiceConfigs `json:",inline"` + Persistence *OfflineStorePersistence `json:"persistence,omitempty"` + TLS *OfflineTlsConfigs `json:"tls,omitempty"` // LogLevel sets the logging level for the offline store service // Allowed values: "debug", "info", "warning", "error", "critical". // +kubebuilder:validation:Enum=debug;info;warning;error;critical @@ -134,9 +134,9 @@ var ValidOfflineStoreDBStorePersistenceTypes = []string{ // OnlineStore configures the deployed online store service type OnlineStore struct { - ServiceConfigs `json:",inline"` - Persistence *OnlineStorePersistence `json:"persistence,omitempty"` - TLS *TlsConfigs `json:"tls,omitempty"` + StoreServiceConfigs `json:",inline"` + Persistence *OnlineStorePersistence `json:"persistence,omitempty"` + TLS *TlsConfigs `json:"tls,omitempty"` // LogLevel sets the logging level for the online store service // Allowed values: "debug", "info", "warning", "error", "critical". // +kubebuilder:validation:Enum=debug;info;warning;error;critical @@ -297,6 +297,14 @@ type DefaultConfigs struct { Image *string `json:"image,omitempty"` } +// StoreServiceConfigs k8s deployment settings +type StoreServiceConfigs struct { + // Replicas determines the number of pods for the feast service. + // When Replicas > 1, persistence is recommended. + Replicas *int32 `json:"replicas,omitempty"` + ServiceConfigs `json:",inline"` +} + // OptionalConfigs k8s container settings that are optional type OptionalConfigs struct { Env *[]corev1.EnvVar `json:"env,omitempty"` diff --git a/infra/feast-operator/api/v1alpha1/zz_generated.deepcopy.go b/infra/feast-operator/api/v1alpha1/zz_generated.deepcopy.go index 3f317c650e..bccf9ec537 100644 --- a/infra/feast-operator/api/v1alpha1/zz_generated.deepcopy.go +++ b/infra/feast-operator/api/v1alpha1/zz_generated.deepcopy.go @@ -273,7 +273,7 @@ func (in *LocalRegistryConfig) DeepCopy() *LocalRegistryConfig { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *OfflineStore) DeepCopyInto(out *OfflineStore) { *out = *in - in.ServiceConfigs.DeepCopyInto(&out.ServiceConfigs) + in.StoreServiceConfigs.DeepCopyInto(&out.StoreServiceConfigs) if in.Persistence != nil { in, out := &in.Persistence, &out.Persistence *out = new(OfflineStorePersistence) @@ -397,7 +397,7 @@ func (in *OidcAuthz) DeepCopy() *OidcAuthz { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *OnlineStore) DeepCopyInto(out *OnlineStore) { *out = *in - in.ServiceConfigs.DeepCopyInto(&out.ServiceConfigs) + in.StoreServiceConfigs.DeepCopyInto(&out.StoreServiceConfigs) if in.Persistence != nil { in, out := &in.Persistence, &out.Persistence *out = new(OnlineStorePersistence) @@ -737,6 +737,27 @@ func (in *ServiceHostnames) DeepCopy() *ServiceHostnames { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *StoreServiceConfigs) DeepCopyInto(out *StoreServiceConfigs) { + *out = *in + if in.Replicas != nil { + in, out := &in.Replicas, &out.Replicas + *out = new(int32) + **out = **in + } + in.ServiceConfigs.DeepCopyInto(&out.ServiceConfigs) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new StoreServiceConfigs. +func (in *StoreServiceConfigs) DeepCopy() *StoreServiceConfigs { + if in == nil { + return nil + } + out := new(StoreServiceConfigs) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *TlsConfigs) DeepCopyInto(out *TlsConfigs) { *out = *in diff --git a/infra/feast-operator/config/crd/bases/feast.dev_featurestores.yaml b/infra/feast-operator/config/crd/bases/feast.dev_featurestores.yaml index 61b3e7adcf..74f0fd059e 100644 --- a/infra/feast-operator/config/crd/bases/feast.dev_featurestores.yaml +++ b/infra/feast-operator/config/crd/bases/feast.dev_featurestores.yaml @@ -369,6 +369,12 @@ spec: x-kubernetes-validations: - message: One selection required between file or store. rule: '[has(self.file), has(self.store)].exists_one(c, c)' + replicas: + description: |- + Replicas determines the number of pods for the feast service. + When Replicas > 1, persistence is recommended. + format: int32 + type: integer resources: description: ResourceRequirements describes the compute resource requirements. @@ -745,6 +751,12 @@ spec: x-kubernetes-validations: - message: One selection required between file or store. rule: '[has(self.file), has(self.store)].exists_one(c, c)' + replicas: + description: |- + Replicas determines the number of pods for the feast service. + When Replicas > 1, persistence is recommended. + format: int32 + type: integer resources: description: ResourceRequirements describes the compute resource requirements. @@ -1614,6 +1626,12 @@ spec: - message: One selection required between file or store. rule: '[has(self.file), has(self.store)].exists_one(c, c)' + replicas: + description: |- + Replicas determines the number of pods for the feast service. + When Replicas > 1, persistence is recommended. + format: int32 + type: integer resources: description: ResourceRequirements describes the compute resource requirements. @@ -1997,6 +2015,12 @@ spec: - message: One selection required between file or store. rule: '[has(self.file), has(self.store)].exists_one(c, c)' + replicas: + description: |- + Replicas determines the number of pods for the feast service. + When Replicas > 1, persistence is recommended. + format: int32 + type: integer resources: description: ResourceRequirements describes the compute resource requirements. diff --git a/infra/feast-operator/dist/install.yaml b/infra/feast-operator/dist/install.yaml index 5d56e78639..f40c5caebb 100644 --- a/infra/feast-operator/dist/install.yaml +++ b/infra/feast-operator/dist/install.yaml @@ -377,6 +377,12 @@ spec: x-kubernetes-validations: - message: One selection required between file or store. rule: '[has(self.file), has(self.store)].exists_one(c, c)' + replicas: + description: |- + Replicas determines the number of pods for the feast service. + When Replicas > 1, persistence is recommended. + format: int32 + type: integer resources: description: ResourceRequirements describes the compute resource requirements. @@ -753,6 +759,12 @@ spec: x-kubernetes-validations: - message: One selection required between file or store. rule: '[has(self.file), has(self.store)].exists_one(c, c)' + replicas: + description: |- + Replicas determines the number of pods for the feast service. + When Replicas > 1, persistence is recommended. + format: int32 + type: integer resources: description: ResourceRequirements describes the compute resource requirements. @@ -1622,6 +1634,12 @@ spec: - message: One selection required between file or store. rule: '[has(self.file), has(self.store)].exists_one(c, c)' + replicas: + description: |- + Replicas determines the number of pods for the feast service. + When Replicas > 1, persistence is recommended. + format: int32 + type: integer resources: description: ResourceRequirements describes the compute resource requirements. @@ -2005,6 +2023,12 @@ spec: - message: One selection required between file or store. rule: '[has(self.file), has(self.store)].exists_one(c, c)' + replicas: + description: |- + Replicas determines the number of pods for the feast service. + When Replicas > 1, persistence is recommended. + format: int32 + type: integer resources: description: ResourceRequirements describes the compute resource requirements. diff --git a/infra/feast-operator/internal/controller/featurestore_controller_db_store_test.go b/infra/feast-operator/internal/controller/featurestore_controller_db_store_test.go index 377ee6bc51..0ee269bda1 100644 --- a/infra/feast-operator/internal/controller/featurestore_controller_db_store_test.go +++ b/infra/feast-operator/internal/controller/featurestore_controller_db_store_test.go @@ -127,6 +127,7 @@ var _ = Describe("FeatureStore Controller - db storage services", func() { Context("When deploying a resource with all db storage services", func() { const resourceName = "cr-name" var pullPolicy = corev1.PullAlways + var replicas = int32(1) ctx := context.Background() @@ -205,7 +206,7 @@ var _ = Describe("FeatureStore Controller - db storage services", func() { By("creating the custom resource for the Kind FeatureStore") err = k8sClient.Get(ctx, typeNamespacedName, featurestore) if err != nil && errors.IsNotFound(err) { - resource := createFeatureStoreResource(resourceName, image, pullPolicy, &[]corev1.EnvVar{}) + resource := createFeatureStoreResource(resourceName, image, pullPolicy, replicas, &[]corev1.EnvVar{}) resource.Spec.Services.OfflineStore.Persistence = &feastdevv1alpha1.OfflineStorePersistence{ DBPersistence: &feastdevv1alpha1.OfflineStoreDBStorePersistence{ Type: string(offlineType), diff --git a/infra/feast-operator/internal/controller/featurestore_controller_ephemeral_test.go b/infra/feast-operator/internal/controller/featurestore_controller_ephemeral_test.go index 796de8e526..a762faa5a2 100644 --- a/infra/feast-operator/internal/controller/featurestore_controller_ephemeral_test.go +++ b/infra/feast-operator/internal/controller/featurestore_controller_ephemeral_test.go @@ -48,6 +48,7 @@ var _ = Describe("FeatureStore Controller-Ephemeral services", func() { const resourceName = "services-ephemeral" const offlineType = "duckdb" var pullPolicy = corev1.PullAlways + var replicas = int32(1) var testEnvVarName = "testEnvVarName" var testEnvVarValue = "testEnvVarValue" @@ -65,7 +66,7 @@ var _ = Describe("FeatureStore Controller-Ephemeral services", func() { By("creating the custom resource for the Kind FeatureStore") err := k8sClient.Get(ctx, typeNamespacedName, featurestore) if err != nil && errors.IsNotFound(err) { - resource := createFeatureStoreResource(resourceName, image, pullPolicy, &[]corev1.EnvVar{{Name: testEnvVarName, Value: testEnvVarValue}, + resource := createFeatureStoreResource(resourceName, image, pullPolicy, replicas, &[]corev1.EnvVar{{Name: testEnvVarName, Value: testEnvVarValue}, {Name: "fieldRefName", ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{APIVersion: "v1", FieldPath: "metadata.namespace"}}}}) resource.Spec.Services.OfflineStore.Persistence = &feastdevv1alpha1.OfflineStorePersistence{ FilePersistence: &feastdevv1alpha1.OfflineStoreFilePersistence{ diff --git a/infra/feast-operator/internal/controller/featurestore_controller_kubernetes_auth_test.go b/infra/feast-operator/internal/controller/featurestore_controller_kubernetes_auth_test.go index 4930f3fc59..57dd3a290d 100644 --- a/infra/feast-operator/internal/controller/featurestore_controller_kubernetes_auth_test.go +++ b/infra/feast-operator/internal/controller/featurestore_controller_kubernetes_auth_test.go @@ -48,6 +48,7 @@ var _ = Describe("FeatureStore Controller-Kubernetes authorization", func() { Context("When deploying a resource with all ephemeral services and Kubernetes authorization", func() { const resourceName = "kubernetes-authorization" var pullPolicy = corev1.PullAlways + var replicas = int32(1) ctx := context.Background() @@ -62,7 +63,7 @@ var _ = Describe("FeatureStore Controller-Kubernetes authorization", func() { By("creating the custom resource for the Kind FeatureStore") err := k8sClient.Get(ctx, typeNamespacedName, featurestore) if err != nil && errors.IsNotFound(err) { - resource := createFeatureStoreResource(resourceName, image, pullPolicy, &[]corev1.EnvVar{}) + resource := createFeatureStoreResource(resourceName, image, pullPolicy, replicas, &[]corev1.EnvVar{}) resource.Spec.AuthzConfig = &feastdevv1alpha1.AuthzConfig{KubernetesAuthz: &feastdevv1alpha1.KubernetesAuthz{ Roles: roles, }} diff --git a/infra/feast-operator/internal/controller/featurestore_controller_objectstore_test.go b/infra/feast-operator/internal/controller/featurestore_controller_objectstore_test.go index db07418c92..f4a21a28f1 100644 --- a/infra/feast-operator/internal/controller/featurestore_controller_objectstore_test.go +++ b/infra/feast-operator/internal/controller/featurestore_controller_objectstore_test.go @@ -46,6 +46,7 @@ var _ = Describe("FeatureStore Controller-Ephemeral services", func() { Context("When deploying a resource with all ephemeral services", func() { const resourceName = "services-object-store" var pullPolicy = corev1.PullAlways + var replicas = int32(1) var testEnvVarName = "testEnvVarName" var testEnvVarValue = "testEnvVarValue" @@ -67,7 +68,7 @@ var _ = Describe("FeatureStore Controller-Ephemeral services", func() { By("creating the custom resource for the Kind FeatureStore") err := k8sClient.Get(ctx, typeNamespacedName, featurestore) if err != nil && errors.IsNotFound(err) { - resource := createFeatureStoreResource(resourceName, image, pullPolicy, &[]corev1.EnvVar{{Name: testEnvVarName, Value: testEnvVarValue}, + resource := createFeatureStoreResource(resourceName, image, pullPolicy, replicas, &[]corev1.EnvVar{{Name: testEnvVarName, Value: testEnvVarValue}, {Name: "fieldRefName", ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{APIVersion: "v1", FieldPath: "metadata.namespace"}}}}) resource.Spec.Services.OnlineStore = nil resource.Spec.Services.OfflineStore = nil diff --git a/infra/feast-operator/internal/controller/featurestore_controller_oidc_auth_test.go b/infra/feast-operator/internal/controller/featurestore_controller_oidc_auth_test.go index c062a573df..eb320c5bb3 100644 --- a/infra/feast-operator/internal/controller/featurestore_controller_oidc_auth_test.go +++ b/infra/feast-operator/internal/controller/featurestore_controller_oidc_auth_test.go @@ -49,6 +49,7 @@ var _ = Describe("FeatureStore Controller-OIDC authorization", func() { const resourceName = "oidc-authorization" const oidcSecretName = "oidc-secret" var pullPolicy = corev1.PullAlways + var replicas = int32(1) ctx := context.Background() @@ -73,7 +74,7 @@ var _ = Describe("FeatureStore Controller-OIDC authorization", func() { By("creating the custom resource for the Kind FeatureStore") err = k8sClient.Get(ctx, typeNamespacedName, featurestore) if err != nil && errors.IsNotFound(err) { - resource := createFeatureStoreResource(resourceName, image, pullPolicy, &[]corev1.EnvVar{}) + resource := createFeatureStoreResource(resourceName, image, pullPolicy, replicas, &[]corev1.EnvVar{}) resource.Spec.AuthzConfig = &feastdevv1alpha1.AuthzConfig{OidcAuthz: &feastdevv1alpha1.OidcAuthz{ SecretRef: corev1.LocalObjectReference{ Name: oidcSecretName, diff --git a/infra/feast-operator/internal/controller/featurestore_controller_pvc_test.go b/infra/feast-operator/internal/controller/featurestore_controller_pvc_test.go index d0adc62c7c..fe0caa38e6 100644 --- a/infra/feast-operator/internal/controller/featurestore_controller_pvc_test.go +++ b/infra/feast-operator/internal/controller/featurestore_controller_pvc_test.go @@ -50,6 +50,7 @@ var _ = Describe("FeatureStore Controller-Ephemeral services", func() { Context("When deploying a resource with all ephemeral services", func() { const resourceName = "services-pvc" var pullPolicy = corev1.PullAlways + var replicas = int32(1) var testEnvVarName = "testEnvVarName" var testEnvVarValue = "testEnvVarValue" @@ -77,7 +78,7 @@ var _ = Describe("FeatureStore Controller-Ephemeral services", func() { By("creating the custom resource for the Kind FeatureStore") err := k8sClient.Get(ctx, typeNamespacedName, featurestore) if err != nil && errors.IsNotFound(err) { - resource := createFeatureStoreResource(resourceName, image, pullPolicy, &[]corev1.EnvVar{{Name: testEnvVarName, Value: testEnvVarValue}, + resource := createFeatureStoreResource(resourceName, image, pullPolicy, replicas, &[]corev1.EnvVar{{Name: testEnvVarName, Value: testEnvVarValue}, {Name: "fieldRefName", ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{APIVersion: "v1", FieldPath: "metadata.namespace"}}}}) resource.Spec.Services.OfflineStore.Persistence = &feastdevv1alpha1.OfflineStorePersistence{ FilePersistence: &feastdevv1alpha1.OfflineStoreFilePersistence{ diff --git a/infra/feast-operator/internal/controller/featurestore_controller_test.go b/infra/feast-operator/internal/controller/featurestore_controller_test.go index 44c81eca59..debd63300b 100644 --- a/infra/feast-operator/internal/controller/featurestore_controller_test.go +++ b/infra/feast-operator/internal/controller/featurestore_controller_test.go @@ -404,6 +404,7 @@ var _ = Describe("FeatureStore Controller", func() { Context("When reconciling a resource with all services enabled", func() { const resourceName = "services" var pullPolicy = corev1.PullAlways + var replicas = int32(1) var testEnvVarName = "testEnvVarName" var testEnvVarValue = "testEnvVarValue" @@ -419,7 +420,7 @@ var _ = Describe("FeatureStore Controller", func() { By("creating the custom resource for the Kind FeatureStore") err := k8sClient.Get(ctx, typeNamespacedName, featurestore) if err != nil && errors.IsNotFound(err) { - resource := createFeatureStoreResource(resourceName, image, pullPolicy, &[]corev1.EnvVar{{Name: testEnvVarName, Value: testEnvVarValue}, + resource := createFeatureStoreResource(resourceName, image, pullPolicy, replicas, &[]corev1.EnvVar{{Name: testEnvVarName, Value: testEnvVarValue}, {Name: "fieldRefName", ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{APIVersion: "v1", FieldPath: "metadata.namespace"}}}}) Expect(k8sClient.Create(ctx, resource)).To(Succeed()) } @@ -870,6 +871,114 @@ var _ = Describe("FeatureStore Controller", func() { Expect(areEnvVarArraysEqual(deploy.Spec.Template.Spec.Containers[0].Env, []corev1.EnvVar{{Name: testEnvVarName, Value: testEnvVarValue + "1"}, {Name: services.FeatureStoreYamlEnvVar, Value: fsYamlStr}, {Name: "fieldRefName", ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{APIVersion: "v1", FieldPath: "metadata.name"}}}})).To(BeTrue()) }) + It("Should scale online/offline store service", func() { + By("Reconciling the created resource") + controllerReconciler := &FeatureStoreReconciler{ + Client: k8sClient, + Scheme: k8sClient.Scheme(), + } + + _, err := controllerReconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: typeNamespacedName, + }) + Expect(err).NotTo(HaveOccurred()) + + resource := &feastdevv1alpha1.FeatureStore{} + err = k8sClient.Get(ctx, typeNamespacedName, resource) + Expect(err).NotTo(HaveOccurred()) + + req, err := labels.NewRequirement(services.NameLabelKey, selection.Equals, []string{resource.Name}) + Expect(err).NotTo(HaveOccurred()) + labelSelector := labels.NewSelector().Add(*req) + listOpts := &client.ListOptions{Namespace: resource.Namespace, LabelSelector: labelSelector} + deployList := appsv1.DeploymentList{} + err = k8sClient.List(ctx, &deployList, listOpts) + Expect(err).NotTo(HaveOccurred()) + Expect(deployList.Items).To(HaveLen(3)) + + svcList := corev1.ServiceList{} + err = k8sClient.List(ctx, &svcList, listOpts) + Expect(err).NotTo(HaveOccurred()) + Expect(svcList.Items).To(HaveLen(3)) + + cmList := corev1.ConfigMapList{} + err = k8sClient.List(ctx, &cmList, listOpts) + Expect(err).NotTo(HaveOccurred()) + Expect(cmList.Items).To(HaveLen(1)) + + feast := services.FeastServices{ + Handler: handler.FeastHandler{ + Client: controllerReconciler.Client, + Context: ctx, + Scheme: controllerReconciler.Scheme, + FeatureStore: resource, + }, + } + + fsYamlStr := "" + fsYamlStr, err = feast.GetServiceFeatureStoreYamlBase64(services.OnlineFeastType) + Expect(err).NotTo(HaveOccurred()) + + // check online config + deploy_online := &appsv1.Deployment{} + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: feast.GetFeastServiceName(services.OnlineFeastType), + Namespace: resource.Namespace, + }, + deploy_online) + Expect(err).NotTo(HaveOccurred()) + Expect(deploy_online.Spec.Template.Spec.ServiceAccountName).To(Equal(deploy_online.Name)) + Expect(deploy_online.Spec.Template.Spec.Containers).To(HaveLen(1)) + Expect(deploy_online.Spec.Template.Spec.Containers[0].Env).To(HaveLen(3)) + Expect(areEnvVarArraysEqual(deploy_online.Spec.Template.Spec.Containers[0].Env, []corev1.EnvVar{{Name: testEnvVarName, Value: testEnvVarValue}, {Name: services.FeatureStoreYamlEnvVar, Value: fsYamlStr}, {Name: "fieldRefName", ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{APIVersion: "v1", FieldPath: "metadata.namespace"}}}})).To(BeTrue()) + Expect(deploy_online.Spec.Template.Spec.Containers[0].ImagePullPolicy).To(Equal(corev1.PullAlways)) + + // check offline config + deploy_offline := &appsv1.Deployment{} + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: feast.GetFeastServiceName(services.OfflineFeastType), + Namespace: resource.Namespace, + }, + deploy_offline) + Expect(err).NotTo(HaveOccurred()) + Expect(deploy_offline.Spec.Template.Spec.ServiceAccountName).To(Equal(deploy_offline.Name)) + Expect(deploy_offline.Spec.Template.Spec.Containers).To(HaveLen(1)) + Expect(deploy_offline.Spec.Template.Spec.Containers[0].Env).To(HaveLen(1)) + Expect(deploy_offline.Spec.Template.Spec.Containers[0].ImagePullPolicy).To(Equal(corev1.PullIfNotPresent)) + + // change feast project and reconcile + // scale online replicas to 2 + resourceNew := resource.DeepCopy() + new_replicas := int32(2) + resourceNew.Spec.Services.OnlineStore.Replicas = &new_replicas + resourceNew.Spec.Services.OfflineStore.Replicas = &new_replicas + + err = k8sClient.Update(ctx, resourceNew) + Expect(err).NotTo(HaveOccurred()) + _, err = controllerReconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: typeNamespacedName, + }) + Expect(err).NotTo(HaveOccurred()) + + err = k8sClient.Get(ctx, typeNamespacedName, resource) + Expect(err).NotTo(HaveOccurred()) + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: feast.GetFeastServiceName(services.OnlineFeastType), + Namespace: resource.Namespace, + }, + deploy_online) + Expect(err).NotTo(HaveOccurred()) + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: feast.GetFeastServiceName(services.OfflineFeastType), + Namespace: resource.Namespace, + }, + deploy_offline) + Expect(err).NotTo(HaveOccurred()) + + Expect(deploy_online.Spec.Replicas).To(Equal(&new_replicas)) + Expect(deploy_offline.Spec.Replicas).To(Equal(&new_replicas)) + }) + It("Should delete k8s objects owned by the FeatureStore CR", func() { By("changing which feast services are configured in the CR") controllerReconciler := &FeatureStoreReconciler{ @@ -1253,7 +1362,7 @@ var _ = Describe("FeatureStore Controller", func() { }) }) -func createFeatureStoreResource(resourceName string, image string, pullPolicy corev1.PullPolicy, envVars *[]corev1.EnvVar) *feastdevv1alpha1.FeatureStore { +func createFeatureStoreResource(resourceName string, image string, pullPolicy corev1.PullPolicy, replicas int32, envVars *[]corev1.EnvVar) *feastdevv1alpha1.FeatureStore { return &feastdevv1alpha1.FeatureStore{ ObjectMeta: metav1.ObjectMeta{ Name: resourceName, @@ -1264,14 +1373,17 @@ func createFeatureStoreResource(resourceName string, image string, pullPolicy co Services: &feastdevv1alpha1.FeatureStoreServices{ OfflineStore: &feastdevv1alpha1.OfflineStore{}, OnlineStore: &feastdevv1alpha1.OnlineStore{ - ServiceConfigs: feastdevv1alpha1.ServiceConfigs{ - DefaultConfigs: feastdevv1alpha1.DefaultConfigs{ - Image: &image, - }, - OptionalConfigs: feastdevv1alpha1.OptionalConfigs{ - Env: envVars, - ImagePullPolicy: &pullPolicy, - Resources: &corev1.ResourceRequirements{}, + StoreServiceConfigs: feastdevv1alpha1.StoreServiceConfigs{ + Replicas: &replicas, + ServiceConfigs: feastdevv1alpha1.ServiceConfigs{ + DefaultConfigs: feastdevv1alpha1.DefaultConfigs{ + Image: &image, + }, + OptionalConfigs: feastdevv1alpha1.OptionalConfigs{ + Env: envVars, + ImagePullPolicy: &pullPolicy, + Resources: &corev1.ResourceRequirements{}, + }, }, }, }, diff --git a/infra/feast-operator/internal/controller/services/services.go b/infra/feast-operator/internal/controller/services/services.go index 60aabebe02..0f18cc5522 100644 --- a/infra/feast-operator/internal/controller/services/services.go +++ b/infra/feast-operator/internal/controller/services/services.go @@ -293,7 +293,7 @@ func (feast *FeastServices) setDeployment(deploy *appsv1.Deployment, feastType F probeHandler := getProbeHandler(feastType, tls) deploy.Spec = appsv1.DeploymentSpec{ - Replicas: &DefaultReplicas, + Replicas: feast.getServiceReplicas(feastType), Selector: metav1.SetAsLabelSelector(deploy.GetLabels()), Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ @@ -485,6 +485,21 @@ func (feast *FeastServices) getServiceConfigs(feastType FeastServiceType) feastd return feastdevv1alpha1.ServiceConfigs{} } +func (feast *FeastServices) getServiceReplicas(feastType FeastServiceType) *int32 { + appliedServices := feast.Handler.FeatureStore.Status.Applied.Services + switch feastType { + case OfflineFeastType: + if feast.isOfflinStore() { + return appliedServices.OfflineStore.Replicas + } + case OnlineFeastType: + if feast.isOnlinStore() { + return appliedServices.OnlineStore.Replicas + } + } + return &DefaultReplicas +} + func (feast *FeastServices) getLogLevelForType(feastType FeastServiceType) *string { services := feast.Handler.FeatureStore.Status.Applied.Services switch feastType { diff --git a/infra/feast-operator/internal/controller/services/util.go b/infra/feast-operator/internal/controller/services/util.go index 85bd02e653..631709d6ba 100644 --- a/infra/feast-operator/internal/controller/services/util.go +++ b/infra/feast-operator/internal/controller/services/util.go @@ -122,7 +122,7 @@ func ApplyDefaultsToStatus(cr *feastdevv1alpha1.FeatureStore) { } } - setServiceDefaultConfigs(&services.OfflineStore.ServiceConfigs.DefaultConfigs) + setStoreServiceDefaultConfigs(&services.OfflineStore.StoreServiceConfigs) } if services.OnlineStore != nil { @@ -147,7 +147,7 @@ func ApplyDefaultsToStatus(cr *feastdevv1alpha1.FeatureStore) { } } - setServiceDefaultConfigs(&services.OnlineStore.ServiceConfigs.DefaultConfigs) + setStoreServiceDefaultConfigs(&services.OnlineStore.StoreServiceConfigs) } // overwrite status.applied with every reconcile applied.DeepCopyInto(&cr.Status.Applied) @@ -159,6 +159,13 @@ func setServiceDefaultConfigs(defaultConfigs *feastdevv1alpha1.DefaultConfigs) { } } +func setStoreServiceDefaultConfigs(storeServiceConfigs *feastdevv1alpha1.StoreServiceConfigs) { + if storeServiceConfigs.Replicas == nil { + storeServiceConfigs.Replicas = &DefaultReplicas + } + setServiceDefaultConfigs(&storeServiceConfigs.ServiceConfigs.DefaultConfigs) +} + func checkOfflineStoreFilePersistenceType(value string) error { if slices.Contains(feastdevv1alpha1.ValidOfflineStoreFilePersistenceTypes, value) { return nil