From d64da1aa9f70640e853bdb3351f98c6ca5b9366a Mon Sep 17 00:00:00 2001 From: Daniele Martinoli Date: Wed, 6 Nov 2024 15:11:36 +0100 Subject: [PATCH 01/11] File persistence definition and implementation Signed-off-by: Daniele Martinoli --- .../api/v1alpha1/featurestore_types.go | 44 ++++ .../api/v1alpha1/zz_generated.deepcopy.go | 120 ++++++++++ .../crd/bases/feast.dev_featurestores.yaml | 80 +++++++ ...a1_featurestore_ephemeral_persistence.yaml | 20 ++ infra/feast-operator/dist/install.yaml | 80 +++++++ infra/feast-operator/go.mod | 4 +- .../controller/featurestore_controller.go | 39 +--- .../featurestore_controller_test.go | 6 +- .../controller/services/repo_config.go | 53 +++-- .../controller/services/repo_config_test.go | 219 ++++++++++++++++++ .../internal/controller/services/services.go | 3 +- .../controller/services/services_types.go | 17 +- .../controller/services/suite_test.go | 90 +++++++ .../internal/controller/services/util.go | 94 ++++++++ 14 files changed, 809 insertions(+), 60 deletions(-) create mode 100644 infra/feast-operator/config/samples/v1alpha1_featurestore_ephemeral_persistence.yaml create mode 100644 infra/feast-operator/internal/controller/services/repo_config_test.go create mode 100644 infra/feast-operator/internal/controller/services/suite_test.go create mode 100644 infra/feast-operator/internal/controller/services/util.go diff --git a/infra/feast-operator/api/v1alpha1/featurestore_types.go b/infra/feast-operator/api/v1alpha1/featurestore_types.go index 87e1cd64841..bfc113578f3 100644 --- a/infra/feast-operator/api/v1alpha1/featurestore_types.go +++ b/infra/feast-operator/api/v1alpha1/featurestore_types.go @@ -71,16 +71,60 @@ type FeatureStoreServices struct { // OfflineStore configures the deployed offline store service type OfflineStore struct { ServiceConfigs `json:",inline"` + // +optional + Persistence *OfflineStorePersistence `json:"persistence,omitempty"` +} + +// OfflineStorePersistence configures the persistence settings for the offline store service +type OfflineStorePersistence struct { + // +optional + FilePersistence *OfflineStoreFilePersistence `json:"file,omitempty"` +} + +// OfflineStorePersistence configures the file-based persistence for the offline store service +type OfflineStoreFilePersistence struct { + // +optional + // +default:value=dask + // +kubebuilder:validation:Enum=dask;duckdb + Type string `json:"type,omitempty"` } // OnlineStore configures the deployed online store service type OnlineStore struct { ServiceConfigs `json:",inline"` + // +optional + Persistence *OnlineStorePersistence `json:"persistence,omitempty"` +} + +// OnlineStorePersistence configures the persistence settings for the online store service +type OnlineStorePersistence struct { + // +optional + FilePersistence *OnlineStoreFilePersistence `json:"file,omitempty"` +} + +// OnlineStoreFilePersistence configures the file-based persistence for the offline store service +type OnlineStoreFilePersistence struct { + // +optional + Path string `json:"path,omitempty"` } // LocalRegistryConfig configures the deployed registry service type LocalRegistryConfig struct { ServiceConfigs `json:",inline"` + // +optional + Persistence *RegistryPersistence `json:"persistence,omitempty"` +} + +// RegistryPersistence configures the persistence settings for the registry service +type RegistryPersistence struct { + // +optional + FilePersistence *RegistryFilePersistence `json:"file,omitempty"` +} + +// RegistryFilePersistence configures the file-based persistence for the registry service +type RegistryFilePersistence struct { + // +optional + Path string `json:"path,omitempty"` } // Registry configures the registry service. One selection is required. Local is the default setting. diff --git a/infra/feast-operator/api/v1alpha1/zz_generated.deepcopy.go b/infra/feast-operator/api/v1alpha1/zz_generated.deepcopy.go index f37c8942ad2..6ff9957bb84 100644 --- a/infra/feast-operator/api/v1alpha1/zz_generated.deepcopy.go +++ b/infra/feast-operator/api/v1alpha1/zz_generated.deepcopy.go @@ -198,6 +198,11 @@ func (in *FeatureStoreStatus) DeepCopy() *FeatureStoreStatus { func (in *LocalRegistryConfig) DeepCopyInto(out *LocalRegistryConfig) { *out = *in in.ServiceConfigs.DeepCopyInto(&out.ServiceConfigs) + if in.Persistence != nil { + in, out := &in.Persistence, &out.Persistence + *out = new(RegistryPersistence) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LocalRegistryConfig. @@ -214,6 +219,11 @@ func (in *LocalRegistryConfig) DeepCopy() *LocalRegistryConfig { func (in *OfflineStore) DeepCopyInto(out *OfflineStore) { *out = *in in.ServiceConfigs.DeepCopyInto(&out.ServiceConfigs) + if in.Persistence != nil { + in, out := &in.Persistence, &out.Persistence + *out = new(OfflineStorePersistence) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OfflineStore. @@ -226,10 +236,50 @@ func (in *OfflineStore) DeepCopy() *OfflineStore { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *OfflineStoreFilePersistence) DeepCopyInto(out *OfflineStoreFilePersistence) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OfflineStoreFilePersistence. +func (in *OfflineStoreFilePersistence) DeepCopy() *OfflineStoreFilePersistence { + if in == nil { + return nil + } + out := new(OfflineStoreFilePersistence) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *OfflineStorePersistence) DeepCopyInto(out *OfflineStorePersistence) { + *out = *in + if in.FilePersistence != nil { + in, out := &in.FilePersistence, &out.FilePersistence + *out = new(OfflineStoreFilePersistence) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OfflineStorePersistence. +func (in *OfflineStorePersistence) DeepCopy() *OfflineStorePersistence { + if in == nil { + return nil + } + out := new(OfflineStorePersistence) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *OnlineStore) DeepCopyInto(out *OnlineStore) { *out = *in in.ServiceConfigs.DeepCopyInto(&out.ServiceConfigs) + if in.Persistence != nil { + in, out := &in.Persistence, &out.Persistence + *out = new(OnlineStorePersistence) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OnlineStore. @@ -242,6 +292,41 @@ func (in *OnlineStore) DeepCopy() *OnlineStore { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *OnlineStoreFilePersistence) DeepCopyInto(out *OnlineStoreFilePersistence) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OnlineStoreFilePersistence. +func (in *OnlineStoreFilePersistence) DeepCopy() *OnlineStoreFilePersistence { + if in == nil { + return nil + } + out := new(OnlineStoreFilePersistence) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *OnlineStorePersistence) DeepCopyInto(out *OnlineStorePersistence) { + *out = *in + if in.FilePersistence != nil { + in, out := &in.FilePersistence, &out.FilePersistence + *out = new(OnlineStoreFilePersistence) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OnlineStorePersistence. +func (in *OnlineStorePersistence) DeepCopy() *OnlineStorePersistence { + if in == nil { + return nil + } + out := new(OnlineStorePersistence) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *OptionalConfigs) DeepCopyInto(out *OptionalConfigs) { *out = *in @@ -303,6 +388,41 @@ func (in *Registry) DeepCopy() *Registry { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RegistryFilePersistence) DeepCopyInto(out *RegistryFilePersistence) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RegistryFilePersistence. +func (in *RegistryFilePersistence) DeepCopy() *RegistryFilePersistence { + if in == nil { + return nil + } + out := new(RegistryFilePersistence) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RegistryPersistence) DeepCopyInto(out *RegistryPersistence) { + *out = *in + if in.FilePersistence != nil { + in, out := &in.FilePersistence, &out.FilePersistence + *out = new(RegistryFilePersistence) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RegistryPersistence. +func (in *RegistryPersistence) DeepCopy() *RegistryPersistence { + if in == nil { + return nil + } + out := new(RegistryPersistence) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *RemoteRegistryConfig) DeepCopyInto(out *RemoteRegistryConfig) { *out = *in diff --git a/infra/feast-operator/config/crd/bases/feast.dev_featurestores.yaml b/infra/feast-operator/config/crd/bases/feast.dev_featurestores.yaml index b4c17b5eb80..a6d4551da5c 100644 --- a/infra/feast-operator/config/crd/bases/feast.dev_featurestores.yaml +++ b/infra/feast-operator/config/crd/bases/feast.dev_featurestores.yaml @@ -181,6 +181,21 @@ spec: description: PullPolicy describes a policy for if/when to pull a container image type: string + persistence: + description: OfflineStorePersistence configures the persistence + settings for the offline store service + properties: + file: + description: OfflineStorePersistence configures the file-based + persistence for the offline store service + properties: + type: + enum: + - dask + - duckdb + type: string + type: object + type: object resources: description: ResourceRequirements describes the compute resource requirements. @@ -361,6 +376,18 @@ spec: description: PullPolicy describes a policy for if/when to pull a container image type: string + persistence: + description: OnlineStorePersistence configures the persistence + settings for the online store service + properties: + file: + description: OnlineStoreFilePersistence configures the + file-based persistence for the offline store service + properties: + path: + type: string + type: object + type: object resources: description: ResourceRequirements describes the compute resource requirements. @@ -546,6 +573,18 @@ spec: description: PullPolicy describes a policy for if/when to pull a container image type: string + persistence: + description: RegistryPersistence configures the persistence + settings for the registry service + properties: + file: + description: RegistryFilePersistence configures the + file-based persistence for the registry service + properties: + path: + type: string + type: object + type: object resources: description: ResourceRequirements describes the compute resource requirements. @@ -780,6 +819,21 @@ spec: description: PullPolicy describes a policy for if/when to pull a container image type: string + persistence: + description: OfflineStorePersistence configures the persistence + settings for the offline store service + properties: + file: + description: OfflineStorePersistence configures the + file-based persistence for the offline store service + properties: + type: + enum: + - dask + - duckdb + type: string + type: object + type: object resources: description: ResourceRequirements describes the compute resource requirements. @@ -962,6 +1016,19 @@ spec: description: PullPolicy describes a policy for if/when to pull a container image type: string + persistence: + description: OnlineStorePersistence configures the persistence + settings for the online store service + properties: + file: + description: OnlineStoreFilePersistence configures + the file-based persistence for the offline store + service + properties: + path: + type: string + type: object + type: object resources: description: ResourceRequirements describes the compute resource requirements. @@ -1151,6 +1218,19 @@ spec: description: PullPolicy describes a policy for if/when to pull a container image type: string + persistence: + description: RegistryPersistence configures the persistence + settings for the registry service + properties: + file: + description: RegistryFilePersistence configures + the file-based persistence for the registry + service + properties: + path: + type: string + type: object + type: object resources: description: ResourceRequirements describes the compute resource requirements. diff --git a/infra/feast-operator/config/samples/v1alpha1_featurestore_ephemeral_persistence.yaml b/infra/feast-operator/config/samples/v1alpha1_featurestore_ephemeral_persistence.yaml new file mode 100644 index 00000000000..512fed9d4c0 --- /dev/null +++ b/infra/feast-operator/config/samples/v1alpha1_featurestore_ephemeral_persistence.yaml @@ -0,0 +1,20 @@ +apiVersion: feast.dev/v1alpha1 +kind: FeatureStore +metadata: + name: sample-ephemeral-persistence +spec: + feastProject: my_project + services: + onlineStore: + persistence: + file: + path: /data/online_store.db + offlineStore: + persistence: + file: + type: dask + registry: + local: + persistence: + file: + path: /data/registry.db diff --git a/infra/feast-operator/dist/install.yaml b/infra/feast-operator/dist/install.yaml index 4d66fbdc734..ceb6b34e5c4 100644 --- a/infra/feast-operator/dist/install.yaml +++ b/infra/feast-operator/dist/install.yaml @@ -189,6 +189,21 @@ spec: description: PullPolicy describes a policy for if/when to pull a container image type: string + persistence: + description: OfflineStorePersistence configures the persistence + settings for the offline store service + properties: + file: + description: OfflineStorePersistence configures the file-based + persistence for the offline store service + properties: + type: + enum: + - dask + - duckdb + type: string + type: object + type: object resources: description: ResourceRequirements describes the compute resource requirements. @@ -369,6 +384,18 @@ spec: description: PullPolicy describes a policy for if/when to pull a container image type: string + persistence: + description: OnlineStorePersistence configures the persistence + settings for the online store service + properties: + file: + description: OnlineStoreFilePersistence configures the + file-based persistence for the offline store service + properties: + path: + type: string + type: object + type: object resources: description: ResourceRequirements describes the compute resource requirements. @@ -554,6 +581,18 @@ spec: description: PullPolicy describes a policy for if/when to pull a container image type: string + persistence: + description: RegistryPersistence configures the persistence + settings for the registry service + properties: + file: + description: RegistryFilePersistence configures the + file-based persistence for the registry service + properties: + path: + type: string + type: object + type: object resources: description: ResourceRequirements describes the compute resource requirements. @@ -788,6 +827,21 @@ spec: description: PullPolicy describes a policy for if/when to pull a container image type: string + persistence: + description: OfflineStorePersistence configures the persistence + settings for the offline store service + properties: + file: + description: OfflineStorePersistence configures the + file-based persistence for the offline store service + properties: + type: + enum: + - dask + - duckdb + type: string + type: object + type: object resources: description: ResourceRequirements describes the compute resource requirements. @@ -970,6 +1024,19 @@ spec: description: PullPolicy describes a policy for if/when to pull a container image type: string + persistence: + description: OnlineStorePersistence configures the persistence + settings for the online store service + properties: + file: + description: OnlineStoreFilePersistence configures + the file-based persistence for the offline store + service + properties: + path: + type: string + type: object + type: object resources: description: ResourceRequirements describes the compute resource requirements. @@ -1159,6 +1226,19 @@ spec: description: PullPolicy describes a policy for if/when to pull a container image type: string + persistence: + description: RegistryPersistence configures the persistence + settings for the registry service + properties: + file: + description: RegistryFilePersistence configures + the file-based persistence for the registry + service + properties: + path: + type: string + type: object + type: object resources: description: ResourceRequirements describes the compute resource requirements. diff --git a/infra/feast-operator/go.mod b/infra/feast-operator/go.mod index 65d2aaac502..4e544d819e4 100644 --- a/infra/feast-operator/go.mod +++ b/infra/feast-operator/go.mod @@ -5,6 +5,8 @@ go 1.21 require ( github.com/onsi/ginkgo/v2 v2.14.0 github.com/onsi/gomega v1.30.0 + gopkg.in/yaml.v3 v3.0.1 + k8s.io/api v0.29.2 k8s.io/apimachinery v0.29.2 k8s.io/client-go v0.29.2 sigs.k8s.io/controller-runtime v0.17.3 @@ -60,8 +62,6 @@ require ( google.golang.org/protobuf v1.31.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect - k8s.io/api v0.29.2 // indirect k8s.io/apiextensions-apiserver v0.29.2 // indirect k8s.io/component-base v0.29.2 // indirect k8s.io/klog/v2 v2.110.1 // indirect diff --git a/infra/feast-operator/internal/controller/featurestore_controller.go b/infra/feast-operator/internal/controller/featurestore_controller.go index 244bbcaae80..66e50d16367 100644 --- a/infra/feast-operator/internal/controller/featurestore_controller.go +++ b/infra/feast-operator/internal/controller/featurestore_controller.go @@ -34,7 +34,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/reconcile" - "github.com/feast-dev/feast/infra/feast-operator/api/feastversion" feastdevv1alpha1 "github.com/feast-dev/feast/infra/feast-operator/api/v1alpha1" "github.com/feast-dev/feast/infra/feast-operator/internal/controller/services" ) @@ -78,7 +77,7 @@ func (r *FeatureStoreReconciler) Reconcile(ctx context.Context, req ctrl.Request currentStatus := cr.Status.DeepCopy() // initial status defaults must occur before feast deployment - applyDefaultsToStatus(cr) + services.ApplyDefaultsToStatus(cr) result, recErr = r.deployFeast(ctx, cr) if cr.DeletionTimestamp == nil && !reflect.DeepEqual(currentStatus, cr.Status) { if err := r.Client.Status().Update(ctx, cr); err != nil { @@ -184,39 +183,3 @@ func (r *FeatureStoreReconciler) mapFeastRefsToFeastRequests(ctx context.Context return requests } - -func applyDefaultsToStatus(cr *feastdevv1alpha1.FeatureStore) { - cr.Status.FeastVersion = feastversion.FeastVersion - applied := cr.Spec.DeepCopy() - if applied.Services == nil { - applied.Services = &feastdevv1alpha1.FeatureStoreServices{} - } - - // default to registry service deployment - if applied.Services.Registry == nil { - applied.Services.Registry = &feastdevv1alpha1.Registry{} - } - // if remote registry not set, proceed w/ local registry defaults - if applied.Services.Registry.Remote == nil { - // if local registry not set, apply an empty pointer struct - if applied.Services.Registry.Local == nil { - applied.Services.Registry.Local = &feastdevv1alpha1.LocalRegistryConfig{} - } - setServiceDefaultConfigs(&applied.Services.Registry.Local.ServiceConfigs.DefaultConfigs) - } - if applied.Services.OfflineStore != nil { - setServiceDefaultConfigs(&applied.Services.OfflineStore.ServiceConfigs.DefaultConfigs) - } - if applied.Services.OnlineStore != nil { - setServiceDefaultConfigs(&applied.Services.OnlineStore.ServiceConfigs.DefaultConfigs) - } - - // overwrite status.applied with every reconcile - applied.DeepCopyInto(&cr.Status.Applied) -} - -func setServiceDefaultConfigs(defaultConfigs *feastdevv1alpha1.DefaultConfigs) { - if defaultConfigs.Image == nil { - defaultConfigs.Image = &services.DefaultImage - } -} diff --git a/infra/feast-operator/internal/controller/featurestore_controller_test.go b/infra/feast-operator/internal/controller/featurestore_controller_test.go index 10b5f64c567..d6ecb0fa577 100644 --- a/infra/feast-operator/internal/controller/featurestore_controller_test.go +++ b/infra/feast-operator/internal/controller/featurestore_controller_test.go @@ -233,7 +233,7 @@ var _ = Describe("FeatureStore Controller", func() { EntityKeySerializationVersion: feastdevv1alpha1.SerializationVersion, Registry: services.RegistryConfig{ RegistryType: services.RegistryFileConfigType, - Path: services.LocalRegistryPath, + Path: services.DefaultRegistryPath, }, } Expect(repoConfig).To(Equal(testConfig)) @@ -590,7 +590,7 @@ var _ = Describe("FeatureStore Controller", func() { EntityKeySerializationVersion: feastdevv1alpha1.SerializationVersion, Registry: services.RegistryConfig{ RegistryType: services.RegistryFileConfigType, - Path: services.LocalRegistryPath, + Path: services.DefaultRegistryPath, }, } Expect(repoConfig).To(Equal(testConfig)) @@ -666,7 +666,7 @@ var _ = Describe("FeatureStore Controller", func() { EntityKeySerializationVersion: feastdevv1alpha1.SerializationVersion, OfflineStore: offlineRemote, OnlineStore: services.OnlineStoreConfig{ - Path: services.LocalOnlinePath, + Path: services.DefaultOnlinePath, Type: services.OnlineSqliteConfigType, }, Registry: regRemote, diff --git a/infra/feast-operator/internal/controller/services/repo_config.go b/infra/feast-operator/internal/controller/services/repo_config.go index 3137417f3ac..5dba4981985 100644 --- a/infra/feast-operator/internal/controller/services/repo_config.go +++ b/infra/feast-operator/internal/controller/services/repo_config.go @@ -35,48 +35,75 @@ func (feast *FeastServices) GetServiceFeatureStoreYamlBase64(feastType FeastServ } func (feast *FeastServices) getServiceFeatureStoreYaml(feastType FeastServiceType) ([]byte, error) { - return yaml.Marshal(feast.getServiceRepoConfig(feastType)) + repoConfig, err := feast.getServiceRepoConfig(feastType) + if err != nil { + return nil, err + } + return yaml.Marshal(repoConfig) } -func (feast *FeastServices) getServiceRepoConfig(feastType FeastServiceType) RepoConfig { - appliedSpec := feast.FeatureStore.Status.Applied +func (feast *FeastServices) getServiceRepoConfig(feastType FeastServiceType) (RepoConfig, error) { + return getServiceRepoConfig(feastType, feast.FeatureStore) +} - repoConfig := feast.getClientRepoConfig() +func getServiceRepoConfig(feastType FeastServiceType, featureStore *feastdevv1alpha1.FeatureStore) (RepoConfig, error) { + appliedSpec := featureStore.Status.Applied + + repoConfig := getClientRepoConfig(featureStore) + isLocalRegistry := IsLocalRegistry(featureStore) if appliedSpec.Services != nil { // Offline server has an `offline_store` section and a remote `registry` if feastType == OfflineFeastType && appliedSpec.Services.OfflineStore != nil { - repoConfig.OfflineStore = OfflineStoreConfig{ - Type: OfflineDaskConfigType, + fileType := string(OfflineDaskConfigType) + if appliedSpec.Services.OfflineStore.Persistence != nil && appliedSpec.Services.OfflineStore.Persistence.FilePersistence != nil { + fileType = appliedSpec.Services.OfflineStore.Persistence.FilePersistence.Type + } + + repoConfig.OfflineStore = OfflineStoreConfig{} + var err error + repoConfig.OfflineStore.Type, err = ParseOfflineConfigType(fileType) + if err != nil { + return repoConfig, err } repoConfig.OnlineStore = OnlineStoreConfig{} } // Online server has an `online_store` section, a remote `registry` and a remote `offline_store` if feastType == OnlineFeastType && appliedSpec.Services.OnlineStore != nil { + path := DefaultOnlinePath + if appliedSpec.Services.OnlineStore.Persistence != nil && appliedSpec.Services.OnlineStore.Persistence.FilePersistence != nil { + path = appliedSpec.Services.OnlineStore.Persistence.FilePersistence.Path + } + repoConfig.OnlineStore = OnlineStoreConfig{ Type: OnlineSqliteConfigType, - Path: LocalOnlinePath, + Path: path, } } // Registry server only has a `registry` section - if feastType == RegistryFeastType && feast.isLocalRegistry() { + if feastType == RegistryFeastType && isLocalRegistry { + path := DefaultRegistryPath + if appliedSpec.Services != nil && appliedSpec.Services.Registry != nil && appliedSpec.Services.Registry.Local != nil && + appliedSpec.Services.Registry.Local.Persistence != nil && appliedSpec.Services.Registry.Local.Persistence.FilePersistence != nil { + path = appliedSpec.Services.Registry.Local.Persistence.FilePersistence.Path + } repoConfig.Registry = RegistryConfig{ RegistryType: RegistryFileConfigType, - Path: LocalRegistryPath, + Path: path, } repoConfig.OfflineStore = OfflineStoreConfig{} repoConfig.OnlineStore = OnlineStoreConfig{} } } - return repoConfig + return repoConfig, nil } func (feast *FeastServices) getClientFeatureStoreYaml() ([]byte, error) { - return yaml.Marshal(feast.getClientRepoConfig()) + return yaml.Marshal(getClientRepoConfig(feast.FeatureStore)) } -func (feast *FeastServices) getClientRepoConfig() RepoConfig { - status := feast.FeatureStore.Status +func getClientRepoConfig(featureStore *feastdevv1alpha1.FeatureStore) RepoConfig { + status := featureStore.Status clientRepoConfig := RepoConfig{ Project: status.Applied.FeastProject, Provider: LocalProviderType, diff --git a/infra/feast-operator/internal/controller/services/repo_config_test.go b/infra/feast-operator/internal/controller/services/repo_config_test.go new file mode 100644 index 00000000000..6c388033101 --- /dev/null +++ b/infra/feast-operator/internal/controller/services/repo_config_test.go @@ -0,0 +1,219 @@ +/* +Copyright 2024 Feast Community. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package services + +import ( + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + feastdevv1alpha1 "github.com/feast-dev/feast/infra/feast-operator/api/v1alpha1" +) + +var projectName = "test-project" + +var _ = Describe("Repo Config", func() { + Context("When creating the RepoConfig of a FeatureStore", func() { + + It("should successfully create the repo configs", func() { + By("Having the minimal created resource") + featureStore := minimalFeatureStore() + ApplyDefaultsToStatus(featureStore) + repoConfig, err := getServiceRepoConfig(OfflineFeastType, featureStore) + Expect(err).NotTo(HaveOccurred()) + Expect(repoConfig.OfflineStore).To(Equal(emptyOfflineStoreConfig())) + Expect(repoConfig.OnlineStore).To(Equal(emptyOnlineStoreConfig())) + Expect(repoConfig.Registry).To(Equal(emptyRegistryConfig())) + + repoConfig, err = getServiceRepoConfig(OnlineFeastType, featureStore) + Expect(err).NotTo(HaveOccurred()) + Expect(repoConfig.OfflineStore).To(Equal(emptyOfflineStoreConfig())) + Expect(repoConfig.OnlineStore).To(Equal(emptyOnlineStoreConfig())) + Expect(repoConfig.Registry).To(Equal(emptyRegistryConfig())) + + repoConfig, err = getServiceRepoConfig(RegistryFeastType, featureStore) + Expect(err).NotTo(HaveOccurred()) + Expect(repoConfig.OfflineStore).To(Equal(emptyOfflineStoreConfig())) + Expect(repoConfig.OnlineStore).To(Equal(emptyOnlineStoreConfig())) + expectedRegistryConfig := RegistryConfig{ + RegistryType: "file", + Path: DefaultRegistryPath, + } + Expect(repoConfig.Registry).To(Equal(expectedRegistryConfig)) + + By("Having the local registry resource") + featureStore = minimalFeatureStore() + featureStore.Spec.Services = &feastdevv1alpha1.FeatureStoreServices{ + Registry: &feastdevv1alpha1.Registry{ + Local: &feastdevv1alpha1.LocalRegistryConfig{ + Persistence: &feastdevv1alpha1.RegistryPersistence{ + FilePersistence: &feastdevv1alpha1.RegistryFilePersistence{ + Path: "file.db", + }, + }, + }, + }, + } + ApplyDefaultsToStatus(featureStore) + repoConfig, err = getServiceRepoConfig(OfflineFeastType, featureStore) + Expect(err).NotTo(HaveOccurred()) + Expect(repoConfig.OfflineStore).To(Equal(emptyOfflineStoreConfig())) + Expect(repoConfig.OnlineStore).To(Equal(emptyOnlineStoreConfig())) + Expect(repoConfig.Registry).To(Equal(emptyRegistryConfig())) + + repoConfig, err = getServiceRepoConfig(OnlineFeastType, featureStore) + Expect(err).NotTo(HaveOccurred()) + Expect(repoConfig.OfflineStore).To(Equal(emptyOfflineStoreConfig())) + Expect(repoConfig.OnlineStore).To(Equal(emptyOnlineStoreConfig())) + Expect(repoConfig.Registry).To(Equal(emptyRegistryConfig())) + + repoConfig, err = getServiceRepoConfig(RegistryFeastType, featureStore) + Expect(err).NotTo(HaveOccurred()) + Expect(repoConfig.OfflineStore).To(Equal(emptyOfflineStoreConfig())) + Expect(repoConfig.OnlineStore).To(Equal(emptyOnlineStoreConfig())) + expectedRegistryConfig = RegistryConfig{ + RegistryType: "file", + Path: "file.db", + } + Expect(repoConfig.Registry).To(Equal(expectedRegistryConfig)) + + By("Having the remote registry resource") + featureStore = minimalFeatureStore() + featureStore.Spec.Services = &feastdevv1alpha1.FeatureStoreServices{ + Registry: &feastdevv1alpha1.Registry{ + Remote: &feastdevv1alpha1.RemoteRegistryConfig{ + FeastRef: &feastdevv1alpha1.FeatureStoreRef{ + Name: "registry", + Namespace: "remoteNS", + }, + }, + }, + } + ApplyDefaultsToStatus(featureStore) + repoConfig, err = getServiceRepoConfig(OfflineFeastType, featureStore) + Expect(err).NotTo(HaveOccurred()) + Expect(repoConfig.OfflineStore).To(Equal(emptyOfflineStoreConfig())) + Expect(repoConfig.OnlineStore).To(Equal(emptyOnlineStoreConfig())) + Expect(repoConfig.Registry).To(Equal(emptyRegistryConfig())) + + repoConfig, err = getServiceRepoConfig(OnlineFeastType, featureStore) + Expect(err).NotTo(HaveOccurred()) + Expect(repoConfig.OfflineStore).To(Equal(emptyOfflineStoreConfig())) + Expect(repoConfig.OnlineStore).To(Equal(emptyOnlineStoreConfig())) + Expect(repoConfig.Registry).To(Equal(emptyRegistryConfig())) + + repoConfig, err = getServiceRepoConfig(RegistryFeastType, featureStore) + Expect(err).NotTo(HaveOccurred()) + Expect(repoConfig.OfflineStore).To(Equal(emptyOfflineStoreConfig())) + Expect(repoConfig.OnlineStore).To(Equal(emptyOnlineStoreConfig())) + Expect(repoConfig.Registry).To(Equal(emptyRegistryConfig())) + + By("Having the all the services") + featureStore = minimalFeatureStore() + featureStore.Spec.Services = &feastdevv1alpha1.FeatureStoreServices{ + OfflineStore: &feastdevv1alpha1.OfflineStore{ + Persistence: &feastdevv1alpha1.OfflineStorePersistence{ + FilePersistence: &feastdevv1alpha1.OfflineStoreFilePersistence{ + Type: "duckdb", + }, + }, + }, + OnlineStore: &feastdevv1alpha1.OnlineStore{ + Persistence: &feastdevv1alpha1.OnlineStorePersistence{ + FilePersistence: &feastdevv1alpha1.OnlineStoreFilePersistence{ + Path: "/data/online.db", + }, + }, + }, + Registry: &feastdevv1alpha1.Registry{ + Local: &feastdevv1alpha1.LocalRegistryConfig{ + Persistence: &feastdevv1alpha1.RegistryPersistence{ + FilePersistence: &feastdevv1alpha1.RegistryFilePersistence{ + Path: "/data/registry.db", + }, + }, + }, + }, + } + ApplyDefaultsToStatus(featureStore) + repoConfig, err = getServiceRepoConfig(OfflineFeastType, featureStore) + Expect(err).NotTo(HaveOccurred()) + expectedOfflineConfig := OfflineStoreConfig{ + Type: "duckdb", + } + Expect(repoConfig.OfflineStore).To(Equal(expectedOfflineConfig)) + Expect(repoConfig.OnlineStore).To(Equal(emptyOnlineStoreConfig())) + Expect(repoConfig.Registry).To(Equal(emptyRegistryConfig())) + + repoConfig, err = getServiceRepoConfig(OnlineFeastType, featureStore) + Expect(err).NotTo(HaveOccurred()) + Expect(repoConfig.OfflineStore).To(Equal(emptyOfflineStoreConfig())) + expectedOnlineConfig := OnlineStoreConfig{ + Type: "sqlite", + Path: "/data/online.db", + } + Expect(repoConfig.OnlineStore).To(Equal(expectedOnlineConfig)) + Expect(repoConfig.Registry).To(Equal(emptyRegistryConfig())) + + repoConfig, err = getServiceRepoConfig(RegistryFeastType, featureStore) + Expect(err).NotTo(HaveOccurred()) + Expect(repoConfig.OfflineStore).To(Equal(emptyOfflineStoreConfig())) + Expect(repoConfig.OnlineStore).To(Equal(emptyOnlineStoreConfig())) + expectedRegistryConfig = RegistryConfig{ + RegistryType: "file", + Path: "/data/registry.db", + } + Expect(repoConfig.Registry).To(Equal(expectedRegistryConfig)) + }) + + It("should fail the creation of repo configs", func() { + By("Having wrong offline persistence type") + featureStore := minimalFeatureStore() + featureStore.Spec.Services = &feastdevv1alpha1.FeatureStoreServices{ + OfflineStore: &feastdevv1alpha1.OfflineStore{ + Persistence: &feastdevv1alpha1.OfflineStorePersistence{ + FilePersistence: &feastdevv1alpha1.OfflineStoreFilePersistence{ + Type: "invalid", + }, + }, + }, + } + ApplyDefaultsToStatus(featureStore) + _, err := getServiceRepoConfig(OfflineFeastType, featureStore) + Expect(err).To(HaveOccurred()) + }) + }) +}) + +func emptyOnlineStoreConfig() OnlineStoreConfig { + return OnlineStoreConfig{} +} + +func emptyOfflineStoreConfig() OfflineStoreConfig { + return OfflineStoreConfig{} +} + +func emptyRegistryConfig() RegistryConfig { + return RegistryConfig{} +} + +func minimalFeatureStore() *feastdevv1alpha1.FeatureStore { + return &feastdevv1alpha1.FeatureStore{ + Spec: feastdevv1alpha1.FeatureStoreSpec{ + FeastProject: projectName, + }, + } +} diff --git a/infra/feast-operator/internal/controller/services/services.go b/infra/feast-operator/internal/controller/services/services.go index 5e1778322f3..7acdc564bd3 100644 --- a/infra/feast-operator/internal/controller/services/services.go +++ b/infra/feast-operator/internal/controller/services/services.go @@ -329,8 +329,7 @@ func (feast *FeastServices) setRemoteRegistryURL() error { } func (feast *FeastServices) isLocalRegistry() bool { - appliedServices := feast.FeatureStore.Status.Applied.Services - return appliedServices != nil && appliedServices.Registry != nil && appliedServices.Registry.Local != nil + return IsLocalRegistry(feast.FeatureStore) } func (feast *FeastServices) isRemoteRegistry() bool { diff --git a/infra/feast-operator/internal/controller/services/services_types.go b/infra/feast-operator/internal/controller/services/services_types.go index c2348666179..4e90641838f 100644 --- a/infra/feast-operator/internal/controller/services/services_types.go +++ b/infra/feast-operator/internal/controller/services/services_types.go @@ -18,6 +18,7 @@ package services import ( "context" + "fmt" "github.com/feast-dev/feast/infra/feast-operator/api/feastversion" feastdevv1alpha1 "github.com/feast-dev/feast/infra/feast-operator/api/v1alpha1" @@ -30,8 +31,8 @@ const ( FeastPrefix = "feast-" FeatureStoreYamlEnvVar = "FEATURE_STORE_YAML_BASE64" FeatureStoreYamlCmKey = "feature_store.yaml" - LocalRegistryPath = "/tmp/registry.db" - LocalOnlinePath = "/tmp/online_store.db" + DefaultRegistryPath = "/tmp/registry.db" + DefaultOnlinePath = "/tmp/online_store.db" svcDomain = ".svc.cluster.local" HttpPort = 80 @@ -42,6 +43,7 @@ const ( OfflineRemoteConfigType OfflineConfigType = "remote" OfflineDaskConfigType OfflineConfigType = "dask" + OfflineDuckDbConfigType OfflineConfigType = "duckdb" OnlineRemoteConfigType OnlineConfigType = "remote" OnlineSqliteConfigType OnlineConfigType = "sqlite" @@ -135,6 +137,17 @@ type FeastServiceType string // OfflineConfigType provider name or a class name that implements Offline Store type OfflineConfigType string +func ParseOfflineConfigType(value string) (OfflineConfigType, error) { + switch value { + case string(OfflineDaskConfigType): + return OfflineDaskConfigType, nil + case string(OfflineDuckDbConfigType): + return OfflineDuckDbConfigType, nil + default: + return "", fmt.Errorf("invalid OfflineConfigType value %s", value) + } +} + // RegistryConfigType provider name or a class name that implements Registry type RegistryConfigType string diff --git a/infra/feast-operator/internal/controller/services/suite_test.go b/infra/feast-operator/internal/controller/services/suite_test.go new file mode 100644 index 00000000000..d9f3760bcf5 --- /dev/null +++ b/infra/feast-operator/internal/controller/services/suite_test.go @@ -0,0 +1,90 @@ +/* +Copyright 2024 Feast Community. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package services + +import ( + "fmt" + "path/filepath" + "runtime" + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/envtest" + logf "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/log/zap" + + feastdevv1alpha1 "github.com/feast-dev/feast/infra/feast-operator/api/v1alpha1" + //+kubebuilder:scaffold:imports +) + +// These tests use Ginkgo (BDD-style Go testing framework). Refer to +// http://onsi.github.io/ginkgo/ to learn more about Ginkgo. + +var cfg *rest.Config +var k8sClient client.Client +var testEnv *envtest.Environment + +func TestServices(t *testing.T) { + RegisterFailHandler(Fail) + + RunSpecs(t, "Controller Services Suite") +} + +var _ = BeforeSuite(func() { + logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true))) + + By("bootstrapping test environment") + testEnv = &envtest.Environment{ + CRDDirectoryPaths: []string{filepath.Join("..", "..", "..", "config", "crd", "bases")}, + ErrorIfCRDPathMissing: true, + + // The BinaryAssetsDirectory is only required if you want to run the tests directly + // without call the makefile target test. If not informed it will look for the + // default path defined in controller-runtime which is /usr/local/kubebuilder/. + // Note that you must have the required binaries setup under the bin directory to perform + // the tests directly. When we run make test it will be setup and used automatically. + BinaryAssetsDirectory: filepath.Join("..", "..", "..", "bin", "k8s", + fmt.Sprintf("1.29.0-%s-%s", runtime.GOOS, runtime.GOARCH)), + } + + var err error + // cfg is defined in this file globally. + cfg, err = testEnv.Start() + Expect(err).NotTo(HaveOccurred()) + Expect(cfg).NotTo(BeNil()) + + err = feastdevv1alpha1.AddToScheme(scheme.Scheme) + Expect(err).NotTo(HaveOccurred()) + + //+kubebuilder:scaffold:scheme + + k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme}) + Expect(err).NotTo(HaveOccurred()) + Expect(k8sClient).NotTo(BeNil()) + +}) + +var _ = AfterSuite(func() { + By("tearing down the test environment") + err := testEnv.Stop() + Expect(err).NotTo(HaveOccurred()) +}) diff --git a/infra/feast-operator/internal/controller/services/util.go b/infra/feast-operator/internal/controller/services/util.go new file mode 100644 index 00000000000..59a69b4417e --- /dev/null +++ b/infra/feast-operator/internal/controller/services/util.go @@ -0,0 +1,94 @@ +package services + +import ( + "github.com/feast-dev/feast/infra/feast-operator/api/feastversion" + feastdevv1alpha1 "github.com/feast-dev/feast/infra/feast-operator/api/v1alpha1" +) + +func IsLocalRegistry(featureStore *feastdevv1alpha1.FeatureStore) bool { + appliedServices := featureStore.Status.Applied.Services + return appliedServices != nil && appliedServices.Registry != nil && appliedServices.Registry.Local != nil +} + +func ApplyDefaultsToStatus(cr *feastdevv1alpha1.FeatureStore) { + cr.Status.FeastVersion = feastversion.FeastVersion + applied := cr.Spec.DeepCopy() + if applied.Services == nil { + applied.Services = &feastdevv1alpha1.FeatureStoreServices{} + } + + // default to registry service deployment + if applied.Services.Registry == nil { + applied.Services.Registry = &feastdevv1alpha1.Registry{ + Local: &feastdevv1alpha1.LocalRegistryConfig{ + Persistence: &feastdevv1alpha1.RegistryPersistence{ + FilePersistence: &feastdevv1alpha1.RegistryFilePersistence{ + Path: DefaultRegistryPath, + }, + }, + }, + } + } + // if remote registry not set, proceed w/ local registry defaults + if applied.Services.Registry.Remote == nil { + // if local registry not set, apply an empty pointer struct + if applied.Services.Registry.Local == nil { + applied.Services.Registry.Local = &feastdevv1alpha1.LocalRegistryConfig{ + Persistence: &feastdevv1alpha1.RegistryPersistence{ + FilePersistence: &feastdevv1alpha1.RegistryFilePersistence{ + Path: DefaultRegistryPath, + }, + }, + } + } + if applied.Services.Registry.Local.Persistence == nil { + applied.Services.Registry.Local.Persistence = &feastdevv1alpha1.RegistryPersistence{ + FilePersistence: &feastdevv1alpha1.RegistryFilePersistence{ + Path: DefaultRegistryPath, + }, + } + } else if applied.Services.Registry.Local.Persistence.FilePersistence == nil { + applied.Services.Registry.Local.Persistence.FilePersistence = &feastdevv1alpha1.RegistryFilePersistence{ + Path: DefaultRegistryPath, + } + } + setServiceDefaultConfigs(&applied.Services.Registry.Local.ServiceConfigs.DefaultConfigs) + } + if applied.Services.OfflineStore != nil { + setServiceDefaultConfigs(&applied.Services.OfflineStore.ServiceConfigs.DefaultConfigs) + if applied.Services.OfflineStore.Persistence == nil { + applied.Services.OfflineStore.Persistence = &feastdevv1alpha1.OfflineStorePersistence{ + FilePersistence: &feastdevv1alpha1.OfflineStoreFilePersistence{ + Type: string(OfflineDaskConfigType), + }, + } + } else if applied.Services.OfflineStore.Persistence.FilePersistence == nil { + applied.Services.OfflineStore.Persistence.FilePersistence = &feastdevv1alpha1.OfflineStoreFilePersistence{ + Type: string(OfflineDaskConfigType), + } + } + } + if applied.Services.OnlineStore != nil { + setServiceDefaultConfigs(&applied.Services.OnlineStore.ServiceConfigs.DefaultConfigs) + if applied.Services.OnlineStore.Persistence == nil { + applied.Services.OnlineStore.Persistence = &feastdevv1alpha1.OnlineStorePersistence{ + FilePersistence: &feastdevv1alpha1.OnlineStoreFilePersistence{ + Path: DefaultOnlinePath, + }, + } + } else if applied.Services.OnlineStore.Persistence.FilePersistence == nil { + applied.Services.OnlineStore.Persistence.FilePersistence = &feastdevv1alpha1.OnlineStoreFilePersistence{ + Path: DefaultOnlinePath, + } + } + } + + // overwrite status.applied with every reconcile + applied.DeepCopyInto(&cr.Status.Applied) +} + +func setServiceDefaultConfigs(defaultConfigs *feastdevv1alpha1.DefaultConfigs) { + if defaultConfigs.Image == nil { + defaultConfigs.Image = &DefaultImage + } +} From 08b101125a5f9240369ae2dd47832b030ea963db Mon Sep 17 00:00:00 2001 From: Daniele Martinoli Date: Wed, 6 Nov 2024 16:35:30 +0100 Subject: [PATCH 02/11] removed optional and default markers Signed-off-by: Daniele Martinoli --- .../api/v1alpha1/featurestore_types.go | 16 +++------------- 1 file changed, 3 insertions(+), 13 deletions(-) diff --git a/infra/feast-operator/api/v1alpha1/featurestore_types.go b/infra/feast-operator/api/v1alpha1/featurestore_types.go index bfc113578f3..5bf0e985a12 100644 --- a/infra/feast-operator/api/v1alpha1/featurestore_types.go +++ b/infra/feast-operator/api/v1alpha1/featurestore_types.go @@ -71,20 +71,16 @@ type FeatureStoreServices struct { // OfflineStore configures the deployed offline store service type OfflineStore struct { ServiceConfigs `json:",inline"` - // +optional - Persistence *OfflineStorePersistence `json:"persistence,omitempty"` + Persistence *OfflineStorePersistence `json:"persistence,omitempty"` } // OfflineStorePersistence configures the persistence settings for the offline store service type OfflineStorePersistence struct { - // +optional FilePersistence *OfflineStoreFilePersistence `json:"file,omitempty"` } // OfflineStorePersistence configures the file-based persistence for the offline store service type OfflineStoreFilePersistence struct { - // +optional - // +default:value=dask // +kubebuilder:validation:Enum=dask;duckdb Type string `json:"type,omitempty"` } @@ -92,38 +88,32 @@ type OfflineStoreFilePersistence struct { // OnlineStore configures the deployed online store service type OnlineStore struct { ServiceConfigs `json:",inline"` - // +optional - Persistence *OnlineStorePersistence `json:"persistence,omitempty"` + Persistence *OnlineStorePersistence `json:"persistence,omitempty"` } // OnlineStorePersistence configures the persistence settings for the online store service type OnlineStorePersistence struct { - // +optional FilePersistence *OnlineStoreFilePersistence `json:"file,omitempty"` } // OnlineStoreFilePersistence configures the file-based persistence for the offline store service type OnlineStoreFilePersistence struct { - // +optional Path string `json:"path,omitempty"` } // LocalRegistryConfig configures the deployed registry service type LocalRegistryConfig struct { ServiceConfigs `json:",inline"` - // +optional - Persistence *RegistryPersistence `json:"persistence,omitempty"` + Persistence *RegistryPersistence `json:"persistence,omitempty"` } // RegistryPersistence configures the persistence settings for the registry service type RegistryPersistence struct { - // +optional FilePersistence *RegistryFilePersistence `json:"file,omitempty"` } // RegistryFilePersistence configures the file-based persistence for the registry service type RegistryFilePersistence struct { - // +optional Path string `json:"path,omitempty"` } From 9b075069b15d71914dcebbf7054affc2d3d02e7a Mon Sep 17 00:00:00 2001 From: Daniele Martinoli Date: Wed, 6 Nov 2024 16:42:08 +0100 Subject: [PATCH 03/11] removed global cfg variable Signed-off-by: Daniele Martinoli --- .../internal/controller/services/suite_test.go | 6 +----- infra/feast-operator/internal/controller/suite_test.go | 5 +---- 2 files changed, 2 insertions(+), 9 deletions(-) diff --git a/infra/feast-operator/internal/controller/services/suite_test.go b/infra/feast-operator/internal/controller/services/suite_test.go index d9f3760bcf5..5f76f5e6ff7 100644 --- a/infra/feast-operator/internal/controller/services/suite_test.go +++ b/infra/feast-operator/internal/controller/services/suite_test.go @@ -26,7 +26,6 @@ import ( . "github.com/onsi/gomega" "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/rest" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/envtest" logf "sigs.k8s.io/controller-runtime/pkg/log" @@ -39,7 +38,6 @@ import ( // These tests use Ginkgo (BDD-style Go testing framework). Refer to // http://onsi.github.io/ginkgo/ to learn more about Ginkgo. -var cfg *rest.Config var k8sClient client.Client var testEnv *envtest.Environment @@ -66,9 +64,7 @@ var _ = BeforeSuite(func() { fmt.Sprintf("1.29.0-%s-%s", runtime.GOOS, runtime.GOARCH)), } - var err error - // cfg is defined in this file globally. - cfg, err = testEnv.Start() + cfg, err := testEnv.Start() Expect(err).NotTo(HaveOccurred()) Expect(cfg).NotTo(BeNil()) diff --git a/infra/feast-operator/internal/controller/suite_test.go b/infra/feast-operator/internal/controller/suite_test.go index 57091df5c00..8245052b268 100644 --- a/infra/feast-operator/internal/controller/suite_test.go +++ b/infra/feast-operator/internal/controller/suite_test.go @@ -26,7 +26,6 @@ import ( . "github.com/onsi/gomega" "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/rest" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/envtest" logf "sigs.k8s.io/controller-runtime/pkg/log" @@ -39,7 +38,6 @@ import ( // These tests use Ginkgo (BDD-style Go testing framework). Refer to // http://onsi.github.io/ginkgo/ to learn more about Ginkgo. -var cfg *rest.Config var k8sClient client.Client var testEnv *envtest.Environment @@ -67,8 +65,7 @@ var _ = BeforeSuite(func() { } var err error - // cfg is defined in this file globally. - cfg, err = testEnv.Start() + cfg, err := testEnv.Start() Expect(err).NotTo(HaveOccurred()) Expect(cfg).NotTo(BeNil()) From db3c5c53d78f2c55a3511b1b38501b53f4aede8f Mon Sep 17 00:00:00 2001 From: Daniele Martinoli Date: Wed, 6 Nov 2024 17:38:14 +0100 Subject: [PATCH 04/11] reviewed ApplyDefaultsToStatus Signed-off-by: Daniele Martinoli --- .../internal/controller/services/util.go | 45 +++++-------------- 1 file changed, 11 insertions(+), 34 deletions(-) diff --git a/infra/feast-operator/internal/controller/services/util.go b/infra/feast-operator/internal/controller/services/util.go index 59a69b4417e..16b7b68db73 100644 --- a/infra/feast-operator/internal/controller/services/util.go +++ b/infra/feast-operator/internal/controller/services/util.go @@ -19,35 +19,18 @@ func ApplyDefaultsToStatus(cr *feastdevv1alpha1.FeatureStore) { // default to registry service deployment if applied.Services.Registry == nil { - applied.Services.Registry = &feastdevv1alpha1.Registry{ - Local: &feastdevv1alpha1.LocalRegistryConfig{ - Persistence: &feastdevv1alpha1.RegistryPersistence{ - FilePersistence: &feastdevv1alpha1.RegistryFilePersistence{ - Path: DefaultRegistryPath, - }, - }, - }, - } + applied.Services.Registry = &feastdevv1alpha1.Registry{} } // if remote registry not set, proceed w/ local registry defaults if applied.Services.Registry.Remote == nil { // if local registry not set, apply an empty pointer struct if applied.Services.Registry.Local == nil { - applied.Services.Registry.Local = &feastdevv1alpha1.LocalRegistryConfig{ - Persistence: &feastdevv1alpha1.RegistryPersistence{ - FilePersistence: &feastdevv1alpha1.RegistryFilePersistence{ - Path: DefaultRegistryPath, - }, - }, - } + applied.Services.Registry.Local = &feastdevv1alpha1.LocalRegistryConfig{} } if applied.Services.Registry.Local.Persistence == nil { - applied.Services.Registry.Local.Persistence = &feastdevv1alpha1.RegistryPersistence{ - FilePersistence: &feastdevv1alpha1.RegistryFilePersistence{ - Path: DefaultRegistryPath, - }, - } - } else if applied.Services.Registry.Local.Persistence.FilePersistence == nil { + applied.Services.Registry.Local.Persistence = &feastdevv1alpha1.RegistryPersistence{} + } + if applied.Services.Registry.Local.Persistence.FilePersistence == nil { applied.Services.Registry.Local.Persistence.FilePersistence = &feastdevv1alpha1.RegistryFilePersistence{ Path: DefaultRegistryPath, } @@ -57,12 +40,9 @@ func ApplyDefaultsToStatus(cr *feastdevv1alpha1.FeatureStore) { if applied.Services.OfflineStore != nil { setServiceDefaultConfigs(&applied.Services.OfflineStore.ServiceConfigs.DefaultConfigs) if applied.Services.OfflineStore.Persistence == nil { - applied.Services.OfflineStore.Persistence = &feastdevv1alpha1.OfflineStorePersistence{ - FilePersistence: &feastdevv1alpha1.OfflineStoreFilePersistence{ - Type: string(OfflineDaskConfigType), - }, - } - } else if applied.Services.OfflineStore.Persistence.FilePersistence == nil { + applied.Services.OfflineStore.Persistence = &feastdevv1alpha1.OfflineStorePersistence{} + } + if applied.Services.OfflineStore.Persistence.FilePersistence == nil { applied.Services.OfflineStore.Persistence.FilePersistence = &feastdevv1alpha1.OfflineStoreFilePersistence{ Type: string(OfflineDaskConfigType), } @@ -71,12 +51,9 @@ func ApplyDefaultsToStatus(cr *feastdevv1alpha1.FeatureStore) { if applied.Services.OnlineStore != nil { setServiceDefaultConfigs(&applied.Services.OnlineStore.ServiceConfigs.DefaultConfigs) if applied.Services.OnlineStore.Persistence == nil { - applied.Services.OnlineStore.Persistence = &feastdevv1alpha1.OnlineStorePersistence{ - FilePersistence: &feastdevv1alpha1.OnlineStoreFilePersistence{ - Path: DefaultOnlinePath, - }, - } - } else if applied.Services.OnlineStore.Persistence.FilePersistence == nil { + applied.Services.OnlineStore.Persistence = &feastdevv1alpha1.OnlineStorePersistence{} + } + if applied.Services.OnlineStore.Persistence.FilePersistence == nil { applied.Services.OnlineStore.Persistence.FilePersistence = &feastdevv1alpha1.OnlineStoreFilePersistence{ Path: DefaultOnlinePath, } From 74c518cedf81465ce1508d76c9178effc79131dd Mon Sep 17 00:00:00 2001 From: Daniele Martinoli Date: Wed, 6 Nov 2024 22:08:42 +0100 Subject: [PATCH 05/11] extended checks in ApplyDefaultsToStatus Signed-off-by: Daniele Martinoli --- .../internal/controller/services/util.go | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/infra/feast-operator/internal/controller/services/util.go b/infra/feast-operator/internal/controller/services/util.go index 16b7b68db73..9fd8dd3cf69 100644 --- a/infra/feast-operator/internal/controller/services/util.go +++ b/infra/feast-operator/internal/controller/services/util.go @@ -31,9 +31,10 @@ func ApplyDefaultsToStatus(cr *feastdevv1alpha1.FeatureStore) { applied.Services.Registry.Local.Persistence = &feastdevv1alpha1.RegistryPersistence{} } if applied.Services.Registry.Local.Persistence.FilePersistence == nil { - applied.Services.Registry.Local.Persistence.FilePersistence = &feastdevv1alpha1.RegistryFilePersistence{ - Path: DefaultRegistryPath, - } + applied.Services.Registry.Local.Persistence.FilePersistence = &feastdevv1alpha1.RegistryFilePersistence{} + } + if len(applied.Services.Registry.Local.Persistence.FilePersistence.Path) == 0 { + applied.Services.Registry.Local.Persistence.FilePersistence.Path = DefaultRegistryPath } setServiceDefaultConfigs(&applied.Services.Registry.Local.ServiceConfigs.DefaultConfigs) } @@ -43,9 +44,10 @@ func ApplyDefaultsToStatus(cr *feastdevv1alpha1.FeatureStore) { applied.Services.OfflineStore.Persistence = &feastdevv1alpha1.OfflineStorePersistence{} } if applied.Services.OfflineStore.Persistence.FilePersistence == nil { - applied.Services.OfflineStore.Persistence.FilePersistence = &feastdevv1alpha1.OfflineStoreFilePersistence{ - Type: string(OfflineDaskConfigType), - } + applied.Services.OfflineStore.Persistence.FilePersistence = &feastdevv1alpha1.OfflineStoreFilePersistence{} + } + if len(applied.Services.OfflineStore.Persistence.FilePersistence.Type) == 0 { + applied.Services.OfflineStore.Persistence.FilePersistence.Type = string(OfflineDaskConfigType) } } if applied.Services.OnlineStore != nil { @@ -54,12 +56,12 @@ func ApplyDefaultsToStatus(cr *feastdevv1alpha1.FeatureStore) { applied.Services.OnlineStore.Persistence = &feastdevv1alpha1.OnlineStorePersistence{} } if applied.Services.OnlineStore.Persistence.FilePersistence == nil { - applied.Services.OnlineStore.Persistence.FilePersistence = &feastdevv1alpha1.OnlineStoreFilePersistence{ - Path: DefaultOnlinePath, - } + applied.Services.OnlineStore.Persistence.FilePersistence = &feastdevv1alpha1.OnlineStoreFilePersistence{} + } + if len(applied.Services.OnlineStore.Persistence.FilePersistence.Path) == 0 { + applied.Services.OnlineStore.Persistence.FilePersistence.Path = DefaultOnlinePath } } - // overwrite status.applied with every reconcile applied.DeepCopyInto(&cr.Status.Applied) } From da6a84f0895886846af83cb6c83539486867b5ce Mon Sep 17 00:00:00 2001 From: Daniele Martinoli Date: Wed, 6 Nov 2024 22:12:38 +0100 Subject: [PATCH 06/11] removed var error Signed-off-by: Daniele Martinoli --- infra/feast-operator/internal/controller/suite_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/infra/feast-operator/internal/controller/suite_test.go b/infra/feast-operator/internal/controller/suite_test.go index 8245052b268..38da27cc9c5 100644 --- a/infra/feast-operator/internal/controller/suite_test.go +++ b/infra/feast-operator/internal/controller/suite_test.go @@ -64,7 +64,6 @@ var _ = BeforeSuite(func() { fmt.Sprintf("1.29.0-%s-%s", runtime.GOOS, runtime.GOARCH)), } - var err error cfg, err := testEnv.Start() Expect(err).NotTo(HaveOccurred()) Expect(cfg).NotTo(BeNil()) From bce16ca48f656a9ec6ed95115777c4fe3f098a5f Mon Sep 17 00:00:00 2001 From: Daniele Martinoli Date: Thu, 7 Nov 2024 10:25:30 +0100 Subject: [PATCH 07/11] added IsValidOfflineStoreFilePersistenceType to the API definitions Signed-off-by: Daniele Martinoli --- .../api/v1alpha1/featurestore_types.go | 17 +++++++++++++++++ .../controller/featurestore_controller.go | 8 ++++++-- .../controller/services/repo_config.go | 11 +++++------ .../controller/services/repo_config_test.go | 18 +++++++++++------- .../controller/services/services_types.go | 12 ------------ .../internal/controller/services/util.go | 8 +++++++- 6 files changed, 46 insertions(+), 28 deletions(-) diff --git a/infra/feast-operator/api/v1alpha1/featurestore_types.go b/infra/feast-operator/api/v1alpha1/featurestore_types.go index 5bf0e985a12..3d946aa9483 100644 --- a/infra/feast-operator/api/v1alpha1/featurestore_types.go +++ b/infra/feast-operator/api/v1alpha1/featurestore_types.go @@ -17,6 +17,8 @@ limitations under the License. package v1alpha1 import ( + "fmt" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -85,6 +87,21 @@ type OfflineStoreFilePersistence struct { Type string `json:"type,omitempty"` } +var validOfflineStoreFilePersistenceType = []string{ + "dask", + "duckdb", +} + +// A function to validate the file persistence types for offline stores +func IsValidOfflineStoreFilePersistenceType(value string) (bool, error) { + for _, v := range validOfflineStoreFilePersistenceType { + if v == value { + return true, nil + } + } + return false, fmt.Errorf("invalid file type %s for offline store", value) +} + // OnlineStore configures the deployed online store service type OnlineStore struct { ServiceConfigs `json:",inline"` diff --git a/infra/feast-operator/internal/controller/featurestore_controller.go b/infra/feast-operator/internal/controller/featurestore_controller.go index 66e50d16367..6605f2d1fca 100644 --- a/infra/feast-operator/internal/controller/featurestore_controller.go +++ b/infra/feast-operator/internal/controller/featurestore_controller.go @@ -77,10 +77,14 @@ func (r *FeatureStoreReconciler) Reconcile(ctx context.Context, req ctrl.Request currentStatus := cr.Status.DeepCopy() // initial status defaults must occur before feast deployment - services.ApplyDefaultsToStatus(cr) + if err := services.ApplyDefaultsToStatus(cr); err != nil { + logger.Error(err, "Error updating the FeatureStore status") + result = ctrl.Result{Requeue: true, RequeueAfter: RequeueDelayError} + return result, err + } result, recErr = r.deployFeast(ctx, cr) if cr.DeletionTimestamp == nil && !reflect.DeepEqual(currentStatus, cr.Status) { - if err := r.Client.Status().Update(ctx, cr); err != nil { + if err = r.Client.Status().Update(ctx, cr); err != nil { if apierrors.IsConflict(err) { logger.Info("FeatureStore object modified, retry syncing status") // Re-queue and preserve existing recErr diff --git a/infra/feast-operator/internal/controller/services/repo_config.go b/infra/feast-operator/internal/controller/services/repo_config.go index 5dba4981985..a4e94364d2d 100644 --- a/infra/feast-operator/internal/controller/services/repo_config.go +++ b/infra/feast-operator/internal/controller/services/repo_config.go @@ -55,15 +55,14 @@ func getServiceRepoConfig(feastType FeastServiceType, featureStore *feastdevv1al // Offline server has an `offline_store` section and a remote `registry` if feastType == OfflineFeastType && appliedSpec.Services.OfflineStore != nil { fileType := string(OfflineDaskConfigType) - if appliedSpec.Services.OfflineStore.Persistence != nil && appliedSpec.Services.OfflineStore.Persistence.FilePersistence != nil { + if appliedSpec.Services.OfflineStore.Persistence != nil && + appliedSpec.Services.OfflineStore.Persistence.FilePersistence != nil && + len(appliedSpec.Services.OfflineStore.Persistence.FilePersistence.Type) > 0 { fileType = appliedSpec.Services.OfflineStore.Persistence.FilePersistence.Type } - repoConfig.OfflineStore = OfflineStoreConfig{} - var err error - repoConfig.OfflineStore.Type, err = ParseOfflineConfigType(fileType) - if err != nil { - return repoConfig, err + repoConfig.OfflineStore = OfflineStoreConfig{ + Type: OfflineConfigType(fileType), } repoConfig.OnlineStore = OnlineStoreConfig{} } diff --git a/infra/feast-operator/internal/controller/services/repo_config_test.go b/infra/feast-operator/internal/controller/services/repo_config_test.go index 6c388033101..fc2202333f5 100644 --- a/infra/feast-operator/internal/controller/services/repo_config_test.go +++ b/infra/feast-operator/internal/controller/services/repo_config_test.go @@ -31,8 +31,10 @@ var _ = Describe("Repo Config", func() { It("should successfully create the repo configs", func() { By("Having the minimal created resource") featureStore := minimalFeatureStore() - ApplyDefaultsToStatus(featureStore) - repoConfig, err := getServiceRepoConfig(OfflineFeastType, featureStore) + err := ApplyDefaultsToStatus(featureStore) + Expect(err).NotTo(HaveOccurred()) + var repoConfig RepoConfig + repoConfig, err = getServiceRepoConfig(OfflineFeastType, featureStore) Expect(err).NotTo(HaveOccurred()) Expect(repoConfig.OfflineStore).To(Equal(emptyOfflineStoreConfig())) Expect(repoConfig.OnlineStore).To(Equal(emptyOnlineStoreConfig())) @@ -67,7 +69,8 @@ var _ = Describe("Repo Config", func() { }, }, } - ApplyDefaultsToStatus(featureStore) + err = ApplyDefaultsToStatus(featureStore) + Expect(err).NotTo(HaveOccurred()) repoConfig, err = getServiceRepoConfig(OfflineFeastType, featureStore) Expect(err).NotTo(HaveOccurred()) Expect(repoConfig.OfflineStore).To(Equal(emptyOfflineStoreConfig())) @@ -102,7 +105,8 @@ var _ = Describe("Repo Config", func() { }, }, } - ApplyDefaultsToStatus(featureStore) + err = ApplyDefaultsToStatus(featureStore) + Expect(err).NotTo(HaveOccurred()) repoConfig, err = getServiceRepoConfig(OfflineFeastType, featureStore) Expect(err).NotTo(HaveOccurred()) Expect(repoConfig.OfflineStore).To(Equal(emptyOfflineStoreConfig())) @@ -148,7 +152,8 @@ var _ = Describe("Repo Config", func() { }, }, } - ApplyDefaultsToStatus(featureStore) + err = ApplyDefaultsToStatus(featureStore) + Expect(err).NotTo(HaveOccurred()) repoConfig, err = getServiceRepoConfig(OfflineFeastType, featureStore) Expect(err).NotTo(HaveOccurred()) expectedOfflineConfig := OfflineStoreConfig{ @@ -191,8 +196,7 @@ var _ = Describe("Repo Config", func() { }, }, } - ApplyDefaultsToStatus(featureStore) - _, err := getServiceRepoConfig(OfflineFeastType, featureStore) + err := ApplyDefaultsToStatus(featureStore) Expect(err).To(HaveOccurred()) }) }) diff --git a/infra/feast-operator/internal/controller/services/services_types.go b/infra/feast-operator/internal/controller/services/services_types.go index 4e90641838f..222bf62dc0e 100644 --- a/infra/feast-operator/internal/controller/services/services_types.go +++ b/infra/feast-operator/internal/controller/services/services_types.go @@ -18,7 +18,6 @@ package services import ( "context" - "fmt" "github.com/feast-dev/feast/infra/feast-operator/api/feastversion" feastdevv1alpha1 "github.com/feast-dev/feast/infra/feast-operator/api/v1alpha1" @@ -137,17 +136,6 @@ type FeastServiceType string // OfflineConfigType provider name or a class name that implements Offline Store type OfflineConfigType string -func ParseOfflineConfigType(value string) (OfflineConfigType, error) { - switch value { - case string(OfflineDaskConfigType): - return OfflineDaskConfigType, nil - case string(OfflineDuckDbConfigType): - return OfflineDuckDbConfigType, nil - default: - return "", fmt.Errorf("invalid OfflineConfigType value %s", value) - } -} - // RegistryConfigType provider name or a class name that implements Registry type RegistryConfigType string diff --git a/infra/feast-operator/internal/controller/services/util.go b/infra/feast-operator/internal/controller/services/util.go index 9fd8dd3cf69..af0e51c742c 100644 --- a/infra/feast-operator/internal/controller/services/util.go +++ b/infra/feast-operator/internal/controller/services/util.go @@ -10,7 +10,7 @@ func IsLocalRegistry(featureStore *feastdevv1alpha1.FeatureStore) bool { return appliedServices != nil && appliedServices.Registry != nil && appliedServices.Registry.Local != nil } -func ApplyDefaultsToStatus(cr *feastdevv1alpha1.FeatureStore) { +func ApplyDefaultsToStatus(cr *feastdevv1alpha1.FeatureStore) error { cr.Status.FeastVersion = feastversion.FeastVersion applied := cr.Spec.DeepCopy() if applied.Services == nil { @@ -48,6 +48,11 @@ func ApplyDefaultsToStatus(cr *feastdevv1alpha1.FeatureStore) { } if len(applied.Services.OfflineStore.Persistence.FilePersistence.Type) == 0 { applied.Services.OfflineStore.Persistence.FilePersistence.Type = string(OfflineDaskConfigType) + } else { + _, err := feastdevv1alpha1.IsValidOfflineStoreFilePersistenceType(applied.Services.OfflineStore.Persistence.FilePersistence.Type) + if err != nil { + return err + } } } if applied.Services.OnlineStore != nil { @@ -64,6 +69,7 @@ func ApplyDefaultsToStatus(cr *feastdevv1alpha1.FeatureStore) { } // overwrite status.applied with every reconcile applied.DeepCopyInto(&cr.Status.Applied) + return nil } func setServiceDefaultConfigs(defaultConfigs *feastdevv1alpha1.DefaultConfigs) { From 251538d4e423b8bb79aac274478bd32bec9123e7 Mon Sep 17 00:00:00 2001 From: Daniele Martinoli Date: Thu, 7 Nov 2024 14:48:58 +0100 Subject: [PATCH 08/11] moved IsValidOfflineStoreFilePersistenceType to services package and use it in deploy flow Signed-off-by: Daniele Martinoli --- .../api/v1alpha1/featurestore_types.go | 14 +--- .../controller/featurestore_controller.go | 6 +- .../controller/services/repo_config_test.go | 30 ++------ .../internal/controller/services/services.go | 7 ++ .../internal/controller/services/util.go | 76 ++++++++++--------- 5 files changed, 55 insertions(+), 78 deletions(-) diff --git a/infra/feast-operator/api/v1alpha1/featurestore_types.go b/infra/feast-operator/api/v1alpha1/featurestore_types.go index 3d946aa9483..a8448571c04 100644 --- a/infra/feast-operator/api/v1alpha1/featurestore_types.go +++ b/infra/feast-operator/api/v1alpha1/featurestore_types.go @@ -17,8 +17,6 @@ limitations under the License. package v1alpha1 import ( - "fmt" - corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -87,21 +85,11 @@ type OfflineStoreFilePersistence struct { Type string `json:"type,omitempty"` } -var validOfflineStoreFilePersistenceType = []string{ +var ValidOfflineStoreFilePersistenceTypes = []string{ "dask", "duckdb", } -// A function to validate the file persistence types for offline stores -func IsValidOfflineStoreFilePersistenceType(value string) (bool, error) { - for _, v := range validOfflineStoreFilePersistenceType { - if v == value { - return true, nil - } - } - return false, fmt.Errorf("invalid file type %s for offline store", value) -} - // OnlineStore configures the deployed online store service type OnlineStore struct { ServiceConfigs `json:",inline"` diff --git a/infra/feast-operator/internal/controller/featurestore_controller.go b/infra/feast-operator/internal/controller/featurestore_controller.go index 6605f2d1fca..5da9bad3aa6 100644 --- a/infra/feast-operator/internal/controller/featurestore_controller.go +++ b/infra/feast-operator/internal/controller/featurestore_controller.go @@ -77,11 +77,7 @@ func (r *FeatureStoreReconciler) Reconcile(ctx context.Context, req ctrl.Request currentStatus := cr.Status.DeepCopy() // initial status defaults must occur before feast deployment - if err := services.ApplyDefaultsToStatus(cr); err != nil { - logger.Error(err, "Error updating the FeatureStore status") - result = ctrl.Result{Requeue: true, RequeueAfter: RequeueDelayError} - return result, err - } + services.ApplyDefaultsToStatus(cr) result, recErr = r.deployFeast(ctx, cr) if cr.DeletionTimestamp == nil && !reflect.DeepEqual(currentStatus, cr.Status) { if err = r.Client.Status().Update(ctx, cr); err != nil { diff --git a/infra/feast-operator/internal/controller/services/repo_config_test.go b/infra/feast-operator/internal/controller/services/repo_config_test.go index fc2202333f5..285fc05c566 100644 --- a/infra/feast-operator/internal/controller/services/repo_config_test.go +++ b/infra/feast-operator/internal/controller/services/repo_config_test.go @@ -31,10 +31,9 @@ var _ = Describe("Repo Config", func() { It("should successfully create the repo configs", func() { By("Having the minimal created resource") featureStore := minimalFeatureStore() - err := ApplyDefaultsToStatus(featureStore) - Expect(err).NotTo(HaveOccurred()) + ApplyDefaultsToStatus(featureStore) var repoConfig RepoConfig - repoConfig, err = getServiceRepoConfig(OfflineFeastType, featureStore) + repoConfig, err := getServiceRepoConfig(OfflineFeastType, featureStore) Expect(err).NotTo(HaveOccurred()) Expect(repoConfig.OfflineStore).To(Equal(emptyOfflineStoreConfig())) Expect(repoConfig.OnlineStore).To(Equal(emptyOnlineStoreConfig())) @@ -69,8 +68,7 @@ var _ = Describe("Repo Config", func() { }, }, } - err = ApplyDefaultsToStatus(featureStore) - Expect(err).NotTo(HaveOccurred()) + ApplyDefaultsToStatus(featureStore) repoConfig, err = getServiceRepoConfig(OfflineFeastType, featureStore) Expect(err).NotTo(HaveOccurred()) Expect(repoConfig.OfflineStore).To(Equal(emptyOfflineStoreConfig())) @@ -105,8 +103,7 @@ var _ = Describe("Repo Config", func() { }, }, } - err = ApplyDefaultsToStatus(featureStore) - Expect(err).NotTo(HaveOccurred()) + ApplyDefaultsToStatus(featureStore) repoConfig, err = getServiceRepoConfig(OfflineFeastType, featureStore) Expect(err).NotTo(HaveOccurred()) Expect(repoConfig.OfflineStore).To(Equal(emptyOfflineStoreConfig())) @@ -152,8 +149,7 @@ var _ = Describe("Repo Config", func() { }, }, } - err = ApplyDefaultsToStatus(featureStore) - Expect(err).NotTo(HaveOccurred()) + ApplyDefaultsToStatus(featureStore) repoConfig, err = getServiceRepoConfig(OfflineFeastType, featureStore) Expect(err).NotTo(HaveOccurred()) expectedOfflineConfig := OfflineStoreConfig{ @@ -183,22 +179,6 @@ var _ = Describe("Repo Config", func() { } Expect(repoConfig.Registry).To(Equal(expectedRegistryConfig)) }) - - It("should fail the creation of repo configs", func() { - By("Having wrong offline persistence type") - featureStore := minimalFeatureStore() - featureStore.Spec.Services = &feastdevv1alpha1.FeatureStoreServices{ - OfflineStore: &feastdevv1alpha1.OfflineStore{ - Persistence: &feastdevv1alpha1.OfflineStorePersistence{ - FilePersistence: &feastdevv1alpha1.OfflineStoreFilePersistence{ - Type: "invalid", - }, - }, - }, - } - err := ApplyDefaultsToStatus(featureStore) - Expect(err).To(HaveOccurred()) - }) }) }) diff --git a/infra/feast-operator/internal/controller/services/services.go b/infra/feast-operator/internal/controller/services/services.go index 7acdc564bd3..ecd669d843a 100644 --- a/infra/feast-operator/internal/controller/services/services.go +++ b/infra/feast-operator/internal/controller/services/services.go @@ -43,6 +43,13 @@ func (feast *FeastServices) Deploy() error { services := feast.FeatureStore.Status.Applied.Services if services != nil { if services.OfflineStore != nil { + if services.OfflineStore.Persistence != nil && + services.OfflineStore.Persistence.FilePersistence != nil && + len(services.OfflineStore.Persistence.FilePersistence.Type) > 0 { + if _, err := isValidOfflineStoreFilePersistenceType(services.OfflineStore.Persistence.FilePersistence.Type); err != nil { + return err + } + } if err := feast.deployFeastServiceByType(OfflineFeastType); err != nil { return err } diff --git a/infra/feast-operator/internal/controller/services/util.go b/infra/feast-operator/internal/controller/services/util.go index af0e51c742c..906ab9177e9 100644 --- a/infra/feast-operator/internal/controller/services/util.go +++ b/infra/feast-operator/internal/controller/services/util.go @@ -1,6 +1,8 @@ package services import ( + "fmt" + "github.com/feast-dev/feast/infra/feast-operator/api/feastversion" feastdevv1alpha1 "github.com/feast-dev/feast/infra/feast-operator/api/v1alpha1" ) @@ -10,66 +12,61 @@ func IsLocalRegistry(featureStore *feastdevv1alpha1.FeatureStore) bool { return appliedServices != nil && appliedServices.Registry != nil && appliedServices.Registry.Local != nil } -func ApplyDefaultsToStatus(cr *feastdevv1alpha1.FeatureStore) error { +func ApplyDefaultsToStatus(cr *feastdevv1alpha1.FeatureStore) { cr.Status.FeastVersion = feastversion.FeastVersion applied := cr.Spec.DeepCopy() if applied.Services == nil { applied.Services = &feastdevv1alpha1.FeatureStoreServices{} } + services := applied.Services // default to registry service deployment - if applied.Services.Registry == nil { - applied.Services.Registry = &feastdevv1alpha1.Registry{} + if services.Registry == nil { + services.Registry = &feastdevv1alpha1.Registry{} } // if remote registry not set, proceed w/ local registry defaults - if applied.Services.Registry.Remote == nil { + if services.Registry.Remote == nil { // if local registry not set, apply an empty pointer struct - if applied.Services.Registry.Local == nil { - applied.Services.Registry.Local = &feastdevv1alpha1.LocalRegistryConfig{} + if services.Registry.Local == nil { + services.Registry.Local = &feastdevv1alpha1.LocalRegistryConfig{} } - if applied.Services.Registry.Local.Persistence == nil { - applied.Services.Registry.Local.Persistence = &feastdevv1alpha1.RegistryPersistence{} + if services.Registry.Local.Persistence == nil { + services.Registry.Local.Persistence = &feastdevv1alpha1.RegistryPersistence{} } - if applied.Services.Registry.Local.Persistence.FilePersistence == nil { - applied.Services.Registry.Local.Persistence.FilePersistence = &feastdevv1alpha1.RegistryFilePersistence{} + if services.Registry.Local.Persistence.FilePersistence == nil { + services.Registry.Local.Persistence.FilePersistence = &feastdevv1alpha1.RegistryFilePersistence{} } - if len(applied.Services.Registry.Local.Persistence.FilePersistence.Path) == 0 { - applied.Services.Registry.Local.Persistence.FilePersistence.Path = DefaultRegistryPath + if len(services.Registry.Local.Persistence.FilePersistence.Path) == 0 { + services.Registry.Local.Persistence.FilePersistence.Path = DefaultRegistryPath } - setServiceDefaultConfigs(&applied.Services.Registry.Local.ServiceConfigs.DefaultConfigs) + setServiceDefaultConfigs(&services.Registry.Local.ServiceConfigs.DefaultConfigs) } - if applied.Services.OfflineStore != nil { - setServiceDefaultConfigs(&applied.Services.OfflineStore.ServiceConfigs.DefaultConfigs) - if applied.Services.OfflineStore.Persistence == nil { - applied.Services.OfflineStore.Persistence = &feastdevv1alpha1.OfflineStorePersistence{} + if services.OfflineStore != nil { + setServiceDefaultConfigs(&services.OfflineStore.ServiceConfigs.DefaultConfigs) + if services.OfflineStore.Persistence == nil { + services.OfflineStore.Persistence = &feastdevv1alpha1.OfflineStorePersistence{} } - if applied.Services.OfflineStore.Persistence.FilePersistence == nil { - applied.Services.OfflineStore.Persistence.FilePersistence = &feastdevv1alpha1.OfflineStoreFilePersistence{} + if services.OfflineStore.Persistence.FilePersistence == nil { + services.OfflineStore.Persistence.FilePersistence = &feastdevv1alpha1.OfflineStoreFilePersistence{} } - if len(applied.Services.OfflineStore.Persistence.FilePersistence.Type) == 0 { - applied.Services.OfflineStore.Persistence.FilePersistence.Type = string(OfflineDaskConfigType) - } else { - _, err := feastdevv1alpha1.IsValidOfflineStoreFilePersistenceType(applied.Services.OfflineStore.Persistence.FilePersistence.Type) - if err != nil { - return err - } + if len(services.OfflineStore.Persistence.FilePersistence.Type) == 0 { + services.OfflineStore.Persistence.FilePersistence.Type = string(OfflineDaskConfigType) } } - if applied.Services.OnlineStore != nil { - setServiceDefaultConfigs(&applied.Services.OnlineStore.ServiceConfigs.DefaultConfigs) - if applied.Services.OnlineStore.Persistence == nil { - applied.Services.OnlineStore.Persistence = &feastdevv1alpha1.OnlineStorePersistence{} + if services.OnlineStore != nil { + setServiceDefaultConfigs(&services.OnlineStore.ServiceConfigs.DefaultConfigs) + if services.OnlineStore.Persistence == nil { + services.OnlineStore.Persistence = &feastdevv1alpha1.OnlineStorePersistence{} } - if applied.Services.OnlineStore.Persistence.FilePersistence == nil { - applied.Services.OnlineStore.Persistence.FilePersistence = &feastdevv1alpha1.OnlineStoreFilePersistence{} + if services.OnlineStore.Persistence.FilePersistence == nil { + services.OnlineStore.Persistence.FilePersistence = &feastdevv1alpha1.OnlineStoreFilePersistence{} } - if len(applied.Services.OnlineStore.Persistence.FilePersistence.Path) == 0 { - applied.Services.OnlineStore.Persistence.FilePersistence.Path = DefaultOnlinePath + if len(services.OnlineStore.Persistence.FilePersistence.Path) == 0 { + services.OnlineStore.Persistence.FilePersistence.Path = DefaultOnlinePath } } // overwrite status.applied with every reconcile applied.DeepCopyInto(&cr.Status.Applied) - return nil } func setServiceDefaultConfigs(defaultConfigs *feastdevv1alpha1.DefaultConfigs) { @@ -77,3 +74,12 @@ func setServiceDefaultConfigs(defaultConfigs *feastdevv1alpha1.DefaultConfigs) { defaultConfigs.Image = &DefaultImage } } + +func isValidOfflineStoreFilePersistenceType(value string) (bool, error) { + for _, v := range feastdevv1alpha1.ValidOfflineStoreFilePersistenceTypes { + if v == value { + return true, nil + } + } + return false, fmt.Errorf("invalid file type %s for offline store", value) +} From ca09666e7b0ec5c3400d764bb196fae9baad18d4 Mon Sep 17 00:00:00 2001 From: Daniele Martinoli Date: Thu, 7 Nov 2024 15:37:15 +0100 Subject: [PATCH 09/11] renamed isValidOfflineStoreFilePersistenceType to checkOfflineStoreFilePersistenceType Signed-off-by: Daniele Martinoli --- .../feast-operator/internal/controller/services/services.go | 2 +- infra/feast-operator/internal/controller/services/util.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/infra/feast-operator/internal/controller/services/services.go b/infra/feast-operator/internal/controller/services/services.go index ecd669d843a..dc5bfa0c259 100644 --- a/infra/feast-operator/internal/controller/services/services.go +++ b/infra/feast-operator/internal/controller/services/services.go @@ -46,7 +46,7 @@ func (feast *FeastServices) Deploy() error { if services.OfflineStore.Persistence != nil && services.OfflineStore.Persistence.FilePersistence != nil && len(services.OfflineStore.Persistence.FilePersistence.Type) > 0 { - if _, err := isValidOfflineStoreFilePersistenceType(services.OfflineStore.Persistence.FilePersistence.Type); err != nil { + if err := checkOfflineStoreFilePersistenceType(services.OfflineStore.Persistence.FilePersistence.Type); err != nil { return err } } diff --git a/infra/feast-operator/internal/controller/services/util.go b/infra/feast-operator/internal/controller/services/util.go index 906ab9177e9..da6307936f2 100644 --- a/infra/feast-operator/internal/controller/services/util.go +++ b/infra/feast-operator/internal/controller/services/util.go @@ -75,11 +75,11 @@ func setServiceDefaultConfigs(defaultConfigs *feastdevv1alpha1.DefaultConfigs) { } } -func isValidOfflineStoreFilePersistenceType(value string) (bool, error) { +func checkOfflineStoreFilePersistenceType(value string) error { for _, v := range feastdevv1alpha1.ValidOfflineStoreFilePersistenceTypes { if v == value { - return true, nil + return nil } } - return false, fmt.Errorf("invalid file type %s for offline store", value) + return fmt.Errorf("invalid file type %s for offline store", value) } From f1134ac5fabee9711aa6de686aeefbe6dc82cbe2 Mon Sep 17 00:00:00 2001 From: Daniele Martinoli Date: Thu, 7 Nov 2024 17:33:06 +0100 Subject: [PATCH 10/11] adding controller tests for ephemeral stores Signed-off-by: Daniele Martinoli --- .../featurestore_controller_test.go | 449 ++++++++++++++++++ .../controller/services/repo_config.go | 2 +- .../internal/controller/services/services.go | 2 +- .../internal/controller/services/util.go | 2 +- 4 files changed, 452 insertions(+), 3 deletions(-) diff --git a/infra/feast-operator/internal/controller/featurestore_controller_test.go b/infra/feast-operator/internal/controller/featurestore_controller_test.go index d6ecb0fa577..aab260d9991 100644 --- a/infra/feast-operator/internal/controller/featurestore_controller_test.go +++ b/infra/feast-operator/internal/controller/featurestore_controller_test.go @@ -19,6 +19,7 @@ package controller import ( "context" "encoding/base64" + "fmt" "reflect" . "github.com/onsi/ginkgo/v2" @@ -441,16 +442,25 @@ var _ = Describe("FeatureStore Controller", func() { Expect(resource.Status.Applied.FeastProject).To(Equal(resource.Spec.FeastProject)) Expect(resource.Status.Applied.Services).NotTo(BeNil()) Expect(resource.Status.Applied.Services.OfflineStore).NotTo(BeNil()) + Expect(resource.Status.Applied.Services.OfflineStore.Persistence).NotTo(BeNil()) + Expect(resource.Status.Applied.Services.OfflineStore.Persistence.FilePersistence).NotTo(BeNil()) + Expect(resource.Status.Applied.Services.OfflineStore.Persistence.FilePersistence.Type).To(Equal("dask")) Expect(resource.Status.Applied.Services.OfflineStore.ImagePullPolicy).To(BeNil()) Expect(resource.Status.Applied.Services.OfflineStore.Resources).To(BeNil()) Expect(resource.Status.Applied.Services.OfflineStore.Image).To(Equal(&services.DefaultImage)) Expect(resource.Status.Applied.Services.OnlineStore).NotTo(BeNil()) + Expect(resource.Status.Applied.Services.OnlineStore.Persistence).NotTo(BeNil()) + Expect(resource.Status.Applied.Services.OnlineStore.Persistence.FilePersistence).NotTo(BeNil()) + Expect(resource.Status.Applied.Services.OnlineStore.Persistence.FilePersistence.Path).To(Equal(services.DefaultOnlinePath)) Expect(resource.Status.Applied.Services.OnlineStore.Env).To(Equal(&[]corev1.EnvVar{{Name: testEnvVarName, Value: testEnvVarValue}, {Name: "fieldRefName", ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{APIVersion: "v1", FieldPath: "metadata.namespace"}}}})) Expect(resource.Status.Applied.Services.OnlineStore.ImagePullPolicy).To(Equal(&pullPolicy)) Expect(resource.Status.Applied.Services.OnlineStore.Resources).NotTo(BeNil()) Expect(resource.Status.Applied.Services.OnlineStore.Image).To(Equal(&image)) Expect(resource.Status.Applied.Services.Registry).NotTo(BeNil()) Expect(resource.Status.Applied.Services.Registry.Local).NotTo(BeNil()) + Expect(resource.Status.Applied.Services.Registry.Local.Persistence).NotTo(BeNil()) + Expect(resource.Status.Applied.Services.Registry.Local.Persistence.FilePersistence).NotTo(BeNil()) + Expect(resource.Status.Applied.Services.Registry.Local.Persistence.FilePersistence.Path).To(Equal(services.DefaultRegistryPath)) Expect(resource.Status.Applied.Services.Registry.Local.ImagePullPolicy).To(BeNil()) Expect(resource.Status.Applied.Services.Registry.Local.Resources).To(BeNil()) Expect(resource.Status.Applied.Services.Registry.Local.Image).To(Equal(&services.DefaultImage)) @@ -1187,6 +1197,445 @@ var _ = Describe("FeatureStore Controller", func() { Expect(err).NotTo(HaveOccurred()) }) }) + + Context("When deploying a resource with all ephemeral services", func() { + const resourceName = "services-ephemeral" + image := "test:latest" + var pullPolicy = corev1.PullAlways + var testEnvVarName = "testEnvVarName" + var testEnvVarValue = "testEnvVarValue" + + ctx := context.Background() + + typeNamespacedName := types.NamespacedName{ + Name: resourceName, + Namespace: "default", + } + featurestore := &feastdevv1alpha1.FeatureStore{} + onlineStorePath := "/data/online.db" + registryPath := "/data/registry.db" + offlineType := "duckdb" + + BeforeEach(func() { + By("creating the custom resource for the Kind FeatureStore") + err := k8sClient.Get(ctx, typeNamespacedName, featurestore) + if err != nil && errors.IsNotFound(err) { + resource := createFeatureStoreResource(resourceName, image, pullPolicy, &[]corev1.EnvVar{{Name: testEnvVarName, Value: testEnvVarValue}, + {Name: "fieldRefName", ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{APIVersion: "v1", FieldPath: "metadata.namespace"}}}}) + resource.Spec.Services.OfflineStore.Persistence = &feastdevv1alpha1.OfflineStorePersistence{ + FilePersistence: &feastdevv1alpha1.OfflineStoreFilePersistence{ + Type: offlineType, + }, + } + resource.Spec.Services.OnlineStore.Persistence = &feastdevv1alpha1.OnlineStorePersistence{ + FilePersistence: &feastdevv1alpha1.OnlineStoreFilePersistence{ + Path: onlineStorePath, + }, + } + resource.Spec.Services.Registry = &feastdevv1alpha1.Registry{ + Local: &feastdevv1alpha1.LocalRegistryConfig{ + Persistence: &feastdevv1alpha1.RegistryPersistence{ + FilePersistence: &feastdevv1alpha1.RegistryFilePersistence{ + Path: registryPath, + }, + }, + }, + } + + Expect(k8sClient.Create(ctx, resource)).To(Succeed()) + } + }) + AfterEach(func() { + resource := &feastdevv1alpha1.FeatureStore{} + err := k8sClient.Get(ctx, typeNamespacedName, resource) + Expect(err).NotTo(HaveOccurred()) + + By("Cleanup the specific resource instance FeatureStore") + Expect(k8sClient.Delete(ctx, resource)).To(Succeed()) + }) + + It("should successfully reconcile the resource", func() { + By("Reconciling the created resource") + controllerReconciler := &FeatureStoreReconciler{ + Client: k8sClient, + Scheme: k8sClient.Scheme(), + } + + _, err := controllerReconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: typeNamespacedName, + }) + Expect(err).NotTo(HaveOccurred()) + + resource := &feastdevv1alpha1.FeatureStore{} + err = k8sClient.Get(ctx, typeNamespacedName, resource) + Expect(err).NotTo(HaveOccurred()) + + feast := services.FeastServices{ + Client: controllerReconciler.Client, + Context: ctx, + Scheme: controllerReconciler.Scheme, + FeatureStore: resource, + } + Expect(resource.Status).NotTo(BeNil()) + Expect(resource.Status.FeastVersion).To(Equal(feastversion.FeastVersion)) + Expect(resource.Status.ClientConfigMap).To(Equal(feast.GetFeastServiceName(services.ClientFeastType))) + Expect(resource.Status.Applied.FeastProject).To(Equal(resource.Spec.FeastProject)) + Expect(resource.Status.Applied.Services).NotTo(BeNil()) + Expect(resource.Status.Applied.Services.OfflineStore).NotTo(BeNil()) + Expect(resource.Status.Applied.Services.OfflineStore.Persistence).NotTo(BeNil()) + Expect(resource.Status.Applied.Services.OfflineStore.Persistence.FilePersistence).NotTo(BeNil()) + Expect(resource.Status.Applied.Services.OfflineStore.Persistence.FilePersistence.Type).To(Equal(offlineType)) + Expect(resource.Status.Applied.Services.OfflineStore.ImagePullPolicy).To(BeNil()) + Expect(resource.Status.Applied.Services.OfflineStore.Resources).To(BeNil()) + Expect(resource.Status.Applied.Services.OfflineStore.Image).To(Equal(&services.DefaultImage)) + Expect(resource.Status.Applied.Services.OnlineStore).NotTo(BeNil()) + Expect(resource.Status.Applied.Services.OnlineStore.Persistence).NotTo(BeNil()) + Expect(resource.Status.Applied.Services.OnlineStore.Persistence.FilePersistence).NotTo(BeNil()) + Expect(resource.Status.Applied.Services.OnlineStore.Persistence.FilePersistence.Path).To(Equal(onlineStorePath)) + Expect(resource.Status.Applied.Services.OnlineStore.Env).To(Equal(&[]corev1.EnvVar{{Name: testEnvVarName, Value: testEnvVarValue}, {Name: "fieldRefName", ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{APIVersion: "v1", FieldPath: "metadata.namespace"}}}})) + Expect(resource.Status.Applied.Services.OnlineStore.ImagePullPolicy).To(Equal(&pullPolicy)) + Expect(resource.Status.Applied.Services.OnlineStore.Resources).NotTo(BeNil()) + Expect(resource.Status.Applied.Services.OnlineStore.Image).To(Equal(&image)) + Expect(resource.Status.Applied.Services.Registry).NotTo(BeNil()) + Expect(resource.Status.Applied.Services.Registry.Local).NotTo(BeNil()) + Expect(resource.Status.Applied.Services.Registry.Local.Persistence).NotTo(BeNil()) + Expect(resource.Status.Applied.Services.Registry.Local.Persistence.FilePersistence).NotTo(BeNil()) + Expect(resource.Status.Applied.Services.Registry.Local.Persistence.FilePersistence.Path).To(Equal(registryPath)) + Expect(resource.Status.Applied.Services.Registry.Local.ImagePullPolicy).To(BeNil()) + Expect(resource.Status.Applied.Services.Registry.Local.Resources).To(BeNil()) + Expect(resource.Status.Applied.Services.Registry.Local.Image).To(Equal(&services.DefaultImage)) + + domain := ".svc.cluster.local:80" + Expect(resource.Status.ServiceHostnames.OfflineStore).To(Equal(feast.GetFeastServiceName(services.OfflineFeastType) + "." + resource.Namespace + domain)) + Expect(resource.Status.ServiceHostnames.OnlineStore).To(Equal(feast.GetFeastServiceName(services.OnlineFeastType) + "." + resource.Namespace + domain)) + Expect(resource.Status.ServiceHostnames.Registry).To(Equal(feast.GetFeastServiceName(services.RegistryFeastType) + "." + resource.Namespace + domain)) + + Expect(resource.Status.Conditions).NotTo(BeEmpty()) + cond := apimeta.FindStatusCondition(resource.Status.Conditions, feastdevv1alpha1.ReadyType) + Expect(cond).ToNot(BeNil()) + Expect(cond.Status).To(Equal(metav1.ConditionTrue)) + Expect(cond.Reason).To(Equal(feastdevv1alpha1.ReadyReason)) + Expect(cond.Type).To(Equal(feastdevv1alpha1.ReadyType)) + Expect(cond.Message).To(Equal(feastdevv1alpha1.ReadyMessage)) + + cond = apimeta.FindStatusCondition(resource.Status.Conditions, feastdevv1alpha1.RegistryReadyType) + Expect(cond).ToNot(BeNil()) + Expect(cond.Status).To(Equal(metav1.ConditionTrue)) + Expect(cond.Reason).To(Equal(feastdevv1alpha1.ReadyReason)) + Expect(cond.Type).To(Equal(feastdevv1alpha1.RegistryReadyType)) + Expect(cond.Message).To(Equal(feastdevv1alpha1.RegistryReadyMessage)) + + cond = apimeta.FindStatusCondition(resource.Status.Conditions, feastdevv1alpha1.ClientReadyType) + Expect(cond).ToNot(BeNil()) + Expect(cond.Status).To(Equal(metav1.ConditionTrue)) + Expect(cond.Reason).To(Equal(feastdevv1alpha1.ReadyReason)) + Expect(cond.Type).To(Equal(feastdevv1alpha1.ClientReadyType)) + Expect(cond.Message).To(Equal(feastdevv1alpha1.ClientReadyMessage)) + + cond = apimeta.FindStatusCondition(resource.Status.Conditions, feastdevv1alpha1.OfflineStoreReadyType) + Expect(cond).ToNot(BeNil()) + Expect(cond.Status).To(Equal(metav1.ConditionTrue)) + Expect(cond.Reason).To(Equal(feastdevv1alpha1.ReadyReason)) + Expect(cond.Type).To(Equal(feastdevv1alpha1.OfflineStoreReadyType)) + Expect(cond.Message).To(Equal(feastdevv1alpha1.OfflineStoreReadyMessage)) + + cond = apimeta.FindStatusCondition(resource.Status.Conditions, feastdevv1alpha1.OnlineStoreReadyType) + Expect(cond).ToNot(BeNil()) + Expect(cond.Status).To(Equal(metav1.ConditionTrue)) + Expect(cond.Reason).To(Equal(feastdevv1alpha1.ReadyReason)) + Expect(cond.Type).To(Equal(feastdevv1alpha1.OnlineStoreReadyType)) + Expect(cond.Message).To(Equal(feastdevv1alpha1.OnlineStoreReadyMessage)) + + Expect(resource.Status.Phase).To(Equal(feastdevv1alpha1.ReadyPhase)) + + deploy := &appsv1.Deployment{} + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: feast.GetFeastServiceName(services.RegistryFeastType), + Namespace: resource.Namespace, + }, + deploy) + Expect(err).NotTo(HaveOccurred()) + Expect(deploy.Spec.Replicas).To(Equal(&services.DefaultReplicas)) + Expect(controllerutil.HasControllerReference(deploy)).To(BeTrue()) + Expect(deploy.Spec.Template.Spec.Containers).To(HaveLen(1)) + + svc := &corev1.Service{} + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: feast.GetFeastServiceName(services.RegistryFeastType), + Namespace: resource.Namespace, + }, + svc) + Expect(err).NotTo(HaveOccurred()) + Expect(controllerutil.HasControllerReference(svc)).To(BeTrue()) + Expect(svc.Spec.Ports[0].TargetPort).To(Equal(intstr.FromInt(int(services.FeastServiceConstants[services.RegistryFeastType].TargetPort)))) + }) + + It("should properly encode a feature_store.yaml config", func() { + By("Reconciling the created resource") + controllerReconciler := &FeatureStoreReconciler{ + Client: k8sClient, + Scheme: k8sClient.Scheme(), + } + + _, err := controllerReconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: typeNamespacedName, + }) + Expect(err).NotTo(HaveOccurred()) + + resource := &feastdevv1alpha1.FeatureStore{} + err = k8sClient.Get(ctx, typeNamespacedName, resource) + Expect(err).NotTo(HaveOccurred()) + + req, err := labels.NewRequirement(services.NameLabelKey, selection.Equals, []string{resource.Name}) + Expect(err).NotTo(HaveOccurred()) + labelSelector := labels.NewSelector().Add(*req) + listOpts := &client.ListOptions{Namespace: resource.Namespace, LabelSelector: labelSelector} + deployList := appsv1.DeploymentList{} + err = k8sClient.List(ctx, &deployList, listOpts) + Expect(err).NotTo(HaveOccurred()) + Expect(deployList.Items).To(HaveLen(3)) + + svcList := corev1.ServiceList{} + err = k8sClient.List(ctx, &svcList, listOpts) + Expect(err).NotTo(HaveOccurred()) + Expect(svcList.Items).To(HaveLen(3)) + + cmList := corev1.ConfigMapList{} + err = k8sClient.List(ctx, &cmList, listOpts) + Expect(err).NotTo(HaveOccurred()) + Expect(cmList.Items).To(HaveLen(1)) + + feast := services.FeastServices{ + Client: controllerReconciler.Client, + Context: ctx, + Scheme: controllerReconciler.Scheme, + FeatureStore: resource, + } + + // check registry config + deploy := &appsv1.Deployment{} + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: feast.GetFeastServiceName(services.RegistryFeastType), + Namespace: resource.Namespace, + }, + deploy) + Expect(err).NotTo(HaveOccurred()) + Expect(deploy.Spec.Template.Spec.Containers).To(HaveLen(1)) + Expect(deploy.Spec.Template.Spec.Containers[0].Env).To(HaveLen(1)) + env := getFeatureStoreYamlEnvVar(deploy.Spec.Template.Spec.Containers[0].Env) + Expect(env).NotTo(BeNil()) + + fsYamlStr, err := feast.GetServiceFeatureStoreYamlBase64(services.RegistryFeastType) + Expect(err).NotTo(HaveOccurred()) + Expect(fsYamlStr).To(Equal(env.Value)) + + envByte, err := base64.StdEncoding.DecodeString(env.Value) + Expect(err).NotTo(HaveOccurred()) + repoConfig := &services.RepoConfig{} + err = yaml.Unmarshal(envByte, repoConfig) + Expect(err).NotTo(HaveOccurred()) + testConfig := &services.RepoConfig{ + Project: feastProject, + Provider: services.LocalProviderType, + EntityKeySerializationVersion: feastdevv1alpha1.SerializationVersion, + Registry: services.RegistryConfig{ + RegistryType: services.RegistryFileConfigType, + Path: registryPath, + }, + } + Expect(repoConfig).To(Equal(testConfig)) + + // check offline config + deploy = &appsv1.Deployment{} + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: feast.GetFeastServiceName(services.OfflineFeastType), + Namespace: resource.Namespace, + }, + deploy) + Expect(err).NotTo(HaveOccurred()) + Expect(deploy.Spec.Template.Spec.Containers).To(HaveLen(1)) + Expect(deploy.Spec.Template.Spec.Containers[0].Env).To(HaveLen(1)) + env = getFeatureStoreYamlEnvVar(deploy.Spec.Template.Spec.Containers[0].Env) + Expect(env).NotTo(BeNil()) + + fsYamlStr, err = feast.GetServiceFeatureStoreYamlBase64(services.OfflineFeastType) + Expect(err).NotTo(HaveOccurred()) + Expect(fsYamlStr).To(Equal(env.Value)) + + envByte, err = base64.StdEncoding.DecodeString(env.Value) + Expect(err).NotTo(HaveOccurred()) + repoConfigOffline := &services.RepoConfig{} + err = yaml.Unmarshal(envByte, repoConfigOffline) + Expect(err).NotTo(HaveOccurred()) + regRemote := services.RegistryConfig{ + RegistryType: services.RegistryRemoteConfigType, + Path: fmt.Sprintf("feast-%s-registry.default.svc.cluster.local:80", resourceName), + } + offlineConfig := &services.RepoConfig{ + Project: feastProject, + Provider: services.LocalProviderType, + EntityKeySerializationVersion: feastdevv1alpha1.SerializationVersion, + OfflineStore: services.OfflineStoreConfig{ + Type: services.OfflineDuckDbConfigType, + }, + Registry: regRemote, + } + Expect(repoConfigOffline).To(Equal(offlineConfig)) + + // check online config + deploy = &appsv1.Deployment{} + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: feast.GetFeastServiceName(services.OnlineFeastType), + Namespace: resource.Namespace, + }, + deploy) + Expect(err).NotTo(HaveOccurred()) + Expect(deploy.Spec.Template.Spec.Containers).To(HaveLen(1)) + Expect(deploy.Spec.Template.Spec.Containers[0].Env).To(HaveLen(3)) + Expect(deploy.Spec.Template.Spec.Containers[0].ImagePullPolicy).To(Equal(corev1.PullAlways)) + env = getFeatureStoreYamlEnvVar(deploy.Spec.Template.Spec.Containers[0].Env) + Expect(env).NotTo(BeNil()) + + fsYamlStr, err = feast.GetServiceFeatureStoreYamlBase64(services.OnlineFeastType) + Expect(err).NotTo(HaveOccurred()) + Expect(fsYamlStr).To(Equal(env.Value)) + + envByte, err = base64.StdEncoding.DecodeString(env.Value) + Expect(err).NotTo(HaveOccurred()) + repoConfigOnline := &services.RepoConfig{} + err = yaml.Unmarshal(envByte, repoConfigOnline) + Expect(err).NotTo(HaveOccurred()) + offlineRemote := services.OfflineStoreConfig{ + Host: fmt.Sprintf("feast-%s-offline.default.svc.cluster.local", resourceName), + Type: services.OfflineRemoteConfigType, + Port: services.HttpPort, + } + onlineConfig := &services.RepoConfig{ + Project: feastProject, + Provider: services.LocalProviderType, + EntityKeySerializationVersion: feastdevv1alpha1.SerializationVersion, + OfflineStore: offlineRemote, + OnlineStore: services.OnlineStoreConfig{ + Path: onlineStorePath, + Type: services.OnlineSqliteConfigType, + }, + Registry: regRemote, + } + Expect(repoConfigOnline).To(Equal(onlineConfig)) + Expect(deploy.Spec.Template.Spec.Containers[0].Env).To(HaveLen(3)) + + // check client config + cm := &corev1.ConfigMap{} + name := feast.GetFeastServiceName(services.ClientFeastType) + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: name, + Namespace: resource.Namespace, + }, + cm) + Expect(err).NotTo(HaveOccurred()) + repoConfigClient := &services.RepoConfig{} + err = yaml.Unmarshal([]byte(cm.Data[services.FeatureStoreYamlCmKey]), repoConfigClient) + Expect(err).NotTo(HaveOccurred()) + clientConfig := &services.RepoConfig{ + Project: feastProject, + Provider: services.LocalProviderType, + EntityKeySerializationVersion: feastdevv1alpha1.SerializationVersion, + OfflineStore: offlineRemote, + OnlineStore: services.OnlineStoreConfig{ + Path: fmt.Sprintf("http://feast-%s-online.default.svc.cluster.local:80", resourceName), + Type: services.OnlineRemoteConfigType, + }, + Registry: regRemote, + } + Expect(repoConfigClient).To(Equal(clientConfig)) + + // change paths and reconcile + resourceNew := resource.DeepCopy() + newOnlineStorePath := "/data/new_online.db" + newRegistryPath := "/data/new_registry.db" + resourceNew.Spec.Services.OnlineStore.Persistence.FilePersistence.Path = newOnlineStorePath + resourceNew.Spec.Services.Registry.Local.Persistence.FilePersistence.Path = newRegistryPath + err = k8sClient.Update(ctx, resourceNew) + Expect(err).NotTo(HaveOccurred()) + _, err = controllerReconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: typeNamespacedName, + }) + Expect(err).NotTo(HaveOccurred()) + + resource = &feastdevv1alpha1.FeatureStore{} + err = k8sClient.Get(ctx, typeNamespacedName, resource) + Expect(err).NotTo(HaveOccurred()) + feast.FeatureStore = resource + + // check registry config + deploy = &appsv1.Deployment{} + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: feast.GetFeastServiceName(services.RegistryFeastType), + Namespace: resource.Namespace, + }, + deploy) + Expect(err).NotTo(HaveOccurred()) + env = getFeatureStoreYamlEnvVar(deploy.Spec.Template.Spec.Containers[0].Env) + Expect(env).NotTo(BeNil()) + fsYamlStr, err = feast.GetServiceFeatureStoreYamlBase64(services.RegistryFeastType) + Expect(err).NotTo(HaveOccurred()) + Expect(fsYamlStr).To(Equal(env.Value)) + + envByte, err = base64.StdEncoding.DecodeString(env.Value) + Expect(err).NotTo(HaveOccurred()) + repoConfig = &services.RepoConfig{} + err = yaml.Unmarshal(envByte, repoConfig) + Expect(err).NotTo(HaveOccurred()) + testConfig.Registry.Path = newRegistryPath + Expect(repoConfig).To(Equal(testConfig)) + + // check offline config + deploy = &appsv1.Deployment{} + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: feast.GetFeastServiceName(services.OfflineFeastType), + Namespace: resource.Namespace, + }, + deploy) + Expect(err).NotTo(HaveOccurred()) + env = getFeatureStoreYamlEnvVar(deploy.Spec.Template.Spec.Containers[0].Env) + Expect(env).NotTo(BeNil()) + + fsYamlStr, err = feast.GetServiceFeatureStoreYamlBase64(services.OfflineFeastType) + Expect(err).NotTo(HaveOccurred()) + Expect(fsYamlStr).To(Equal(env.Value)) + + envByte, err = base64.StdEncoding.DecodeString(env.Value) + Expect(err).NotTo(HaveOccurred()) + repoConfigOffline = &services.RepoConfig{} + err = yaml.Unmarshal(envByte, repoConfigOffline) + Expect(err).NotTo(HaveOccurred()) + Expect(repoConfigOffline).To(Equal(offlineConfig)) + + // check online config + deploy = &appsv1.Deployment{} + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: feast.GetFeastServiceName(services.OnlineFeastType), + Namespace: resource.Namespace, + }, + deploy) + Expect(err).NotTo(HaveOccurred()) + env = getFeatureStoreYamlEnvVar(deploy.Spec.Template.Spec.Containers[0].Env) + Expect(env).NotTo(BeNil()) + + fsYamlStr, err = feast.GetServiceFeatureStoreYamlBase64(services.OnlineFeastType) + Expect(err).NotTo(HaveOccurred()) + Expect(fsYamlStr).To(Equal(env.Value)) + + envByte, err = base64.StdEncoding.DecodeString(env.Value) + Expect(err).NotTo(HaveOccurred()) + + repoConfigOnline = &services.RepoConfig{} + err = yaml.Unmarshal(envByte, repoConfigOnline) + Expect(err).NotTo(HaveOccurred()) + onlineConfig.OnlineStore.Path = newOnlineStorePath + Expect(repoConfigOnline).To(Equal(onlineConfig)) + }) + }) }) func createFeatureStoreResource(resourceName string, image string, pullPolicy corev1.PullPolicy, envVars *[]corev1.EnvVar) *feastdevv1alpha1.FeatureStore { diff --git a/infra/feast-operator/internal/controller/services/repo_config.go b/infra/feast-operator/internal/controller/services/repo_config.go index a4e94364d2d..e087038ba52 100644 --- a/infra/feast-operator/internal/controller/services/repo_config.go +++ b/infra/feast-operator/internal/controller/services/repo_config.go @@ -50,7 +50,7 @@ func getServiceRepoConfig(feastType FeastServiceType, featureStore *feastdevv1al appliedSpec := featureStore.Status.Applied repoConfig := getClientRepoConfig(featureStore) - isLocalRegistry := IsLocalRegistry(featureStore) + isLocalRegistry := isLocalRegistry(featureStore) if appliedSpec.Services != nil { // Offline server has an `offline_store` section and a remote `registry` if feastType == OfflineFeastType && appliedSpec.Services.OfflineStore != nil { diff --git a/infra/feast-operator/internal/controller/services/services.go b/infra/feast-operator/internal/controller/services/services.go index dc5bfa0c259..a205c838ea1 100644 --- a/infra/feast-operator/internal/controller/services/services.go +++ b/infra/feast-operator/internal/controller/services/services.go @@ -336,7 +336,7 @@ func (feast *FeastServices) setRemoteRegistryURL() error { } func (feast *FeastServices) isLocalRegistry() bool { - return IsLocalRegistry(feast.FeatureStore) + return isLocalRegistry(feast.FeatureStore) } func (feast *FeastServices) isRemoteRegistry() bool { diff --git a/infra/feast-operator/internal/controller/services/util.go b/infra/feast-operator/internal/controller/services/util.go index da6307936f2..c6e4f31b36a 100644 --- a/infra/feast-operator/internal/controller/services/util.go +++ b/infra/feast-operator/internal/controller/services/util.go @@ -7,7 +7,7 @@ import ( feastdevv1alpha1 "github.com/feast-dev/feast/infra/feast-operator/api/v1alpha1" ) -func IsLocalRegistry(featureStore *feastdevv1alpha1.FeatureStore) bool { +func isLocalRegistry(featureStore *feastdevv1alpha1.FeatureStore) bool { appliedServices := featureStore.Status.Applied.Services return appliedServices != nil && appliedServices.Registry != nil && appliedServices.Registry.Local != nil } From 8677162a9bb59cd5dc21528709971fe0ad380c2f Mon Sep 17 00:00:00 2001 From: Daniele Martinoli Date: Thu, 7 Nov 2024 17:39:39 +0100 Subject: [PATCH 11/11] using slices package Signed-off-by: Daniele Martinoli --- infra/feast-operator/internal/controller/services/util.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/infra/feast-operator/internal/controller/services/util.go b/infra/feast-operator/internal/controller/services/util.go index c6e4f31b36a..c2b29361a84 100644 --- a/infra/feast-operator/internal/controller/services/util.go +++ b/infra/feast-operator/internal/controller/services/util.go @@ -2,6 +2,7 @@ package services import ( "fmt" + "slices" "github.com/feast-dev/feast/infra/feast-operator/api/feastversion" feastdevv1alpha1 "github.com/feast-dev/feast/infra/feast-operator/api/v1alpha1" @@ -76,10 +77,8 @@ func setServiceDefaultConfigs(defaultConfigs *feastdevv1alpha1.DefaultConfigs) { } func checkOfflineStoreFilePersistenceType(value string) error { - for _, v := range feastdevv1alpha1.ValidOfflineStoreFilePersistenceTypes { - if v == value { - return nil - } + if slices.Contains(feastdevv1alpha1.ValidOfflineStoreFilePersistenceTypes, value) { + return nil } return fmt.Errorf("invalid file type %s for offline store", value) }