diff --git a/backend/src/v2/component/importer_launcher.go b/backend/src/v2/component/importer_launcher.go index 25ebc390926..e6dae29d639 100644 --- a/backend/src/v2/component/importer_launcher.go +++ b/backend/src/v2/component/importer_launcher.go @@ -111,7 +111,7 @@ func (l *ImportLauncher) Execute(ctx context.Context) (err error) { }() // TODO(Bobgy): there's no need to pass any parameters, because pipeline // and pipeline run context have been created by root DAG driver. - pipeline, err := l.metadataClient.GetPipeline(ctx, l.importerLauncherOptions.PipelineName, l.importerLauncherOptions.RunID, "", "", "") + pipeline, err := l.metadataClient.GetPipeline(ctx, l.importerLauncherOptions.PipelineName, l.importerLauncherOptions.RunID, "", "", "", "") if err != nil { return err } diff --git a/backend/src/v2/component/launcher_v2.go b/backend/src/v2/component/launcher_v2.go index 92a951c1c12..623a4e983d4 100644 --- a/backend/src/v2/component/launcher_v2.go +++ b/backend/src/v2/component/launcher_v2.go @@ -156,7 +156,12 @@ func (l *LauncherV2) Execute(ctx context.Context) (err error) { return err } fingerPrint := execution.FingerPrint() - bucketConfig, err := objectstore.ParseBucketConfig(execution.GetPipeline().GetPipelineRoot()) + bucketSessionInfo, err := objectstore.GetSessionInfoFromString(execution.GetPipeline().GetPipelineBucketSession()) + if err != nil { + return err + } + pipelineRoot := execution.GetPipeline().GetPipelineRoot() + bucketConfig, err := objectstore.ParseBucketConfig(pipelineRoot, bucketSessionInfo) if err != nil { return err } @@ -534,14 +539,22 @@ func fetchNonDefaultBuckets( } // TODO: Support multiple artifacts someday, probably through the v2 engine. artifact := artifactList.Artifacts[0] + // The artifact does not belong under the object store path for this run. Cases: + // 1. Artifact is cached from a different run, so it may still be in the default bucket, but under a different run id subpath + // 2. Artifact is imported from the same bucket, but from a different path (re-use the same session) + // 3. Artifact is imported from a different bucket, or obj store (default to using user env in this case) if !strings.HasPrefix(artifact.Uri, defaultBucketConfig.PrefixedBucket()) { - nonDefaultBucketConfig, err := objectstore.ParseBucketConfigForArtifactURI(artifact.Uri) - if err != nil { - return nonDefaultBuckets, fmt.Errorf("failed to parse bucketConfig for output artifact %q with uri %q: %w", name, artifact.GetUri(), err) + nonDefaultBucketConfig, parseErr := objectstore.ParseBucketConfigForArtifactURI(artifact.Uri) + if parseErr != nil { + return nonDefaultBuckets, fmt.Errorf("failed to parse bucketConfig for output artifact %q with uri %q: %w", name, artifact.GetUri(), parseErr) } - nonDefaultBucket, err := objectstore.OpenBucket(ctx, k8sClient, namespace, nonDefaultBucketConfig) - if err != nil { - return nonDefaultBuckets, fmt.Errorf("failed to open bucket for output artifact %q with uri %q: %w", name, artifact.GetUri(), err) + // check if it's same bucket but under a different path, re-use the default bucket session in this case. + if (nonDefaultBucketConfig.Scheme == defaultBucketConfig.Scheme) && (nonDefaultBucketConfig.BucketName == defaultBucketConfig.BucketName) { + nonDefaultBucketConfig.Session = defaultBucketConfig.Session + } + nonDefaultBucket, bucketErr := objectstore.OpenBucket(ctx, k8sClient, namespace, nonDefaultBucketConfig) + if bucketErr != nil { + return nonDefaultBuckets, fmt.Errorf("failed to open bucket for output artifact %q with uri %q: %w", name, artifact.GetUri(), bucketErr) } nonDefaultBuckets[nonDefaultBucketConfig.PrefixedBucket()] = nonDefaultBucket } diff --git a/backend/src/v2/component/launcher_v2_test.go b/backend/src/v2/component/launcher_v2_test.go index e3e8835ff8b..5f9438b9fb0 100644 --- a/backend/src/v2/component/launcher_v2_test.go +++ b/backend/src/v2/component/launcher_v2_test.go @@ -77,7 +77,7 @@ func Test_executeV2_Parameters(t *testing.T) { fakeMetadataClient := metadata.NewFakeClient() bucket, err := blob.OpenBucket(context.Background(), "gs://test-bucket") assert.Nil(t, err) - bucketConfig, err := objectstore.ParseBucketConfig("gs://test-bucket/pipeline-root/") + bucketConfig, err := objectstore.ParseBucketConfig("gs://test-bucket/pipeline-root/", nil) assert.Nil(t, err) _, _, err = executeV2(context.Background(), test.executorInput, addNumbersComponent, "sh", test.executorArgs, bucket, bucketConfig, fakeMetadataClient, "namespace", fakeKubernetesClientset)