From 5e0f9b188e2ff0b312a9a77cb07b792f8ddc6a82 Mon Sep 17 00:00:00 2001 From: Humair Khan Date: Tue, 16 Apr 2024 16:06:52 -0400 Subject: [PATCH] feat(backend): add namespace & prefix scoped credentials to kfp-launcher config for object store paths (#10625) * add bucket session info to pipeline context Signed-off-by: Humair Khan * allow driver to read bucket session info Instead of only reading the kfp-launcher when a custom pipeline root is specified, the root dag will now always read the kfp-launcher config to search for a matching bucket if such a configuration is provided in kfp-launcher Signed-off-by: Humair Khan * add support for bucket prefix matching Provides a structured configuration for bucket providers, whereby user can specify credentials for different providers and path prefixes. A new interface for providing sessions is introduced, which should be implemented for any new provider configuration support. Signed-off-by: Humair Khan * allow object store to handle different providers Utilizes blob provider specific constructors to open s3, minio, gcs accordingly. If a sessioninfo is provided (via kfp-launcher config) then the associated secret is fetched for each case to gain credentials. If fromEnv is provided, then the standard url opener is used. Also separates out config fields and operations to a separate file. Signed-off-by: Humair Khan * utilize session info in launcher & importer retrieves the session info (if provided via kfp-launcher) and utilizes it for opening the provider's associated bucket Signed-off-by: Humair Khan * skip config for default aws s3 endpoint Signed-off-by: Humair Khan * chore: refactor/clarify store session info naming also added some additional code comments clarifying store cred variable usage Signed-off-by: Humair Khan * chore: handle query parameters as s3 as well as update validation logic for provider config, and fix tests accordingly. 
Signed-off-by: Humair Khan --------- Signed-off-by: Humair Khan --- backend/src/v2/component/importer_launcher.go | 2 +- backend/src/v2/component/launcher_v2.go | 27 +- backend/src/v2/component/launcher_v2_test.go | 2 +- backend/src/v2/config/env.go | 97 +++- backend/src/v2/config/env_test.go | 530 ++++++++++++++++++ backend/src/v2/config/gcs.go | 123 ++++ backend/src/v2/config/minio.go | 51 ++ backend/src/v2/config/s3.go | 156 ++++++ .../v2/config/testdata/provider_cases.yaml | 268 +++++++++ backend/src/v2/driver/driver.go | 51 +- backend/src/v2/metadata/client.go | 20 +- backend/src/v2/metadata/client_fake.go | 2 +- backend/src/v2/metadata/client_test.go | 14 +- backend/src/v2/objectstore/config.go | 233 ++++++++ backend/src/v2/objectstore/object_store.go | 270 ++++----- .../src/v2/objectstore/object_store_test.go | 192 ++++++- go.mod | 2 +- 17 files changed, 1831 insertions(+), 209 deletions(-) create mode 100644 backend/src/v2/config/env_test.go create mode 100644 backend/src/v2/config/gcs.go create mode 100644 backend/src/v2/config/minio.go create mode 100644 backend/src/v2/config/s3.go create mode 100644 backend/src/v2/config/testdata/provider_cases.yaml create mode 100644 backend/src/v2/objectstore/config.go diff --git a/backend/src/v2/component/importer_launcher.go b/backend/src/v2/component/importer_launcher.go index 25ebc390926..e6dae29d639 100644 --- a/backend/src/v2/component/importer_launcher.go +++ b/backend/src/v2/component/importer_launcher.go @@ -111,7 +111,7 @@ func (l *ImportLauncher) Execute(ctx context.Context) (err error) { }() // TODO(Bobgy): there's no need to pass any parameters, because pipeline // and pipeline run context have been created by root DAG driver. 
- pipeline, err := l.metadataClient.GetPipeline(ctx, l.importerLauncherOptions.PipelineName, l.importerLauncherOptions.RunID, "", "", "") + pipeline, err := l.metadataClient.GetPipeline(ctx, l.importerLauncherOptions.PipelineName, l.importerLauncherOptions.RunID, "", "", "", "") if err != nil { return err } diff --git a/backend/src/v2/component/launcher_v2.go b/backend/src/v2/component/launcher_v2.go index bf1bf1604d3..411055daca0 100644 --- a/backend/src/v2/component/launcher_v2.go +++ b/backend/src/v2/component/launcher_v2.go @@ -156,7 +156,12 @@ func (l *LauncherV2) Execute(ctx context.Context) (err error) { return err } fingerPrint := execution.FingerPrint() - bucketConfig, err := objectstore.ParseBucketConfig(execution.GetPipeline().GetPipelineRoot()) + storeSessionInfo, err := objectstore.GetSessionInfoFromString(execution.GetPipeline().GetStoreSessionInfo()) + if err != nil { + return err + } + pipelineRoot := execution.GetPipeline().GetPipelineRoot() + bucketConfig, err := objectstore.ParseBucketConfig(pipelineRoot, storeSessionInfo) if err != nil { return err } @@ -534,14 +539,22 @@ func fetchNonDefaultBuckets( } // TODO: Support multiple artifacts someday, probably through the v2 engine. artifact := artifactList.Artifacts[0] + // The artifact does not belong under the object store path for this run. Cases: + // 1. Artifact is cached from a different run, so it may still be in the default bucket, but under a different run id subpath + // 2. Artifact is imported from the same bucket, but from a different path (re-use the same session) + // 3. 
Artifact is imported from a different bucket, or obj store (default to using user env in this case) if !strings.HasPrefix(artifact.Uri, defaultBucketConfig.PrefixedBucket()) { - nonDefaultBucketConfig, err := objectstore.ParseBucketConfigForArtifactURI(artifact.Uri) - if err != nil { - return nonDefaultBuckets, fmt.Errorf("failed to parse bucketConfig for output artifact %q with uri %q: %w", name, artifact.GetUri(), err) + nonDefaultBucketConfig, parseErr := objectstore.ParseBucketConfigForArtifactURI(artifact.Uri) + if parseErr != nil { + return nonDefaultBuckets, fmt.Errorf("failed to parse bucketConfig for output artifact %q with uri %q: %w", name, artifact.GetUri(), parseErr) } - nonDefaultBucket, err := objectstore.OpenBucket(ctx, k8sClient, namespace, nonDefaultBucketConfig) - if err != nil { - return nonDefaultBuckets, fmt.Errorf("failed to open bucket for output artifact %q with uri %q: %w", name, artifact.GetUri(), err) + // check if it's same bucket but under a different path, re-use the default bucket session in this case. 
+ if (nonDefaultBucketConfig.Scheme == defaultBucketConfig.Scheme) && (nonDefaultBucketConfig.BucketName == defaultBucketConfig.BucketName) { + nonDefaultBucketConfig.SessionInfo = defaultBucketConfig.SessionInfo + } + nonDefaultBucket, bucketErr := objectstore.OpenBucket(ctx, k8sClient, namespace, nonDefaultBucketConfig) + if bucketErr != nil { + return nonDefaultBuckets, fmt.Errorf("failed to open bucket for output artifact %q with uri %q: %w", name, artifact.GetUri(), bucketErr) } nonDefaultBuckets[nonDefaultBucketConfig.PrefixedBucket()] = nonDefaultBucket } diff --git a/backend/src/v2/component/launcher_v2_test.go b/backend/src/v2/component/launcher_v2_test.go index 55e97e16406..2353c3e4f24 100644 --- a/backend/src/v2/component/launcher_v2_test.go +++ b/backend/src/v2/component/launcher_v2_test.go @@ -78,7 +78,7 @@ func Test_executeV2_Parameters(t *testing.T) { fakeMetadataClient := metadata.NewFakeClient() bucket, err := blob.OpenBucket(context.Background(), "mem://test-bucket") assert.Nil(t, err) - bucketConfig, err := objectstore.ParseBucketConfig("mem://test-bucket/pipeline-root/") + bucketConfig, err := objectstore.ParseBucketConfig("mem://test-bucket/pipeline-root/", nil) assert.Nil(t, err) _, _, err = executeV2(context.Background(), test.executorInput, addNumbersComponent, "sh", test.executorArgs, bucket, bucketConfig, fakeMetadataClient, "namespace", fakeKubernetesClientset) diff --git a/backend/src/v2/config/env.go b/backend/src/v2/config/env.go index 3eefcd382e3..aa20b5a3916 100644 --- a/backend/src/v2/config/env.go +++ b/backend/src/v2/config/env.go @@ -19,7 +19,10 @@ package config import ( "context" "fmt" + "github.com/kubeflow/pipelines/backend/src/v2/objectstore" "io/ioutil" + "sigs.k8s.io/yaml" + "strconv" "strings" "github.com/golang/glog" @@ -32,8 +35,23 @@ const ( configMapName = "kfp-launcher" defaultPipelineRoot = "minio://mlpipeline/v2/artifacts" configKeyDefaultPipelineRoot = "defaultPipelineRoot" + configBucketProviders 
= "providers" + minioArtifactSecretName = "mlpipeline-minio-artifact" + // The k8s secret "Key" for "Artifact SecretKey" and "Artifact AccessKey" + minioArtifactSecretKeyKey = "secretkey" + minioArtifactAccessKeyKey = "accesskey" ) +type BucketProviders struct { + Minio *MinioProviderConfig `json:"minio"` + S3 *S3ProviderConfig `json:"s3"` + GCS *GCSProviderConfig `json:"gs"` +} + +type SessionInfoProvider interface { + ProvideSessionInfo(path string) (objectstore.SessionInfo, error) +} + // Config is the KFP runtime configuration. type Config struct { data map[string]string @@ -53,7 +71,7 @@ func FromConfigMap(ctx context.Context, clientSet kubernetes.Interface, namespac return &Config{data: config.Data}, nil } -// Config.DefaultPipelineRoot gets the configured default pipeline root. +// DefaultPipelineRoot gets the configured default pipeline root. func (c *Config) DefaultPipelineRoot() string { // The key defaultPipelineRoot is optional in launcher config. if c == nil || c.data[configKeyDefaultPipelineRoot] == "" { @@ -82,3 +100,80 @@ func InPodName() (string, error) { name := string(podName) return strings.TrimSuffix(name, "\n"), nil } + +func (c *Config) GetStoreSessionInfo(path string) (objectstore.SessionInfo, error) { + bucketConfig, err := objectstore.ParseBucketPathToConfig(path) + if err != nil { + return objectstore.SessionInfo{}, err + } + provider := strings.TrimSuffix(bucketConfig.Scheme, "://") + bucketProviders, err := c.getBucketProviders() + if err != nil { + return objectstore.SessionInfo{}, err + } + + var sessProvider SessionInfoProvider + + switch provider { + case "minio": + if bucketProviders == nil || bucketProviders.Minio == nil { + sessProvider = &MinioProviderConfig{} + } else { + sessProvider = bucketProviders.Minio + } + break + case "s3": + if bucketProviders == nil || bucketProviders.S3 == nil { + sessProvider = &S3ProviderConfig{} + } else { + sessProvider = bucketProviders.S3 + } + break + case "gs": + if bucketProviders == nil || 
bucketProviders.GCS == nil { + sessProvider = &GCSProviderConfig{} + } else { + sessProvider = bucketProviders.GCS + } + break + default: + return objectstore.SessionInfo{}, fmt.Errorf("Encountered unsupported provider in provider config %s", provider) + } + + sess, err := sessProvider.ProvideSessionInfo(path) + if err != nil { + return objectstore.SessionInfo{}, err + } + return sess, nil +} + +// getBucketProviders gets the provider configuration +func (c *Config) getBucketProviders() (*BucketProviders, error) { + if c == nil || c.data[configBucketProviders] == "" { + return nil, nil + } + bucketProviders := &BucketProviders{} + configAuth := c.data[configBucketProviders] + err := yaml.Unmarshal([]byte(configAuth), bucketProviders) + if err != nil { + return nil, fmt.Errorf("failed to unmarshall kfp bucket providers, ensure that providers config is well formed: %w", err) + } + return bucketProviders, nil +} + +func getDefaultMinioSessionInfo() (objectstore.SessionInfo, error) { + sess := objectstore.SessionInfo{ + Provider: "minio", + Params: map[string]string{ + "region": "minio", + "endpoint": objectstore.MinioDefaultEndpoint(), + "disableSSL": strconv.FormatBool(true), + "fromEnv": strconv.FormatBool(false), + "secretName": minioArtifactSecretName, + // The k8s secret "Key" for "Artifact SecretKey" and "Artifact AccessKey" + "accessKeyKey": minioArtifactAccessKeyKey, + "secretKeyKey": minioArtifactSecretKeyKey, + }, + } + return sess, nil +} diff --git a/backend/src/v2/config/env_test.go b/backend/src/v2/config/env_test.go new file mode 100644 index 00000000000..8f120126394 --- /dev/null +++ b/backend/src/v2/config/env_test.go @@ -0,0 +1,530 @@ +// Copyright 2024 The Kubeflow Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "fmt" + "github.com/kubeflow/pipelines/backend/src/v2/objectstore" + "github.com/stretchr/testify/assert" + "os" + "sigs.k8s.io/yaml" + "testing" +) + +type TestcaseData struct { + Testcases []ProviderCase `json:"cases"` +} +type ProviderCase struct { + Name string `json:"name"` + Value string `json:"value"` +} + +func Test_getDefaultMinioSessionInfo(t *testing.T) { + actualDefaultSession, err := getDefaultMinioSessionInfo() + assert.Nil(t, err) + expectedDefaultSession := objectstore.SessionInfo{ + Provider: "minio", + Params: map[string]string{ + "region": "minio", + "endpoint": "minio-service.kubeflow:9000", + "disableSSL": "true", + "fromEnv": "false", + "secretName": "mlpipeline-minio-artifact", + "accessKeyKey": "accesskey", + "secretKeyKey": "secretkey", + }, + } + assert.Equal(t, expectedDefaultSession, actualDefaultSession) +} + +func TestGetBucketSessionInfo(t *testing.T) { + + providersDataFile, err := os.ReadFile("testdata/provider_cases.yaml") + if os.IsNotExist(err) { + panic(err) + } + + var providersData TestcaseData + err = yaml.Unmarshal(providersDataFile, &providersData) + if err != nil { + panic(err) + } + + tt := []struct { + msg string + config Config + expectedSessionInfo objectstore.SessionInfo + pipelineroot string + shouldError bool + errorMsg string + testDataCase string + }{ + { + msg: "invalid - unsupported object store protocol", + pipelineroot: "unsupported://my-bucket/v2/artifacts", + expectedSessionInfo: objectstore.SessionInfo{}, + shouldError: true, + errorMsg: 
"unsupported Cloud bucket", + }, + { + msg: "valid - only s3 pipelineroot no provider config", + pipelineroot: "s3://my-bucket", + expectedSessionInfo: objectstore.SessionInfo{ + Provider: "s3", + Params: map[string]string{ + "fromEnv": "true", + }, + }, + }, + { + msg: "invalid - unsupported pipeline root format", + pipelineroot: "minio.unsupported.format", + expectedSessionInfo: objectstore.SessionInfo{}, + shouldError: true, + errorMsg: "unrecognized pipeline root format", + }, + { + msg: "valid - no providers, should use minio default", + pipelineroot: "minio://my-bucket/v2/artifacts", + expectedSessionInfo: objectstore.SessionInfo{ + Provider: "minio", + Params: map[string]string{ + "region": "minio", + "endpoint": "minio-service.kubeflow:9000", + "disableSSL": "true", + "fromEnv": "false", + "secretName": "mlpipeline-minio-artifact", + "accessKeyKey": "accesskey", + "secretKeyKey": "secretkey", + }, + }, + }, + { + msg: "valid - no s3 provider match providers config", + pipelineroot: "s3://my-bucket", + expectedSessionInfo: objectstore.SessionInfo{ + Provider: "s3", + Params: map[string]string{ + "fromEnv": "true", + }, + }, + testDataCase: "case0", + }, + { + msg: "valid - no gcs provider match providers config", + pipelineroot: "gs://my-bucket", + expectedSessionInfo: objectstore.SessionInfo{ + Provider: "gs", + Params: map[string]string{ + "fromEnv": "true", + }, + }, + testDataCase: "case0", + }, + { + msg: "valid - no minio provider match providers config, use default minio config", + pipelineroot: "minio://my-bucket", + expectedSessionInfo: objectstore.SessionInfo{ + Provider: "minio", + Params: map[string]string{ + "region": "minio", + "endpoint": "minio-service.kubeflow:9000", + "disableSSL": "true", + "fromEnv": "false", + "secretName": "mlpipeline-minio-artifact", + "accessKeyKey": "accesskey", + "secretKeyKey": "secretkey", + }, + }, + testDataCase: "case1", + }, + { + msg: "valid - empty minio provider, use default minio config", + pipelineroot: 
"minio://my-bucket/v2/artifacts", + expectedSessionInfo: objectstore.SessionInfo{ + Provider: "minio", + Params: map[string]string{ + "region": "minio", + "endpoint": "minio-service.kubeflow:9000", + "disableSSL": "true", + "fromEnv": "false", + "secretName": "mlpipeline-minio-artifact", + "accessKeyKey": "accesskey", + "secretKeyKey": "secretkey", + }, + }, + testDataCase: "case1", + }, + { + msg: "invalid - empty minio provider no override", + pipelineroot: "minio://my-bucket/v2/artifacts", + expectedSessionInfo: objectstore.SessionInfo{}, + shouldError: true, + errorMsg: "invalid provider config", + testDataCase: "case2", + }, + { + msg: "invalid - minio provider endpoint only", + pipelineroot: "minio://my-bucket/v2/artifacts", + expectedSessionInfo: objectstore.SessionInfo{}, + shouldError: true, + errorMsg: "invalid provider config", + testDataCase: "case3", + }, + { + msg: "invalid - one minio provider no creds", + pipelineroot: "minio://my-bucket/v2/artifacts", + expectedSessionInfo: objectstore.SessionInfo{}, + shouldError: true, + errorMsg: "missing default credentials", + testDataCase: "case4", + }, + { + msg: "valid - minio provider with default only", + pipelineroot: "minio://my-bucket/v2/artifacts", + expectedSessionInfo: objectstore.SessionInfo{ + Provider: "minio", + Params: map[string]string{ + "region": "minio", + "endpoint": "minio-endpoint-5.com", + "disableSSL": "true", + "fromEnv": "false", + "secretName": "test-secret-5", + "accessKeyKey": "test-accessKeyKey-5", + "secretKeyKey": "test-secretKeyKey-5", + }, + }, + testDataCase: "case5", + }, + { + msg: "valid - pick minio provider", + pipelineroot: "minio://minio-bucket-a/some/minio/path/a", + expectedSessionInfo: objectstore.SessionInfo{ + Provider: "minio", + Params: map[string]string{ + "region": "minio-a", + "endpoint": "minio-endpoint-6.com", + "disableSSL": "true", + "fromEnv": "false", + "secretName": "minio-test-secret-6-a", + "accessKeyKey": "minio-test-accessKeyKey-6-a", + 
"secretKeyKey": "minio-test-secretKeyKey-6-a", + }, + }, + testDataCase: "case6", + }, + { + msg: "invalid - s3 should require default creds", + pipelineroot: "s3://my-bucket/v2/artifacts", + expectedSessionInfo: objectstore.SessionInfo{}, + shouldError: true, + errorMsg: "missing default credentials", + testDataCase: "case7", + }, + { + msg: "invalid - gs should require default creds", + pipelineroot: "gs://my-bucket/v2/artifacts", + expectedSessionInfo: objectstore.SessionInfo{}, + shouldError: true, + errorMsg: "missing default credentials", + testDataCase: "case7", + }, + { + msg: "invalid - minio should require default creds", + pipelineroot: "minio://my-bucket/v2/artifacts", + expectedSessionInfo: objectstore.SessionInfo{}, + shouldError: true, + errorMsg: "missing default credentials", + testDataCase: "case7", + }, + { + msg: "invalid - matching prefix override should require secretref, if fromEnv is false", + pipelineroot: "minio://minio-bucket-a/some/minio/path/a", + expectedSessionInfo: objectstore.SessionInfo{}, + shouldError: true, + errorMsg: "missing override secretref", + testDataCase: "case8", + }, + { + msg: "valid - matching prefix override should use creds from env, if fromEnv is true", + pipelineroot: "minio://minio-bucket-a/some/minio/path/a", + expectedSessionInfo: objectstore.SessionInfo{ + Provider: "minio", + Params: map[string]string{ + "region": "minio-a", + "endpoint": "minio-endpoint-9.com", + "disableSSL": "true", + "fromEnv": "true", + }, + }, + testDataCase: "case9", + }, + { + msg: "valid - matching prefix override should use secret creds, even if default uses FromEnv", + pipelineroot: "minio://minio-bucket-a/some/minio/path/a", + expectedSessionInfo: objectstore.SessionInfo{ + Provider: "minio", + Params: map[string]string{ + "region": "minio-a", + "endpoint": "minio-endpoint-10.com", + "disableSSL": "true", + "fromEnv": "false", + "secretName": "minio-test-secret-10", + "accessKeyKey": "minio-test-accessKeyKey-10", + 
"secretKeyKey": "minio-test-secretKeyKey-10", + }, + }, + testDataCase: "case10", + }, + { + msg: "valid - secret ref is not required for default s3 when fromEnv is true", + pipelineroot: "minio://minio-bucket-a/some/minio/path/b", + expectedSessionInfo: objectstore.SessionInfo{ + Provider: "minio", + Params: map[string]string{ + "region": "minio", + "endpoint": "minio-endpoint-10.com", + "disableSSL": "true", + "fromEnv": "true", + }, + }, + testDataCase: "case10", + }, + { + msg: "valid - match s3 default config when no override match exists", + pipelineroot: "s3://s3-bucket/no/override/path", + expectedSessionInfo: objectstore.SessionInfo{ + Provider: "s3", + Params: map[string]string{ + "region": "us-east-1", + "endpoint": "s3.amazonaws.com", + "disableSSL": "false", + "fromEnv": "false", + "secretName": "s3-testsecret-6", + "accessKeyKey": "s3-testaccessKeyKey-6", + "secretKeyKey": "s3-testsecretKeyKey-6", + }, + }, + testDataCase: "case6", + }, + { + msg: "valid - override should match first subpath prefix in pipelineroot", + pipelineroot: "s3://s3-bucket/some/s3/path/b", + expectedSessionInfo: objectstore.SessionInfo{ + Provider: "s3", + Params: map[string]string{ + "region": "us-east-2", + "endpoint": "s3.us-east-2.amazonaws.com", + "disableSSL": "false", + "fromEnv": "false", + "secretName": "s3-test-secret-6-b", + "accessKeyKey": "s3-test-accessKeyKey-6-b", + "secretKeyKey": "s3-test-secretKeyKey-6-b", + }, + }, + testDataCase: "case6", + }, + { + msg: "valid - test order, match first subpath prefix in pipelineroot, ignoring deeper path prefix further in list", + pipelineroot: "s3://s3-bucket/some/s3/path/a/b", + expectedSessionInfo: objectstore.SessionInfo{ + Provider: "s3", + Params: map[string]string{ + "region": "us-east-1", + "endpoint": "s3.amazonaws.com", + "disableSSL": "false", + "fromEnv": "false", + "secretName": "s3-test-secret-6-a", + "accessKeyKey": "s3-test-accessKeyKey-6-a", + "secretKeyKey": "s3-test-secretKeyKey-6-a", + }, + }, + 
testDataCase: "case6", + }, + { + msg: "valid - first matching gs override", + pipelineroot: "gs://gs-bucket-a/some/gs/path/1", + expectedSessionInfo: objectstore.SessionInfo{ + Provider: "gs", + Params: map[string]string{ + "fromEnv": "false", + "secretName": "gs-test-secret-6-a", + "tokenKey": "gs-test-tokenKey-6-a", + }, + }, + testDataCase: "case6", + }, + { + msg: "valid - pick default gs when no matching prefix", + pipelineroot: "gs://path/does/not/exist/so/use/default", + expectedSessionInfo: objectstore.SessionInfo{ + Provider: "gs", + Params: map[string]string{ + "fromEnv": "false", + "secretName": "gs-test-secret-6", + "tokenKey": "gs-test-tokenKey-6", + }, + }, + testDataCase: "case6", + }, + { + msg: "valid - gs secretref not required when default is set to env", + pipelineroot: "gs://path/does/not/exist/so/use/default", + expectedSessionInfo: objectstore.SessionInfo{ + Provider: "gs", + Params: map[string]string{ + "fromEnv": "true", + }, + }, + testDataCase: "case11", + }, + { + msg: "valid - gs secretref not required when matching override is set to env", + pipelineroot: "gs://gs-bucket/some/gs/path/1/2", + expectedSessionInfo: objectstore.SessionInfo{ + Provider: "gs", + Params: map[string]string{ + "fromEnv": "true", + }, + }, + testDataCase: "case11", + }, + { + msg: "valid - gs secretref is required when matching override is fromEnv:false", + pipelineroot: "gs://gs-bucket/some/gs/path/1", + expectedSessionInfo: objectstore.SessionInfo{ + Provider: "gs", + Params: map[string]string{ + "fromEnv": "false", + "secretName": "gs-test-secret-11", + "tokenKey": "gs-test-tokenKey-11", + }, + }, + testDataCase: "case11", + }, + } + + for _, test := range tt { + t.Run(test.msg, func(t *testing.T) { + config := Config{data: map[string]string{}} + if test.testDataCase != "" { + config.data["providers"] = fetchProviderFromData(providersData, test.testDataCase) + if config.data["providers"] == "" { + panic(fmt.Errorf("provider not found in testdata")) + } + } + 
+ actualSession, err1 := config.GetStoreSessionInfo(test.pipelineroot) + if test.shouldError { + assert.Error(t, err1) + if err1 != nil && test.errorMsg != "" { + assert.Contains(t, err1.Error(), test.errorMsg) + } + } else { + assert.Nil(t, err1) + } + + assert.Equal(t, test.expectedSessionInfo, actualSession) + }) + } +} + +func Test_QueryParameters(t *testing.T) { + providersDataFile, err := os.ReadFile("testdata/provider_cases.yaml") + if os.IsNotExist(err) { + panic(err) + } + + var providersData TestcaseData + err = yaml.Unmarshal(providersDataFile, &providersData) + if err != nil { + panic(err) + } + + tt := []struct { + msg string + config Config + expectedSessionInfo objectstore.SessionInfo + pipelineroot string + shouldError bool + errorMsg string + testDataCase string + }{ + { + msg: "valid - for s3 fetch fromEnv when when query parameters are present, and when no matching provider config is provided", + pipelineroot: "s3://bucket_name/v2/artifacts/profile_name?region=bucket_region&endpoint=endpoint&disableSSL=not_use_ssl&s3ForcePathStyle=true", + expectedSessionInfo: objectstore.SessionInfo{ + Provider: "s3", + Params: map[string]string{ + "fromEnv": "true", + }, + }, + shouldError: false, + }, + { + msg: "valid - for minio fetch fromEnv when when query parameters are present, and when no matching provider config is provided", + pipelineroot: "minio://bucket_name/v2/artifacts/profile_name?region=bucket_region&endpoint=endpoint&disableSSL=not_use_ssl&s3ForcePathStyle=true", + expectedSessionInfo: objectstore.SessionInfo{ + Provider: "minio", + Params: map[string]string{ + "fromEnv": "true", + }, + }, + shouldError: false, + }, + { + msg: "valid - for minio fetch fromEnv when when query parameters are present, and when matching provider config is provided", + pipelineroot: "minio://bucket_name/v2/artifacts/profile_name?region=bucket_region&endpoint=endpoint&disableSSL=not_use_ssl&s3ForcePathStyle=true", + expectedSessionInfo: objectstore.SessionInfo{ + 
Provider: "minio", + Params: map[string]string{ + "fromEnv": "true", + }, + }, + shouldError: false, + testDataCase: "case12", + }, + } + for _, test := range tt { + t.Run(test.msg, func(t *testing.T) { + config := Config{data: map[string]string{}} + if test.testDataCase != "" { + config.data["providers"] = fetchProviderFromData(providersData, test.testDataCase) + if config.data["providers"] == "" { + panic(fmt.Errorf("provider not found in testdata")) + } + } + actualSession, err1 := config.GetStoreSessionInfo(test.pipelineroot) + if test.shouldError { + assert.Error(t, err1) + if err1 != nil && test.errorMsg != "" { + assert.Contains(t, err1.Error(), test.errorMsg) + } + } else { + assert.Nil(t, err1) + } + assert.Equal(t, test.expectedSessionInfo, actualSession) + }) + } +} + +func fetchProviderFromData(cases TestcaseData, name string) string { + for _, c := range cases.Testcases { + if c.Name == name { + return c.Value + } + } + return "" +} diff --git a/backend/src/v2/config/gcs.go b/backend/src/v2/config/gcs.go new file mode 100644 index 00000000000..76e7a4bfc82 --- /dev/null +++ b/backend/src/v2/config/gcs.go @@ -0,0 +1,123 @@ +// Copyright 2024 The Kubeflow Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package config + +import ( + "fmt" + "github.com/kubeflow/pipelines/backend/src/v2/objectstore" + "strconv" + "strings" +) + +type GCSProviderConfig struct { + Default *GCSProviderDefault `json:"default"` + + // optional, ordered, the auth config for the first matching prefix is used + Overrides []GCSOverride `json:"Overrides"` +} + +type GCSProviderDefault struct { + // required + Credentials *GCSCredentials `json:"credentials"` +} + +type GCSOverride struct { + BucketName string `json:"bucketName"` + KeyPrefix string `json:"keyPrefix"` + Credentials *GCSCredentials `json:"credentials"` +} +type GCSCredentials struct { + // optional + FromEnv bool `json:"fromEnv"` + // if FromEnv is False then SecretRef is required + SecretRef *GCSSecretRef `json:"secretRef"` +} +type GCSSecretRef struct { + SecretName string `json:"secretName"` + TokenKey string `json:"tokenKey"` +} + +func (p GCSProviderConfig) ProvideSessionInfo(path string) (objectstore.SessionInfo, error) { + bucketConfig, err := objectstore.ParseBucketPathToConfig(path) + if err != nil { + return objectstore.SessionInfo{}, err + } + bucketName := bucketConfig.BucketName + bucketPrefix := bucketConfig.Prefix + + invalidConfigErr := func(err error) error { + return fmt.Errorf("invalid provider config: %w", err) + } + + params := map[string]string{} + + // 1. If provider config did not have a matching configuration for the provider inferred from pipelineroot OR + // 2. 
If a user has provided query parameters + // then we use blob.OpenBucket(ctx, config.bucketURL()) by setting "FromEnv = True" + if p.Default == nil && p.Overrides == nil { + params["fromEnv"] = strconv.FormatBool(true) + return objectstore.SessionInfo{ + Provider: "gs", + Params: params, + }, nil + } + + if p.Default == nil || p.Default.Credentials == nil { + return objectstore.SessionInfo{}, invalidConfigErr(fmt.Errorf("missing default credentials")) + } + + params["fromEnv"] = strconv.FormatBool(p.Default.Credentials.FromEnv) + if !p.Default.Credentials.FromEnv { + params["secretName"] = p.Default.Credentials.SecretRef.SecretName + params["tokenKey"] = p.Default.Credentials.SecretRef.TokenKey + } + + // Set defaults + sessionInfo := objectstore.SessionInfo{ + Provider: "gs", + Params: params, + } + + // If there's a matching override, then override defaults with provided configs + override := p.getOverrideByPrefix(bucketName, bucketPrefix) + if override != nil { + if override.Credentials == nil { + return objectstore.SessionInfo{}, invalidConfigErr(fmt.Errorf("missing override secretref")) + } + params["fromEnv"] = strconv.FormatBool(override.Credentials.FromEnv) + if !override.Credentials.FromEnv { + if override.Credentials.SecretRef == nil { + return objectstore.SessionInfo{}, invalidConfigErr(fmt.Errorf("missing override secretref")) + } + params["secretName"] = override.Credentials.SecretRef.SecretName + params["tokenKey"] = override.Credentials.SecretRef.TokenKey + } else { + // Don't need a secret if pulling from Env + delete(params, "secretName") + delete(params, "tokenKey") + } + } + return sessionInfo, nil +} + +// getOverrideByPrefix returns first matching bucketname and prefix in overrides +func (p GCSProviderConfig) getOverrideByPrefix(bucketName, prefix string) *GCSOverride { + for _, override := range p.Overrides { + if override.BucketName == bucketName && strings.HasPrefix(prefix, override.KeyPrefix) { + return &override + } + } + return nil +} 
diff --git a/backend/src/v2/config/minio.go b/backend/src/v2/config/minio.go new file mode 100644 index 00000000000..ef394078d3e --- /dev/null +++ b/backend/src/v2/config/minio.go @@ -0,0 +1,51 @@ +// Copyright 2024 The Kubeflow Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "github.com/kubeflow/pipelines/backend/src/v2/objectstore" +) + +type MinioProviderConfig S3ProviderConfig + +// ProvideSessionInfo provides the SessionInfo for minio provider. 
+// this is the same as s3ProviderConfig.ProvideSessionInfo except
+// the provider is set to minio
+func (p MinioProviderConfig) ProvideSessionInfo(path string) (objectstore.SessionInfo, error) {
+	bucketConfig, err := objectstore.ParseBucketPathToConfig(path)
+	if err != nil {
+		return objectstore.SessionInfo{}, err
+	}
+	queryString := bucketConfig.QueryString
+
+	// When using minio root, with no query strings, if no matching provider in kfp-launcher exists
+	// we use the default minio configurations
+	if (p.Default == nil && p.Overrides == nil) && queryString == "" {
+		sess, sessErr := getDefaultMinioSessionInfo()
+		if sessErr != nil {
+			// Propagate the error instead of silently returning an empty
+			// SessionInfo with a nil error (which would hide the failure).
+			return objectstore.SessionInfo{}, sessErr
+		}
+		return sess, nil
+	}
+
+	s3ProviderConfig := S3ProviderConfig(p)
+
+	info, err := s3ProviderConfig.ProvideSessionInfo(path)
+	if err != nil {
+		return objectstore.SessionInfo{}, err
+	}
+	info.Provider = "minio"
+	return info, nil
+}
diff --git a/backend/src/v2/config/s3.go b/backend/src/v2/config/s3.go
new file mode 100644
index 00000000000..8cfc86d8514
--- /dev/null
+++ b/backend/src/v2/config/s3.go
@@ -0,0 +1,156 @@
+// Copyright 2024 The Kubeflow Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+ +package config + +import ( + "fmt" + "github.com/kubeflow/pipelines/backend/src/v2/objectstore" + "strconv" + "strings" +) + +type S3ProviderConfig struct { + Default *S3ProviderDefault `json:"default"` + // optional, ordered, the auth config for the first matching prefix is used + Overrides []S3Override `json:"Overrides"` +} + +type S3ProviderDefault struct { + Endpoint string `json:"endpoint"` + Credentials *S3Credentials `json:"credentials"` + // optional for any non aws s3 provider + Region string `json:"region"` + // optional + DisableSSL *bool `json:"disableSSL"` +} + +type S3Credentials struct { + // optional + FromEnv bool `json:"fromEnv"` + // if FromEnv is False then SecretRef is required + SecretRef *S3SecretRef `json:"secretRef"` +} +type S3Override struct { + Endpoint string `json:"endpoint"` + // optional for any non aws s3 provider + Region string `json:"region"` + // optional + DisableSSL *bool `json:"disableSSL"` + BucketName string `json:"bucketName"` + KeyPrefix string `json:"keyPrefix"` + // required + Credentials *S3Credentials `json:"credentials"` +} +type S3SecretRef struct { + SecretName string `json:"secretName"` + // The k8s secret "Key" for "Artifact SecretKey" and "Artifact AccessKey" + AccessKeyKey string `json:"accessKeyKey"` + SecretKeyKey string `json:"secretKeyKey"` +} + +func (p S3ProviderConfig) ProvideSessionInfo(path string) (objectstore.SessionInfo, error) { + bucketConfig, err := objectstore.ParseBucketPathToConfig(path) + if err != nil { + return objectstore.SessionInfo{}, err + } + bucketName := bucketConfig.BucketName + bucketPrefix := bucketConfig.Prefix + queryString := bucketConfig.QueryString + + invalidConfigErr := func(err error) error { + return fmt.Errorf("invalid provider config: %w", err) + } + + params := map[string]string{} + + // 1. If provider config did not have a matching configuration for the provider inferred from pipelineroot OR + // 2. 
If a user has provided query parameters + // then we use blob.OpenBucket(ctx, config.bucketURL()) by setting "FromEnv = True" + if (p.Default == nil && p.Overrides == nil) || queryString != "" { + params["fromEnv"] = strconv.FormatBool(true) + return objectstore.SessionInfo{ + Provider: "s3", + Params: params, + }, nil + } + + if p.Default == nil || p.Default.Credentials == nil { + return objectstore.SessionInfo{}, invalidConfigErr(fmt.Errorf("missing default credentials")) + } + + params["endpoint"] = p.Default.Endpoint + params["region"] = p.Default.Region + + if p.Default.DisableSSL == nil { + params["disableSSL"] = strconv.FormatBool(false) + } else { + params["disableSSL"] = strconv.FormatBool(*p.Default.DisableSSL) + } + + params["fromEnv"] = strconv.FormatBool(p.Default.Credentials.FromEnv) + if !p.Default.Credentials.FromEnv { + params["secretName"] = p.Default.Credentials.SecretRef.SecretName + params["accessKeyKey"] = p.Default.Credentials.SecretRef.AccessKeyKey + params["secretKeyKey"] = p.Default.Credentials.SecretRef.SecretKeyKey + } + + // Set defaults + sessionInfo := objectstore.SessionInfo{ + Provider: "s3", + Params: params, + } + + // If there's a matching override, then override defaults with provided configs + override := p.getOverrideByPrefix(bucketName, bucketPrefix) + if override != nil { + if override.Endpoint != "" { + sessionInfo.Params["endpoint"] = override.Endpoint + } + if override.Region != "" { + sessionInfo.Params["region"] = override.Region + } + if override.DisableSSL != nil { + sessionInfo.Params["disableSSL"] = strconv.FormatBool(*override.DisableSSL) + } + if override.Credentials == nil { + return objectstore.SessionInfo{}, invalidConfigErr(fmt.Errorf("missing override credentials")) + } + params["fromEnv"] = strconv.FormatBool(override.Credentials.FromEnv) + if !override.Credentials.FromEnv { + if override.Credentials.SecretRef == nil { + return objectstore.SessionInfo{}, invalidConfigErr(fmt.Errorf("missing override 
secretref")) + } + params["secretName"] = override.Credentials.SecretRef.SecretName + params["accessKeyKey"] = override.Credentials.SecretRef.AccessKeyKey + params["secretKeyKey"] = override.Credentials.SecretRef.SecretKeyKey + } else { + // Don't need a secret if pulling from Env + delete(params, "secretName") + delete(params, "accessKeyKey") + delete(params, "secretKeyKey") + } + } + return sessionInfo, nil +} + +// getOverrideByPrefix returns first matching bucketname and prefix in overrides +func (p S3ProviderConfig) getOverrideByPrefix(bucketName, prefix string) *S3Override { + for _, override := range p.Overrides { + if override.BucketName == bucketName && strings.HasPrefix(prefix, override.KeyPrefix) { + return &override + } + } + return nil +} diff --git a/backend/src/v2/config/testdata/provider_cases.yaml b/backend/src/v2/config/testdata/provider_cases.yaml new file mode 100644 index 00000000000..eeba3160838 --- /dev/null +++ b/backend/src/v2/config/testdata/provider_cases.yaml @@ -0,0 +1,268 @@ +# Case names should be unique +cases: + # valid + - name: case0 + value: | + nomatch: {} + # valid + - name: case1 + value: | + minio: {} + # valid + - name: case2 + value: | + minio: + overrides: [] + # invalid if matching against "minio://" + - name: case3 + value: | + minio: + default: + endpoint: minio-endpoint-3.com + # invalid if matching against "minio://" + - name: case4 + value: | + minio: + default: + endpoint: minio-endpoint-4.com + region: minio + overrides: [] + # valid + - name: case5 + value: | + minio: + default: + endpoint: minio-endpoint-5.com + region: minio + disableSSL: true + credentials: + fromEnv: false + secretRef: + secretName: test-secret-5 + accessKeyKey: test-accessKeyKey-5 + secretKeyKey: test-secretKeyKey-5 + overrides: [] + # valid + - name: case6 + value: | + gs: + default: + credentials: + fromEnv: false + secretRef: + secretName: gs-test-secret-6 + tokenKey: gs-test-tokenKey-6 + overrides: + - bucketName: gs-bucket-a + keyPrefix: 
some/gs/path/1/2 + credentials: + fromEnv: false + secretRef: + secretName: gs-test-secret-6-a-1 + tokenKey: gs-test-tokenKey-6-a-1 + - bucketName: gs-bucket-a + keyPrefix: some/gs/path/1 + credentials: + fromEnv: false + secretRef: + secretName: gs-test-secret-6-a + tokenKey: gs-test-tokenKey-6-a + minio: + default: + endpoint: minio-endpoint-6.com + region: minio + disableSSL: true + credentials: + fromEnv: false + secretRef: + secretName: minio-test-secret-6 + accessKeyKey: minio-test-accessKeyKey-6 + secretKeyKey: minio-test-secretKeyKey-6 + overrides: + - endpoint: minio-endpoint-6.com + region: minio-a + disableSSL: true + bucketName: minio-bucket-a + keyPrefix: some/minio/path/a + credentials: + fromEnv: false + secretRef: + secretName: minio-test-secret-6-a + accessKeyKey: minio-test-accessKeyKey-6-a + secretKeyKey: minio-test-secretKeyKey-6-a + s3: + default: + endpoint: s3.amazonaws.com + region: us-east-1 + disableSSL: false + credentials: + fromEnv: false + secretRef: + secretName: s3-testsecret-6 + accessKeyKey: s3-testaccessKeyKey-6 + secretKeyKey: s3-testsecretKeyKey-6 + overrides: + - bucketName: s3-bucket + keyPrefix: some/s3/path/a + credentials: + fromEnv: false + secretRef: + secretName: s3-test-secret-6-a + accessKeyKey: s3-test-accessKeyKey-6-a + secretKeyKey: s3-test-secretKeyKey-6-a + - bucketName: s3-bucket + keyPrefix: some/s3/path/a/b + credentials: + fromEnv: false + secretRef: + secretName: s3-test-secret-6-a-1 + accessKeyKey: s3-test-accessKeyKey-6-a-1 + secretKeyKey: s3-test-secretKeyKey-6-a-1 + - bucketName: s3-bucket + keyPrefix: some/s3/path/b + endpoint: s3.us-east-2.amazonaws.com + region: us-east-2 + disableSSL: false + credentials: + fromEnv: false + secretRef: + secretName: s3-test-secret-6-b + accessKeyKey: s3-test-accessKeyKey-6-b + secretKeyKey: s3-test-secretKeyKey-6-b + - bucketName: s3-bucket + keyPrefix: some/s3/path/b/c + credentials: + fromEnv: false + secretRef: + secretName: s3-test-secret-6-b-1 + accessKeyKey: 
s3-test-accessKeyKey-6-b-1 + secretKeyKey: s3-test-secretKeyKey-6-b-1 + - bucketName: s3-bucket-2 + keyPrefix: some/s3/path/a + credentials: + fromEnv: false + secretRef: + secretName: s3-test-secret-6-a-2 + accessKeyKey: s3-test-accessKeyKey-6-a-2 + secretKeyKey: s3-test-secretKeyKey-6-a-2 + # invalid + - name: case7 + value: | + s3: + default: + endpoint: s3-endpoint-7.com + region: auto + overrides: [] + gs: + overrides: [] + minio: + default: + endpoint: minio-endpoint-7.com + region: auto + overrides: [] + # valid for default case + # invalid override - missing secretref in credentials + - name: case8 + value: | + minio: + default: + endpoint: minio-endpoint-8.com + region: minio + disableSSL: true + credentials: + fromEnv: false + secretRef: + secretName: minio-test-secret-8 + accessKeyKey: minio-test-accessKeyKey-8 + secretKeyKey: minio-test-secretKeyKey-8 + overrides: + - endpoint: minio-endpoint-8.com + region: minio-a + disableSSL: true + bucketName: minio-bucket-a + keyPrefix: some/minio/path/a + credentials: + fromEnv: false + # valid + # note that since override has "FromEnv: true" + # no secretRef is required + - name: case9 + value: | + minio: + default: + endpoint: minio-endpoint-9.com + region: minio + disableSSL: true + credentials: + fromEnv: false + secretRef: + secretName: minio-test-secret-9 + accessKeyKey: minio-test-accessKeyKey-9 + secretKeyKey: minio-test-secretKeyKey-9 + overrides: + - endpoint: minio-endpoint-9.com + region: minio-a + disableSSL: true + bucketName: minio-bucket-a + keyPrefix: some/minio/path/a + credentials: + fromEnv: true + - name: case10 + value: | + minio: + default: + endpoint: minio-endpoint-10.com + region: minio + disableSSL: true + credentials: + fromEnv: true + overrides: + - endpoint: minio-endpoint-10.com + region: minio-a + disableSSL: true + bucketName: minio-bucket-a + keyPrefix: some/minio/path/a + credentials: + fromEnv: false + secretRef: + secretName: minio-test-secret-10 + accessKeyKey: 
minio-test-accessKeyKey-10 + secretKeyKey: minio-test-secretKeyKey-10 + # valid + - name: case11 + value: | + gs: + default: + credentials: + fromEnv: true + overrides: + - bucketName: gs-bucket + keyPrefix: some/gs/path/1/2 + credentials: + fromEnv: true + - bucketName: gs-bucket + keyPrefix: some/gs/path/1 + credentials: + fromEnv: false + secretRef: + secretName: gs-test-secret-11 + tokenKey: gs-test-tokenKey-11 + # valid + - name: case12 + value: | + minio: + default: + endpoint: minio-endpoint-12.com + region: minio + disableSSL: true + credentials: + fromEnv: true + overrides: + - bucketName: bucket_name + keyPrefix: v2/artifacts/profile_name + credentials: + fromEnv: false + secretRef: + secretName: minio-test-secret-12-a + accessKeyKey: minio-test-accessKeyKey-12-a + secretKeyKey: minio-test-secretKeyKey-12-a \ No newline at end of file diff --git a/backend/src/v2/driver/driver.go b/backend/src/v2/driver/driver.go index 15e476a346c..ebb194f646e 100644 --- a/backend/src/v2/driver/driver.go +++ b/backend/src/v2/driver/driver.go @@ -17,6 +17,7 @@ import ( "context" "encoding/json" "fmt" + "github.com/kubeflow/pipelines/backend/src/v2/objectstore" "strconv" "time" @@ -130,26 +131,38 @@ func RootDAG(ctx context.Context, opts Options, mlmd *metadata.Client) (executio } // TODO(v2): in pipeline spec, rename GCS output directory to pipeline root. 
pipelineRoot := opts.RuntimeConfig.GetGcsOutputDirectory() + + restConfig, err := rest.InClusterConfig() + if err != nil { + return nil, fmt.Errorf("failed to initialize kubernetes client: %w", err) + } + k8sClient, err := kubernetes.NewForConfig(restConfig) + if err != nil { + return nil, fmt.Errorf("failed to initialize kubernetes client set: %w", err) + } + cfg, err := config.FromConfigMap(ctx, k8sClient, opts.Namespace) + if err != nil { + return nil, err + } + + storeSessionInfo := objectstore.SessionInfo{} if pipelineRoot != "" { glog.Infof("PipelineRoot=%q", pipelineRoot) } else { - restConfig, err := rest.InClusterConfig() - if err != nil { - return nil, fmt.Errorf("failed to initialize kubernetes client: %w", err) - } - k8sClient, err := kubernetes.NewForConfig(restConfig) - if err != nil { - return nil, fmt.Errorf("failed to initialize kubernetes client set: %w", err) - } - cfg, err := config.FromConfigMap(ctx, k8sClient, opts.Namespace) - if err != nil { - return nil, err - } pipelineRoot = cfg.DefaultPipelineRoot() glog.Infof("PipelineRoot=%q from default config", pipelineRoot) } + storeSessionInfo, err = cfg.GetStoreSessionInfo(pipelineRoot) + if err != nil { + return nil, err + } + storeSessionInfoJSON, err := json.Marshal(storeSessionInfo) + if err != nil { + return nil, err + } + storeSessionInfoStr := string(storeSessionInfoJSON) // TODO(Bobgy): fill in run resource. - pipeline, err := mlmd.GetPipeline(ctx, opts.PipelineName, opts.RunID, opts.Namespace, "run-resource", pipelineRoot) + pipeline, err := mlmd.GetPipeline(ctx, opts.PipelineName, opts.RunID, opts.Namespace, "run-resource", pipelineRoot, storeSessionInfoStr) if err != nil { return nil, err } @@ -228,7 +241,7 @@ func Container(ctx context.Context, opts Options, mlmd *metadata.Client, cacheCl } // TODO(Bobgy): there's no need to pass any parameters, because pipeline // and pipeline run context have been created by root DAG driver. 
- pipeline, err := mlmd.GetPipeline(ctx, opts.PipelineName, opts.RunID, "", "", "") + pipeline, err := mlmd.GetPipeline(ctx, opts.PipelineName, opts.RunID, "", "", "", "") if err != nil { return nil, err } @@ -673,7 +686,7 @@ func DAG(ctx context.Context, opts Options, mlmd *metadata.Client) (execution *E } // TODO(Bobgy): there's no need to pass any parameters, because pipeline // and pipeline run context have been created by root DAG driver. - pipeline, err := mlmd.GetPipeline(ctx, opts.PipelineName, opts.RunID, "", "", "") + pipeline, err := mlmd.GetPipeline(ctx, opts.PipelineName, opts.RunID, "", "", "", "") if err != nil { return nil, err } @@ -1340,7 +1353,7 @@ func createPVC( // Create execution regardless the operation succeeds or not defer func() { if createdExecution == nil { - pipeline, err := mlmd.GetPipeline(ctx, opts.PipelineName, opts.RunID, "", "", "") + pipeline, err := mlmd.GetPipeline(ctx, opts.PipelineName, opts.RunID, "", "", "", "") if err != nil { return } @@ -1420,7 +1433,7 @@ func createPVC( ecfg.CachedMLMDExecutionID = cachedMLMDExecutionID ecfg.FingerPrint = fingerPrint - pipeline, err := mlmd.GetPipeline(ctx, opts.PipelineName, opts.RunID, "", "", "") + pipeline, err := mlmd.GetPipeline(ctx, opts.PipelineName, opts.RunID, "", "", "", "") if err != nil { return "", createdExecution, pb.Execution_FAILED, fmt.Errorf("error getting pipeline from MLMD: %w", err) } @@ -1510,7 +1523,7 @@ func deletePVC( // Create execution regardless the operation succeeds or not defer func() { if createdExecution == nil { - pipeline, err := mlmd.GetPipeline(ctx, opts.PipelineName, opts.RunID, "", "", "") + pipeline, err := mlmd.GetPipeline(ctx, opts.PipelineName, opts.RunID, "", "", "", "") if err != nil { return } @@ -1540,7 +1553,7 @@ func deletePVC( ecfg.CachedMLMDExecutionID = cachedMLMDExecutionID ecfg.FingerPrint = fingerPrint - pipeline, err := mlmd.GetPipeline(ctx, opts.PipelineName, opts.RunID, "", "", "") + pipeline, err := mlmd.GetPipeline(ctx, 
opts.PipelineName, opts.RunID, "", "", "", "") if err != nil { return createdExecution, pb.Execution_FAILED, fmt.Errorf("error getting pipeline from MLMD: %w", err) } diff --git a/backend/src/v2/metadata/client.go b/backend/src/v2/metadata/client.go index 89b26b2fcac..4854809c88a 100644 --- a/backend/src/v2/metadata/client.go +++ b/backend/src/v2/metadata/client.go @@ -77,7 +77,7 @@ var ( ) type ClientInterface interface { - GetPipeline(ctx context.Context, pipelineName, runID, namespace, runResource, pipelineRoot string) (*Pipeline, error) + GetPipeline(ctx context.Context, pipelineName, runID, namespace, runResource, pipelineRoot, storeSessionInfo string) (*Pipeline, error) GetDAG(ctx context.Context, executionID int64) (*DAG, error) PublishExecution(ctx context.Context, execution *Execution, outputParameters map[string]*structpb.Value, outputArtifacts []*OutputArtifact, state pb.Execution_State) error CreateExecution(ctx context.Context, pipeline *Pipeline, config *ExecutionConfig) (*Execution, error) @@ -200,6 +200,18 @@ func (p *Pipeline) GetCtxID() int64 { return p.pipelineCtx.GetId() } +func (p *Pipeline) GetStoreSessionInfo() string { + if p == nil { + return "" + } + props := p.pipelineRunCtx.GetCustomProperties() + storeSessionInfo, ok := props[keyStoreSessionInfo] + if !ok { + return "" + } + return storeSessionInfo.GetStringValue() +} + func (p *Pipeline) GetPipelineRoot() string { if p == nil { return "" @@ -282,7 +294,7 @@ func GenerateOutputURI(pipelineRoot string, paths []string, preserveQueryString // GetPipeline returns the current pipeline represented by the specified // pipeline name and run ID. 
-func (c *Client) GetPipeline(ctx context.Context, pipelineName, runID, namespace, runResource, pipelineRoot string) (*Pipeline, error) { +func (c *Client) GetPipeline(ctx context.Context, pipelineName, runID, namespace, runResource, pipelineRoot, storeSessionInfo string) (*Pipeline, error) { pipelineContext, err := c.getOrInsertContext(ctx, pipelineName, pipelineContextType, nil) if err != nil { return nil, err @@ -292,7 +304,8 @@ func (c *Client) GetPipeline(ctx context.Context, pipelineName, runID, namespace keyNamespace: stringValue(namespace), keyResourceName: stringValue(runResource), // pipeline root of this run - keyPipelineRoot: stringValue(GenerateOutputURI(pipelineRoot, []string{pipelineName, runID}, true)), + keyPipelineRoot: stringValue(GenerateOutputURI(pipelineRoot, []string{pipelineName, runID}, true)), + keyStoreSessionInfo: stringValue(storeSessionInfo), } runContext, err := c.getOrInsertContext(ctx, runID, pipelineRunContextType, metadata) glog.Infof("Pipeline Run Context: %+v", runContext) @@ -492,6 +505,7 @@ const ( keyNamespace = "namespace" keyResourceName = "resource_name" keyPipelineRoot = "pipeline_root" + keyStoreSessionInfo = "store_session_info" keyCacheFingerPrint = "cache_fingerprint" keyCachedExecutionID = "cached_execution_id" keyInputs = "inputs" diff --git a/backend/src/v2/metadata/client_fake.go b/backend/src/v2/metadata/client_fake.go index c2887832d83..de8d007621e 100644 --- a/backend/src/v2/metadata/client_fake.go +++ b/backend/src/v2/metadata/client_fake.go @@ -32,7 +32,7 @@ func NewFakeClient() *FakeClient { return &FakeClient{} } -func (c *FakeClient) GetPipeline(ctx context.Context, pipelineName, runID, namespace, runResource, pipelineRoot string) (*Pipeline, error) { +func (c *FakeClient) GetPipeline(ctx context.Context, pipelineName, runID, namespace, runResource, pipelineRoot string, storeSessionInfo string) (*Pipeline, error) { return nil, nil } diff --git a/backend/src/v2/metadata/client_test.go 
b/backend/src/v2/metadata/client_test.go index 86a16fe7724..94f081b32b0 100644 --- a/backend/src/v2/metadata/client_test.go +++ b/backend/src/v2/metadata/client_test.go @@ -89,7 +89,7 @@ func Test_GetPipeline(t *testing.T) { mlmdClient, err := NewTestMlmdClient() fatalIf(err) - pipeline, err := client.GetPipeline(ctx, "get-pipeline-test", runId, namespace, runResource, pipelineRoot) + pipeline, err := client.GetPipeline(ctx, "get-pipeline-test", runId, namespace, runResource, pipelineRoot, "") fatalIf(err) expectPipelineRoot := fmt.Sprintf("%s/get-pipeline-test/%s", pipelineRoot, runId) if pipeline.GetPipelineRoot() != expectPipelineRoot { @@ -138,10 +138,10 @@ func Test_GetPipeline_Twice(t *testing.T) { client, err := metadata.NewClient(testMlmdServerAddress, testMlmdServerPort) fatalIf(err) - pipeline, err := client.GetPipeline(ctx, "get-pipeline-test", runId, namespace, runResource, pipelineRoot) + pipeline, err := client.GetPipeline(ctx, "get-pipeline-test", runId, namespace, runResource, pipelineRoot, "") fatalIf(err) // The second call to GetPipeline won't fail because it avoid inserting to MLMD again. 
- samePipeline, err := client.GetPipeline(ctx, "get-pipeline-test", runId, namespace, runResource, pipelineRoot) + samePipeline, err := client.GetPipeline(ctx, "get-pipeline-test", runId, namespace, runResource, pipelineRoot, "") fatalIf(err) if pipeline.GetCtxID() != samePipeline.GetCtxID() { t.Errorf("Expect pipeline context ID %d, actual is %d", pipeline.GetCtxID(), samePipeline.GetCtxID()) @@ -159,7 +159,7 @@ func Test_GetPipelineFromExecution(t *testing.T) { } client := newLocalClientOrFatal(t) ctx := context.Background() - pipeline, err := client.GetPipeline(ctx, "get-pipeline-from-execution", newUUIDOrFatal(t), "kubeflow", "workflow/abc", "gs://my-bucket/root") + pipeline, err := client.GetPipeline(ctx, "get-pipeline-from-execution", newUUIDOrFatal(t), "kubeflow", "workflow/abc", "gs://my-bucket/root", "") fatalIf(err) execution, err := client.CreateExecution(ctx, pipeline, &metadata.ExecutionConfig{ TaskName: "task1", @@ -193,7 +193,7 @@ func Test_GetPipelineConcurrently(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - _, err := client.GetPipeline(ctx, fmt.Sprintf("get-pipeline-concurrently-test-%s", runIdText), runIdText, namespace, "workflows.argoproj.io/hello-world-"+runIdText, pipelineRoot) + _, err := client.GetPipeline(ctx, fmt.Sprintf("get-pipeline-concurrently-test-%s", runIdText), runIdText, namespace, "workflows.argoproj.io/hello-world-"+runIdText, pipelineRoot, "") if err != nil { t.Error(err) } @@ -205,7 +205,7 @@ func Test_GetPipelineConcurrently(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - _, err := client.GetPipeline(ctx, fmt.Sprintf("get-pipeline-concurrently-test-%s", runIdText), runIdText, namespace, "workflows.argoproj.io/hello-world-"+runIdText, pipelineRoot) + _, err := client.GetPipeline(ctx, fmt.Sprintf("get-pipeline-concurrently-test-%s", runIdText), runIdText, namespace, "workflows.argoproj.io/hello-world-"+runIdText, pipelineRoot, "") if err != nil { t.Error(err) } @@ -274,7 +274,7 @@ func Test_DAG(t *testing.T) { 
client := newLocalClientOrFatal(t) ctx := context.Background() // These parameters do not matter. - pipeline, err := client.GetPipeline(ctx, "pipeline-name", newUUIDOrFatal(t), "ns1", "workflow/pipeline-1234", pipelineRoot) + pipeline, err := client.GetPipeline(ctx, "pipeline-name", newUUIDOrFatal(t), "ns1", "workflow/pipeline-1234", pipelineRoot, "") if err != nil { t.Fatal(err) } diff --git a/backend/src/v2/objectstore/config.go b/backend/src/v2/objectstore/config.go new file mode 100644 index 00000000000..06b26b8c436 --- /dev/null +++ b/backend/src/v2/objectstore/config.go @@ -0,0 +1,233 @@ +// Copyright 2024 The Kubeflow Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// This package contains helper methods for using object stores. 
+package objectstore + +import ( + "encoding/json" + "fmt" + "github.com/golang/glog" + "os" + "path" + "regexp" + "strconv" + "strings" +) + +// The endpoint uses Kubernetes service DNS name with namespace: +// https://kubernetes.io/docs/concepts/services-networking/service/#dns +const defaultMinioEndpointInMultiUserMode = "minio-service.kubeflow:9000" + +type Config struct { + Scheme string + BucketName string + Prefix string + QueryString string + SessionInfo *SessionInfo +} + +type SessionInfo struct { + Provider string + Params map[string]string +} + +type GCSParams struct { + FromEnv bool + SecretName string + TokenKey string +} + +type S3Params struct { + FromEnv bool + SecretName string + // The k8s secret "Key" for "Artifact SecretKey" and "Artifact AccessKey" + AccessKeyKey string + SecretKeyKey string + Region string + Endpoint string + DisableSSL bool +} + +func (b *Config) bucketURL() string { + u := b.Scheme + b.BucketName + + // append prefix=b.prefix to existing queryString + q := b.QueryString + if len(b.Prefix) > 0 { + if len(q) > 0 { + q = q + "&prefix=" + b.Prefix + } else { + q = "?prefix=" + b.Prefix + } + } + + u = u + q + return u +} + +func (b *Config) PrefixedBucket() string { + return b.Scheme + path.Join(b.BucketName, b.Prefix) +} + +func (b *Config) KeyFromURI(uri string) (string, error) { + prefixedBucket := b.PrefixedBucket() + if !strings.HasPrefix(uri, prefixedBucket) { + return "", fmt.Errorf("URI %q does not have expected bucket prefix %q", uri, prefixedBucket) + } + + key := strings.TrimLeft(strings.TrimPrefix(uri, prefixedBucket), "/") + if len(key) == 0 { + return "", fmt.Errorf("URI %q has empty key given prefixed bucket %q", uri, prefixedBucket) + } + return key, nil +} + +func (b *Config) UriFromKey(blobKey string) string { + return b.Scheme + path.Join(b.BucketName, b.Prefix, blobKey) +} + +var bucketPattern = regexp.MustCompile(`(^[a-z][a-z0-9]+:///?)([^/?]+)(/[^?]*)?(\?.+)?$`) + +func ParseBucketConfig(path 
string, sess *SessionInfo) (*Config, error) { + config, err := ParseBucketPathToConfig(path) + if err != nil { + return nil, err + } + config.SessionInfo = sess + + return config, nil +} + +func ParseBucketPathToConfig(path string) (*Config, error) { + ms := bucketPattern.FindStringSubmatch(path) + if ms == nil || len(ms) != 5 { + return nil, fmt.Errorf("parse bucket config failed: unrecognized pipeline root format: %q", path) + } + + // TODO: Verify/add support for file:///. + if ms[1] != "gs://" && ms[1] != "s3://" && ms[1] != "minio://" && ms[1] != "mem://" { + return nil, fmt.Errorf("parse bucket config failed: unsupported Cloud bucket: %q", path) + } + + prefix := strings.TrimPrefix(ms[3], "/") + if len(prefix) > 0 && !strings.HasSuffix(prefix, "/") { + prefix = prefix + "/" + } + + return &Config{ + Scheme: ms[1], + BucketName: ms[2], + Prefix: prefix, + QueryString: ms[4], + }, nil +} + +func ParseBucketConfigForArtifactURI(uri string) (*Config, error) { + ms := bucketPattern.FindStringSubmatch(uri) + if ms == nil || len(ms) != 5 { + return nil, fmt.Errorf("parse bucket config failed: unrecognized uri format: %q", uri) + } + + // TODO: Verify/add support for file:///. + if ms[1] != "gs://" && ms[1] != "s3://" && ms[1] != "minio://" && ms[1] != "mem://" { + return nil, fmt.Errorf("parse bucket config failed: unsupported Cloud bucket: %q", uri) + } + + return &Config{ + Scheme: ms[1], + BucketName: ms[2], + }, nil +} + +func MinioDefaultEndpoint() string { + // Discover minio-service in the same namespace by env var. + // https://kubernetes.io/docs/concepts/services-networking/service/#environment-variables + minioHost := os.Getenv("MINIO_SERVICE_SERVICE_HOST") + minioPort := os.Getenv("MINIO_SERVICE_SERVICE_PORT") + if minioHost != "" && minioPort != "" { + // If there is a minio-service Kubernetes service in the same namespace, + // MINIO_SERVICE_SERVICE_HOST and MINIO_SERVICE_SERVICE_PORT env vars should + // exist by default, so we use it as default. 
+ return minioHost + ":" + minioPort + } + // If the env vars do not exist, we guess that we are running in KFP multi user mode, so default minio service should be `minio-service.kubeflow:9000`. + glog.Infof("Cannot detect minio-service in the same namespace, default to %s as MinIO endpoint.", defaultMinioEndpointInMultiUserMode) + return defaultMinioEndpointInMultiUserMode +} + +func GetSessionInfoFromString(sessionInfoJSON string) (*SessionInfo, error) { + sessionInfo := &SessionInfo{} + if sessionInfoJSON == "" { + return nil, nil + } + err := json.Unmarshal([]byte(sessionInfoJSON), sessionInfo) + if err != nil { + return nil, fmt.Errorf("Encountered error when attempting to unmarshall bucket session info properties: %w", err) + } + return sessionInfo, nil +} + +func StructuredS3Params(p map[string]string) (*S3Params, error) { + sparams := &S3Params{} + if val, ok := p["fromEnv"]; ok { + boolVal, err := strconv.ParseBool(val) + if err != nil { + return nil, err + } + sparams.FromEnv = boolVal + } + if val, ok := p["secretName"]; ok { + sparams.SecretName = val + } + // The k8s secret "Key" for "Artifact SecretKey" and "Artifact AccessKey" + if val, ok := p["accessKeyKey"]; ok { + sparams.AccessKeyKey = val + } + if val, ok := p["secretKeyKey"]; ok { + sparams.SecretKeyKey = val + } + if val, ok := p["region"]; ok { + sparams.Region = val + } + if val, ok := p["endpoint"]; ok { + sparams.Endpoint = val + } + if val, ok := p["disableSSL"]; ok { + boolVal, err := strconv.ParseBool(val) + if err != nil { + return nil, err + } + sparams.DisableSSL = boolVal + } + return sparams, nil +} + +func StructuredGCSParams(p map[string]string) (*GCSParams, error) { + sparams := &GCSParams{} + if val, ok := p["fromEnv"]; ok { + boolVal, err := strconv.ParseBool(val) + if err != nil { + return nil, err + } + sparams.FromEnv = boolVal + } + if val, ok := p["secretName"]; ok { + sparams.SecretName = val + } + if val, ok := p["tokenKey"]; ok { + sparams.TokenKey = val + } + return 
sparams, nil +} diff --git a/backend/src/v2/objectstore/object_store.go b/backend/src/v2/objectstore/object_store.go index b4a0ca1d642..66693290105 100644 --- a/backend/src/v2/objectstore/object_store.go +++ b/backend/src/v2/objectstore/object_store.go @@ -12,107 +12,76 @@ // See the License for the specific language governing permissions and // limitations under the License. -// This package contains helper methods for using object stores. package objectstore import ( "context" "fmt" - "io" - "io/ioutil" - "os" - "path" - "path/filepath" - "regexp" - "strings" - "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" "github.com/golang/glog" "gocloud.dev/blob" + "gocloud.dev/blob/gcsblob" _ "gocloud.dev/blob/gcsblob" "gocloud.dev/blob/s3blob" + "gocloud.dev/gcp" + "golang.org/x/oauth2/google" + "io" + "io/ioutil" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" + "os" + "path/filepath" + "strings" ) -type Config struct { - Scheme string - BucketName string - Prefix string - QueryString string -} - func OpenBucket(ctx context.Context, k8sClient kubernetes.Interface, namespace string, config *Config) (bucket *blob.Bucket, err error) { defer func() { if err != nil { err = fmt.Errorf("Failed to open bucket %q: %w", config.BucketName, err) } }() - if config.Scheme == "minio://" { - cred, err := getMinioCredential(ctx, k8sClient, namespace) - if err != nil { - return nil, fmt.Errorf("Failed to get minio credential: %w", err) - } - sess, err := session.NewSession(&aws.Config{ - Credentials: cred, - Region: aws.String("minio"), - Endpoint: aws.String(MinioDefaultEndpoint()), - DisableSSL: aws.Bool(true), - S3ForcePathStyle: aws.Bool(true), - }) - - if err != nil { - return nil, fmt.Errorf("Failed to create session to access minio: %v", err) - } - minioBucket, err := s3blob.OpenBucket(ctx, sess, config.BucketName, nil) - if err != nil { - return nil, err - 
} - // Directly calling s3blob.OpenBucket does not allow overriding prefix via bucketConfig.BucketURL(). - // Therefore, we need to explicitly configure the prefixed bucket. - return blob.PrefixedBucket(minioBucket, config.Prefix), nil - - } - return blob.OpenBucket(ctx, config.bucketURL()) -} - -func (b *Config) bucketURL() string { - u := b.Scheme + b.BucketName - - // append prefix=b.prefix to existing queryString - q := b.QueryString - if len(b.Prefix) > 0 { - if len(q) > 0 { - q = q + "&prefix=" + b.Prefix - } else { - q = "?prefix=" + b.Prefix + if config.SessionInfo != nil { + if config.SessionInfo.Provider == "minio" || config.SessionInfo.Provider == "s3" { + sess, err1 := createS3BucketSession(ctx, namespace, config.SessionInfo, k8sClient) + if err1 != nil { + return nil, fmt.Errorf("Failed to retrieve credentials for bucket %s: %w", config.BucketName, err1) + } + if sess != nil { + openedBucket, err2 := s3blob.OpenBucket(ctx, sess, config.BucketName, nil) + if err2 != nil { + return nil, err2 + } + // Directly calling s3blob.OpenBucket does not allow overriding prefix via bucketConfig.BucketURL(). + // Therefore, we need to explicitly configure the prefixed bucket. 
+ return blob.PrefixedBucket(openedBucket, config.Prefix), nil + } + } else if config.SessionInfo.Provider == "gs" { + client, err1 := getGCSTokenClient(ctx, namespace, config.SessionInfo, k8sClient) + if err1 != nil { + return nil, err1 + } + if client != nil { + openedBucket, err2 := gcsblob.OpenBucket(ctx, client, config.BucketName, nil) + if err2 != nil { + return openedBucket, err2 + } + return blob.PrefixedBucket(openedBucket, config.Prefix), nil + } } } - u = u + q - return u -} -func (b *Config) PrefixedBucket() string { - return b.Scheme + path.Join(b.BucketName, b.Prefix) -} - -func (b *Config) KeyFromURI(uri string) (string, error) { - prefixedBucket := b.PrefixedBucket() - if !strings.HasPrefix(uri, prefixedBucket) { - return "", fmt.Errorf("URI %q does not have expected bucket prefix %q", uri, prefixedBucket) - } - - key := strings.TrimLeft(strings.TrimPrefix(uri, prefixedBucket), "/") - if len(key) == 0 { - return "", fmt.Errorf("URI %q has empty key given prefixed bucket %q", uri, prefixedBucket) + bucketURL := config.bucketURL() + // Since query parameters are only supported for s3:// paths + // if we detect minio scheme in pipeline root, replace it with s3:// scheme + // ref: https://gocloud.dev/howto/blob/#s3-compatible + if len(config.QueryString) > 0 && strings.HasPrefix(bucketURL, "minio://") { + bucketURL = strings.Replace(bucketURL, "minio://", "s3://", 1) } - return key, nil -} -func (b *Config) UriFromKey(blobKey string) string { - return b.Scheme + path.Join(b.BucketName, b.Prefix, blobKey) + // When no provider config is provided, or "FromEnv" is specified, use default credentials from the environment + return blob.OpenBucket(ctx, bucketURL) } func UploadBlob(ctx context.Context, bucket *blob.Bucket, localPath, blobPath string) error { @@ -179,50 +148,6 @@ func DownloadBlob(ctx context.Context, bucket *blob.Bucket, localDir, blobDir st return nil } -var bucketPattern = regexp.MustCompile(`(^[a-z][a-z0-9]+:///?)([^/?]+)(/[^?]*)?(\?.+)?$`) 
- -func ParseBucketConfig(path string) (*Config, error) { - ms := bucketPattern.FindStringSubmatch(path) - if ms == nil || len(ms) != 5 { - return nil, fmt.Errorf("parse bucket config failed: unrecognized pipeline root format: %q", path) - } - - // TODO: Verify/add support for file:///. - if ms[1] != "gs://" && ms[1] != "s3://" && ms[1] != "minio://" && ms[1] != "mem://" { - return nil, fmt.Errorf("parse bucket config failed: unsupported Cloud bucket: %q", path) - } - - prefix := strings.TrimPrefix(ms[3], "/") - if len(prefix) > 0 && !strings.HasSuffix(prefix, "/") { - prefix = prefix + "/" - } - - return &Config{ - Scheme: ms[1], - BucketName: ms[2], - Prefix: prefix, - QueryString: ms[4], - }, nil -} - -func ParseBucketConfigForArtifactURI(uri string) (*Config, error) { - ms := bucketPattern.FindStringSubmatch(uri) - if ms == nil || len(ms) != 5 { - return nil, fmt.Errorf("parse bucket config failed: unrecognized uri format: %q", uri) - } - - // TODO: Verify/add support for file:///. - if ms[1] != "gs://" && ms[1] != "s3://" && ms[1] != "minio://" && ms[1] != "mem://" { - return nil, fmt.Errorf("parse bucket config failed: unsupported Cloud bucket: %q", uri) - } - - return &Config{ - Scheme: ms[1], - BucketName: ms[2], - }, nil -} - -// TODO(neuromage): Move these helper functions to a storage package and add tests. 
func uploadFile(ctx context.Context, bucket *blob.Bucket, localFilePath, blobFilePath string) error { errorF := func(err error) error { return fmt.Errorf("uploadFile(): unable to complete copying %q to remote storage %q: %w", localFilePath, blobFilePath, err) @@ -286,57 +211,100 @@ func downloadFile(ctx context.Context, bucket *blob.Bucket, blobFilePath, localF return nil } -// The endpoint uses Kubernetes service DNS name with namespace: -// https://kubernetes.io/docs/concepts/services-networking/service/#dns -const defaultMinioEndpointInMultiUserMode = "minio-service.kubeflow:9000" -const minioArtifactSecretName = "mlpipeline-minio-artifact" +func getGCSTokenClient(ctx context.Context, namespace string, sessionInfo *SessionInfo, clientSet kubernetes.Interface) (client *gcp.HTTPClient, err error) { + params, err := StructuredGCSParams(sessionInfo.Params) + if err != nil { + return nil, err + } + if params.FromEnv { + return nil, nil + } + secret, err := clientSet.CoreV1().Secrets(namespace).Get(ctx, params.SecretName, metav1.GetOptions{}) + if err != nil { + return nil, err + } + tokenJson, ok := secret.Data[params.TokenKey] + if !ok || len(tokenJson) == 0 { + return nil, fmt.Errorf("key '%s' not found or is empty", params.TokenKey) + } + creds, err := google.CredentialsFromJSON(ctx, tokenJson, "https://www.googleapis.com/auth/devstorage.read_write") + if err != nil { + return nil, err + } + client, err = gcp.NewHTTPClient(gcp.DefaultTransport(), gcp.CredentialsTokenSource(creds)) + if err != nil { + return nil, err + } + return client, nil +} + +func createS3BucketSession(ctx context.Context, namespace string, sessionInfo *SessionInfo, client kubernetes.Interface) (*session.Session, error) { + if sessionInfo == nil { + return nil, nil + } + config := &aws.Config{} + params, err := StructuredS3Params(sessionInfo.Params) + if err != nil { + return nil, err + } + if params.FromEnv { + return nil, nil + } + creds, err := getS3BucketCredential(ctx, client, namespace, 
params.SecretName, params.SecretKeyKey, params.AccessKeyKey) + if err != nil { + return nil, err + } + config.Credentials = creds + config.Region = aws.String(params.Region) + config.DisableSSL = aws.Bool(params.DisableSSL) + config.S3ForcePathStyle = aws.Bool(true) + + // AWS Specific: + // Path-style S3 endpoints, which are commonly used, may fall into either of two subdomains: + // 1) s3.amazonaws.com + // 2) s3.<region>.amazonaws.com + // for (1) the endpoint is not required, thus we skip it, otherwise the writer will fail to close due to region mismatch. + // https://aws.amazon.com/blogs/infrastructure-and-automation/best-practices-for-using-amazon-s3-endpoints-in-aws-cloudformation-templates/ + // https://docs.aws.amazon.com/sdk-for-go/api/aws/session/ + if strings.ToLower(params.Endpoint) != "s3.amazonaws.com" { + config.Endpoint = aws.String(params.Endpoint) + } -func MinioDefaultEndpoint() string { - // Discover minio-service in the same namespace by env var. - // https://kubernetes.io/docs/concepts/services-networking/service/#environment-variables - minioHost := os.Getenv("MINIO_SERVICE_SERVICE_HOST") - minioPort := os.Getenv("MINIO_SERVICE_SERVICE_PORT") - if minioHost != "" && minioPort != "" { - // If there is a minio-service Kubernetes service in the same namespace, - // MINIO_SERVICE_SERVICE_HOST and MINIO_SERVICE_SERVICE_PORT env vars should - // exist by default, so we use it as default. - return minioHost + ":" + minioPort + sess, err := session.NewSession(config) + if err != nil { + return nil, fmt.Errorf("Failed to create object store session, %v", err) } - // If the env vars do not exist, we guess that we are running in KFP multi user mode, so default minio service should be `minio-service.kubeflow:9000`.
- glog.Infof("Cannot detect minio-service in the same namespace, default to %s as MinIO endpoint.", defaultMinioEndpointInMultiUserMode) - return defaultMinioEndpointInMultiUserMode + return sess, nil } -func getMinioCredential(ctx context.Context, clientSet kubernetes.Interface, namespace string) (cred *credentials.Credentials, err error) { +func getS3BucketCredential( + ctx context.Context, + clientSet kubernetes.Interface, + namespace string, + secretName string, + bucketSecretKeyKey string, + bucketAccessKeyKey string, +) (cred *credentials.Credentials, err error) { defer func() { if err != nil { // wrap error before returning - err = fmt.Errorf("Failed to get MinIO credential from secret name=%q namespace=%q: %w", minioArtifactSecretName, namespace, err) + err = fmt.Errorf("Failed to get Bucket credentials from secret name=%q namespace=%q: %w", secretName, namespace, err) } }() secret, err := clientSet.CoreV1().Secrets(namespace).Get( ctx, - minioArtifactSecretName, + secretName, metav1.GetOptions{}) if err != nil { return nil, err } - accessKey := string(secret.Data["accesskey"]) - secretKey := string(secret.Data["secretkey"]) + // The k8s secret "Key" for "SecretKey" and "AccessKey" + accessKey := string(secret.Data[bucketAccessKeyKey]) + secretKey := string(secret.Data[bucketSecretKeyKey]) if accessKey != "" && secretKey != "" { cred = credentials.NewStaticCredentials(accessKey, secretKey, "") return cred, err } - - aws_cred, err := getAWSCredential() - if aws_cred != nil { - return aws_cred, err - } - - return nil, fmt.Errorf("does not have 'accesskey' or 'secretkey' key") -} - -func getAWSCredential() (cred *credentials.Credentials, err error) { - return credentials.NewCredentials(&credentials.ChainProvider{}), nil + return nil, fmt.Errorf("could not find specified keys '%s' or '%s'", bucketAccessKeyKey, bucketSecretKeyKey) } diff --git a/backend/src/v2/objectstore/object_store_test.go b/backend/src/v2/objectstore/object_store_test.go index 
86cd48da521..7cefdeb1ee7 100644 --- a/backend/src/v2/objectstore/object_store_test.go +++ b/backend/src/v2/objectstore/object_store_test.go @@ -12,14 +12,22 @@ // See the License for the specific language governing permissions and // limitations under the License. -package objectstore_test +package objectstore import ( + "context" + "fmt" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/fake" "os" "reflect" "testing" - "github.com/kubeflow/pipelines/backend/src/v2/objectstore" _ "gocloud.dev/blob/gcsblob" ) @@ -27,13 +35,13 @@ func Test_parseCloudBucket(t *testing.T) { tests := []struct { name string path string - want *objectstore.Config + want *Config wantErr bool }{ { name: "Parses GCS - Just the bucket", path: "gs://my-bucket", - want: &objectstore.Config{ + want: &Config{ Scheme: "gs://", BucketName: "my-bucket", Prefix: "", @@ -43,7 +51,7 @@ func Test_parseCloudBucket(t *testing.T) { { name: "Parses GCS - Just the bucket with trailing slash", path: "gs://my-bucket/", - want: &objectstore.Config{ + want: &Config{ Scheme: "gs://", BucketName: "my-bucket", Prefix: "", @@ -53,7 +61,7 @@ func Test_parseCloudBucket(t *testing.T) { { name: "Parses GCS - Bucket with prefix", path: "gs://my-bucket/my-path", - want: &objectstore.Config{ + want: &Config{ Scheme: "gs://", BucketName: "my-bucket", Prefix: "my-path/", @@ -63,7 +71,7 @@ func Test_parseCloudBucket(t *testing.T) { { name: "Parses GCS - Bucket with prefix and trailing slash", path: "gs://my-bucket/my-path/", - want: &objectstore.Config{ + want: &Config{ Scheme: "gs://", BucketName: "my-bucket", Prefix: "my-path/", @@ -73,7 +81,7 @@ func Test_parseCloudBucket(t *testing.T) { { name: "Parses GCS - Bucket with multiple path components in prefix", path: 
"gs://my-bucket/my-path/123", - want: &objectstore.Config{ + want: &Config{ Scheme: "gs://", BucketName: "my-bucket", Prefix: "my-path/123/", @@ -83,7 +91,7 @@ func Test_parseCloudBucket(t *testing.T) { { name: "Parses GCS - Bucket with multiple path components in prefix and trailing slash", path: "gs://my-bucket/my-path/123/", - want: &objectstore.Config{ + want: &Config{ Scheme: "gs://", BucketName: "my-bucket", Prefix: "my-path/123/", @@ -93,7 +101,7 @@ func Test_parseCloudBucket(t *testing.T) { { name: "Parses Minio - Bucket with query string", path: "minio://my-bucket", - want: &objectstore.Config{ + want: &Config{ Scheme: "minio://", BucketName: "my-bucket", Prefix: "", @@ -103,7 +111,7 @@ func Test_parseCloudBucket(t *testing.T) { }, { name: "Parses Minio - Bucket with prefix", path: "minio://my-bucket/my-path", - want: &objectstore.Config{ + want: &Config{ Scheme: "minio://", BucketName: "my-bucket", Prefix: "my-path/", @@ -113,18 +121,40 @@ func Test_parseCloudBucket(t *testing.T) { }, { name: "Parses Minio - Bucket with multiple path components in prefix", path: "minio://my-bucket/my-path/123", - want: &objectstore.Config{ + want: &Config{ Scheme: "minio://", BucketName: "my-bucket", Prefix: "my-path/123/", QueryString: "", }, wantErr: false, + }, { + name: "Parses S3 - Bucket with session", + path: "s3://my-bucket/my-path/123", + want: &Config{ + Scheme: "s3://", + BucketName: "my-bucket", + Prefix: "my-path/123/", + QueryString: "", + SessionInfo: &SessionInfo{ + Provider: "s3", + Params: map[string]string{ + "region": "us-east-1", + "endpoint": "s3.amazonaws.com", + "disableSSL": "false", + "fromEnv": "false", + "secretName": "s3-testsecret", + "accessKeyKey": "s3-testaccessKeyKey", + "secretKeyKey": "s3-testsecretKeyKey", + }, + }, + }, + wantErr: false, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := objectstore.ParseBucketConfig(tt.path) + got, err := ParseBucketConfig(tt.path, tt.want.SessionInfo) if (err != nil) 
!= tt.wantErr { t.Errorf("%q: parseCloudBucket() error = %v, wantErr %v", tt.name, err, tt.wantErr) return @@ -132,6 +162,7 @@ func Test_parseCloudBucket(t *testing.T) { if !reflect.DeepEqual(got, tt.want) { t.Errorf("%q: parseCloudBucket() = %v, want %v", tt.name, got, tt.want) } + assert.Equal(t, got.SessionInfo, tt.want.SessionInfo) }) } } @@ -145,21 +176,21 @@ func Test_bucketConfig_KeyFromURI(t *testing.T) { tests := []struct { name string - bucketConfig *objectstore.Config + bucketConfig *Config uri string want string wantErr bool }{ { name: "Bucket with empty prefix", - bucketConfig: &objectstore.Config{Scheme: "gs://", BucketName: "my-bucket", Prefix: ""}, + bucketConfig: &Config{Scheme: "gs://", BucketName: "my-bucket", Prefix: ""}, uri: "gs://my-bucket/path1/path2", want: "path1/path2", wantErr: false, }, { name: "Bucket with non-empty Prefix ", - bucketConfig: &objectstore.Config{Scheme: "gs://", BucketName: "my-bucket", Prefix: "path0/"}, + bucketConfig: &Config{Scheme: "gs://", BucketName: "my-bucket", Prefix: "path0/"}, uri: "gs://my-bucket/path0/path1/path2", want: "path1/path2", wantErr: false, @@ -215,7 +246,7 @@ func Test_GetMinioDefaultEndpoint(t *testing.T) { } else { os.Unsetenv("MINIO_SERVICE_SERVICE_PORT") } - got := objectstore.MinioDefaultEndpoint() + got := MinioDefaultEndpoint() if got != tt.want { t.Errorf( "MinioDefaultEndpoint() = %q, want %q\nwhen MINIO_SERVICE_SERVICE_HOST=%q MINIO_SERVICE_SERVICE_PORT=%q", @@ -225,3 +256,130 @@ func Test_GetMinioDefaultEndpoint(t *testing.T) { }) } } + +func Test_createS3BucketSession(t *testing.T) { + tt := []struct { + msg string + ns string + sessionInfo *SessionInfo + sessionSecret *corev1.Secret + expectedConfig *aws.Config + wantErr bool + errorMsg string + }{ + { + msg: "Bucket with session", + ns: "testnamespace", + sessionInfo: &SessionInfo{ + Provider: "s3", + Params: map[string]string{ + "region": "us-east-1", + "endpoint": "s3.amazonaws.com", + "disableSSL": "false", + "fromEnv": 
"false", + "secretName": "s3-provider-secret", + "accessKeyKey": "test_access_key", + "secretKeyKey": "test_secret_key", + }, + }, + sessionSecret: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{Name: "s3-provider-secret", Namespace: "testnamespace"}, + Data: map[string][]byte{"test_secret_key": []byte("secretKey"), "test_access_key": []byte("accessKey")}, + }, + expectedConfig: &aws.Config{ + Credentials: credentials.NewStaticCredentials("accessKey", "secretKey", ""), + Region: aws.String("us-east-1"), + Endpoint: aws.String("s3.amazonaws.com"), + DisableSSL: aws.Bool(false), + S3ForcePathStyle: aws.Bool(true), + }, + }, + { + msg: "Bucket with no session", + ns: "testnamespace", + sessionInfo: nil, + sessionSecret: nil, + expectedConfig: nil, + }, + { + msg: "Bucket with session but secret doesn't exist", + ns: "testnamespace", + sessionInfo: &SessionInfo{ + Provider: "s3", + Params: map[string]string{ + "region": "us-east-1", + "endpoint": "s3.amazonaws.com", + "disableSSL": "false", + "fromEnv": "false", + "secretName": "does-not-exist", + "accessKeyKey": "test_access_key", + "secretKeyKey": "test_secret_key", + }, + }, + sessionSecret: nil, + expectedConfig: nil, + wantErr: true, + errorMsg: "secrets \"does-not-exist\" not found", + }, + { + msg: "Bucket with session secret exists but key mismatch", + ns: "testnamespace", + sessionInfo: &SessionInfo{ + Provider: "s3", + Params: map[string]string{ + "region": "us-east-1", + "endpoint": "s3.amazonaws.com", + "disableSSL": "false", + "fromEnv": "false", + "secretName": "s3-provider-secret", + "accessKeyKey": "does_not_exist_secret_key", + "secretKeyKey": "does_not_exist_access_key", + }, + }, + sessionSecret: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{Name: "s3-provider-secret", Namespace: "testnamespace"}, + Data: map[string][]byte{"test_secret_key": []byte("secretKey"), "test_access_key": []byte("accessKey")}, + }, + expectedConfig: nil, + wantErr: true, + errorMsg: "could not find specified keys", + }, + 
} + for _, test := range tt { + t.Run(test.msg, func(t *testing.T) { + fakeKubernetesClientset := fake.NewSimpleClientset() + ctx := context.Background() + + if test.sessionSecret != nil { + testersecret, err := fakeKubernetesClientset.CoreV1().Secrets(test.ns).Create( + ctx, + test.sessionSecret, + metav1.CreateOptions{}) + assert.Nil(t, err) + fmt.Printf(testersecret.Namespace) + } + + actualSession, err := createS3BucketSession(ctx, test.ns, test.sessionInfo, fakeKubernetesClientset) + if test.wantErr { + assert.Error(t, err) + if test.errorMsg != "" { + assert.Contains(t, err.Error(), test.errorMsg) + } + } else { + assert.Nil(t, err) + } + + if test.expectedConfig != nil { + // confirm config is populated with values from the session + expectedSess, err := session.NewSession(test.expectedConfig) + assert.Nil(t, err) + assert.Equal(t, expectedSess.Config.Region, actualSession.Config.Region) + assert.Equal(t, expectedSess.Config.Credentials, actualSession.Config.Credentials) + assert.Equal(t, expectedSess.Config.DisableSSL, actualSession.Config.DisableSSL) + assert.Equal(t, expectedSess.Config.S3ForcePathStyle, actualSession.Config.S3ForcePathStyle) + } else { + assert.Nil(t, actualSession) + } + }) + } +} diff --git a/go.mod b/go.mod index 5901804d077..52fb544ad12 100644 --- a/go.mod +++ b/go.mod @@ -66,6 +66,7 @@ require ( require ( github.com/prometheus/client_golang v1.16.0 + golang.org/x/oauth2 v0.13.0 google.golang.org/genproto/googleapis/api v0.0.0-20231002182017-d307bd883b97 google.golang.org/genproto/googleapis/rpc v0.0.0-20231009173412-8bfb1ae86b6c ) @@ -176,7 +177,6 @@ require ( golang.org/x/crypto v0.16.0 // indirect golang.org/x/exp v0.0.0-20230307190834-24139beb5833 // indirect golang.org/x/mod v0.12.0 // indirect - golang.org/x/oauth2 v0.13.0 // indirect golang.org/x/sync v0.4.0 // indirect golang.org/x/sys v0.15.0 // indirect golang.org/x/term v0.15.0 // indirect