Skip to content

Commit

Permalink
feat(backend): add namespace & prefix scoped credentials to kfp-launc…
Browse files Browse the repository at this point in the history
…her config for object store paths (#10625)

* add bucket session info to pipeline context

Signed-off-by: Humair Khan <[email protected]>

* allow driver to read bucket session info

Instead of only reading the kfp-launcher when a custom pipeline root is specified, the root dag will now always read the kfp-launcher config to search for a matching bucket if such a configuration is provided in kfp-launcher

Signed-off-by: Humair Khan <[email protected]>

* add support for bucket prefix matching

Provides a structured configuration for bucket providers, whereby user can specify credentials for different providers and path prefixes. A new interface for providing sessions is introduced, which should be implemented for any new provider configuration support.

Signed-off-by: Humair Khan <[email protected]>

* allow object store to handle different providers

Utilizes blob provider specific constructors to open s3, minio, gcs accordingly. If a sessioninfo is provided (via kfp-launcher config) then the associated secret is fetched for each case to gain credentials. If fromEnv is provided, then the standard url opener is used. Also separates out config fields and operations to a separate file.

Signed-off-by: Humair Khan <[email protected]>

* utilize session info in launcher & importer

retrieves the session info (if provided via kfp-launcher) and utilizes it for opening the provider's associated bucket

Signed-off-by: Humair Khan <[email protected]>

* skip config for default aws s3 endpoint

Signed-off-by: Humair Khan <[email protected]>

* chore: refactor/clarify store session info naming

also added some additional code comments clarifying store cred variable usage

Signed-off-by: Humair Khan <[email protected]>

* chore: handle query parameters as s3

as well as update validation logic for provider config, and fix tests
accordingly.

Signed-off-by: Humair Khan <[email protected]>

---------

Signed-off-by: Humair Khan <[email protected]>
  • Loading branch information
HumairAK authored Apr 16, 2024
1 parent 809d576 commit 5e0f9b1
Show file tree
Hide file tree
Showing 17 changed files with 1,831 additions and 209 deletions.
2 changes: 1 addition & 1 deletion backend/src/v2/component/importer_launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (l *ImportLauncher) Execute(ctx context.Context) (err error) {
}()
// TODO(Bobgy): there's no need to pass any parameters, because pipeline
// and pipeline run context have been created by root DAG driver.
pipeline, err := l.metadataClient.GetPipeline(ctx, l.importerLauncherOptions.PipelineName, l.importerLauncherOptions.RunID, "", "", "")
pipeline, err := l.metadataClient.GetPipeline(ctx, l.importerLauncherOptions.PipelineName, l.importerLauncherOptions.RunID, "", "", "", "")
if err != nil {
return err
}
Expand Down
27 changes: 20 additions & 7 deletions backend/src/v2/component/launcher_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,12 @@ func (l *LauncherV2) Execute(ctx context.Context) (err error) {
return err
}
fingerPrint := execution.FingerPrint()
bucketConfig, err := objectstore.ParseBucketConfig(execution.GetPipeline().GetPipelineRoot())
storeSessionInfo, err := objectstore.GetSessionInfoFromString(execution.GetPipeline().GetStoreSessionInfo())
if err != nil {
return err
}
pipelineRoot := execution.GetPipeline().GetPipelineRoot()
bucketConfig, err := objectstore.ParseBucketConfig(pipelineRoot, storeSessionInfo)
if err != nil {
return err
}
Expand Down Expand Up @@ -534,14 +539,22 @@ func fetchNonDefaultBuckets(
}
// TODO: Support multiple artifacts someday, probably through the v2 engine.
artifact := artifactList.Artifacts[0]
// The artifact does not belong under the object store path for this run. Cases:
// 1. Artifact is cached from a different run, so it may still be in the default bucket, but under a different run id subpath
// 2. Artifact is imported from the same bucket, but from a different path (re-use the same session)
// 3. Artifact is imported from a different bucket, or obj store (default to using user env in this case)
if !strings.HasPrefix(artifact.Uri, defaultBucketConfig.PrefixedBucket()) {
nonDefaultBucketConfig, err := objectstore.ParseBucketConfigForArtifactURI(artifact.Uri)
if err != nil {
return nonDefaultBuckets, fmt.Errorf("failed to parse bucketConfig for output artifact %q with uri %q: %w", name, artifact.GetUri(), err)
nonDefaultBucketConfig, parseErr := objectstore.ParseBucketConfigForArtifactURI(artifact.Uri)
if parseErr != nil {
return nonDefaultBuckets, fmt.Errorf("failed to parse bucketConfig for output artifact %q with uri %q: %w", name, artifact.GetUri(), parseErr)
}
nonDefaultBucket, err := objectstore.OpenBucket(ctx, k8sClient, namespace, nonDefaultBucketConfig)
if err != nil {
return nonDefaultBuckets, fmt.Errorf("failed to open bucket for output artifact %q with uri %q: %w", name, artifact.GetUri(), err)
// check if it's same bucket but under a different path, re-use the default bucket session in this case.
if (nonDefaultBucketConfig.Scheme == defaultBucketConfig.Scheme) && (nonDefaultBucketConfig.BucketName == defaultBucketConfig.BucketName) {
nonDefaultBucketConfig.SessionInfo = defaultBucketConfig.SessionInfo
}
nonDefaultBucket, bucketErr := objectstore.OpenBucket(ctx, k8sClient, namespace, nonDefaultBucketConfig)
if bucketErr != nil {
return nonDefaultBuckets, fmt.Errorf("failed to open bucket for output artifact %q with uri %q: %w", name, artifact.GetUri(), bucketErr)
}
nonDefaultBuckets[nonDefaultBucketConfig.PrefixedBucket()] = nonDefaultBucket
}
Expand Down
2 changes: 1 addition & 1 deletion backend/src/v2/component/launcher_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func Test_executeV2_Parameters(t *testing.T) {
fakeMetadataClient := metadata.NewFakeClient()
bucket, err := blob.OpenBucket(context.Background(), "mem://test-bucket")
assert.Nil(t, err)
bucketConfig, err := objectstore.ParseBucketConfig("mem://test-bucket/pipeline-root/")
bucketConfig, err := objectstore.ParseBucketConfig("mem://test-bucket/pipeline-root/", nil)
assert.Nil(t, err)
_, _, err = executeV2(context.Background(), test.executorInput, addNumbersComponent, "sh", test.executorArgs, bucket, bucketConfig, fakeMetadataClient, "namespace", fakeKubernetesClientset)

Expand Down
97 changes: 96 additions & 1 deletion backend/src/v2/config/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ package config
import (
"context"
"fmt"
"github.com/kubeflow/pipelines/backend/src/v2/objectstore"
"io/ioutil"
"sigs.k8s.io/yaml"
"strconv"
"strings"

"github.com/golang/glog"
Expand All @@ -32,8 +35,23 @@ const (
configMapName = "kfp-launcher"
defaultPipelineRoot = "minio://mlpipeline/v2/artifacts"
configKeyDefaultPipelineRoot = "defaultPipelineRoot"
configBucketProviders = "providers"
minioArtifactSecretName = "mlpipeline-minio-artifact"
// The k8s secret "Key" for "Artifact SecretKey" and "Artifact AccessKey"
minioArtifactSecretKeyKey = "secretkey"
minioArtifactAccessKeyKey = "accesskey"
)

type BucketProviders struct {
Minio *MinioProviderConfig `json:"minio"`
S3 *S3ProviderConfig `json:"s3"`
GCS *GCSProviderConfig `json:"gs"`
}

type SessionInfoProvider interface {
ProvideSessionInfo(path string) (objectstore.SessionInfo, error)
}

// Config is the KFP runtime configuration.
type Config struct {
data map[string]string
Expand All @@ -53,7 +71,7 @@ func FromConfigMap(ctx context.Context, clientSet kubernetes.Interface, namespac
return &Config{data: config.Data}, nil
}

// Config.DefaultPipelineRoot gets the configured default pipeline root.
// DefaultPipelineRoot gets the configured default pipeline root.
func (c *Config) DefaultPipelineRoot() string {
// The key defaultPipelineRoot is optional in launcher config.
if c == nil || c.data[configKeyDefaultPipelineRoot] == "" {
Expand Down Expand Up @@ -82,3 +100,80 @@ func InPodName() (string, error) {
name := string(podName)
return strings.TrimSuffix(name, "\n"), nil
}

func (c *Config) GetStoreSessionInfo(path string) (objectstore.SessionInfo, error) {
bucketConfig, err := objectstore.ParseBucketPathToConfig(path)
if err != nil {
return objectstore.SessionInfo{}, err
}
provider := strings.TrimSuffix(bucketConfig.Scheme, "://")
bucketProviders, err := c.getBucketProviders()
if err != nil {
return objectstore.SessionInfo{}, err
}

var sessProvider SessionInfoProvider

switch provider {
case "minio":
if bucketProviders == nil || bucketProviders.Minio == nil {
sessProvider = &MinioProviderConfig{}
} else {
sessProvider = bucketProviders.Minio
}
break
case "s3":
if bucketProviders == nil || bucketProviders.S3 == nil {
sessProvider = &S3ProviderConfig{}
} else {
sessProvider = bucketProviders.S3
}
break
case "gs":
if bucketProviders == nil || bucketProviders.GCS == nil {
sessProvider = &GCSProviderConfig{}
} else {
sessProvider = bucketProviders.GCS
}
break
default:
return objectstore.SessionInfo{}, fmt.Errorf("Encountered unsupported provider in provider config %s", provider)
}

sess, err := sessProvider.ProvideSessionInfo(path)
if err != nil {
return objectstore.SessionInfo{}, err
}
return sess, nil
}

// getBucketProviders gets the provider configuration
func (c *Config) getBucketProviders() (*BucketProviders, error) {
if c == nil || c.data[configBucketProviders] == "" {
return nil, nil
}
bucketProviders := &BucketProviders{}
configAuth := c.data[configBucketProviders]
err := yaml.Unmarshal([]byte(configAuth), bucketProviders)
if err != nil {
return nil, fmt.Errorf("failed to unmarshall kfp bucket providers, ensure that providers config is well formed: %w", err)
}
return bucketProviders, nil
}

func getDefaultMinioSessionInfo() (objectstore.SessionInfo, error) {
sess := objectstore.SessionInfo{
Provider: "minio",
Params: map[string]string{
"region": "minio",
"endpoint": objectstore.MinioDefaultEndpoint(),
"disableSSL": strconv.FormatBool(true),
"fromEnv": strconv.FormatBool(false),
"secretName": minioArtifactSecretName,
// The k8s secret "Key" for "Artifact SecretKey" and "Artifact AccessKey"
"accessKeyKey": minioArtifactAccessKeyKey,
"secretKeyKey": minioArtifactSecretKeyKey,
},
}
return sess, nil
}
Loading

0 comments on commit 5e0f9b1

Please sign in to comment.