Skip to content

Commit

Permalink
FORK: fix bucket root query strings (upstream kubeflow#10319)
Browse files Browse the repository at this point in the history
Signed-off-by: Mathew Wicks <[email protected]>
  • Loading branch information
thesuperzapper committed Apr 21, 2024
1 parent 4e2b07d commit b976ce5
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 9 deletions.
11 changes: 3 additions & 8 deletions v2/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
pb "github.com/kubeflow/pipelines/v2/third_party/ml_metadata"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"path"
"strconv"
"strings"
)
Expand Down Expand Up @@ -524,7 +523,9 @@ func provisionOutputs(pipelineRoot, taskName string, outputsSpec *pipelinespec.C
outputs.Artifacts[name] = &pipelinespec.ArtifactList{
Artifacts: []*pipelinespec.RuntimeArtifact{
{
Uri: generateOutputURI(pipelineRoot, name, taskName),
// Do not preserve the query string for output artifacts, as otherwise
// they'd appear in file and artifact names.
Uri: metadata.GenerateOutputURI(pipelineRoot, []string{taskName, name}, false),
Type: artifact.GetArtifactType(),
Metadata: artifact.GetMetadata(),
},
Expand All @@ -539,9 +540,3 @@ func provisionOutputs(pipelineRoot, taskName string, outputsSpec *pipelinespec.C
}
return outputs
}

// generateOutputURI builds the URI of an output artifact by placing
// taskName/artifactName underneath root. Trailing slashes on root are
// dropped so that exactly one separator sits between root and the rest.
func generateOutputURI(root, artifactName string, taskName string) string {
	// Only the relative part goes through path.Join: joining root as well
	// would collapse the "//" that follows a scheme (gs:// would become gs:/).
	base := strings.TrimRight(root, "/")
	relative := path.Join(taskName, artifactName)
	return fmt.Sprintf("%s/%s", base, relative)
}
22 changes: 21 additions & 1 deletion v2/metadata/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,26 @@ func (e *Execution) FingerPrint() string {
return e.execution.GetCustomProperties()[keyCacheFingerPrint].GetStringValue()
}

// GenerateOutputURI appends the specified paths to the pipeline root.
//
// Everything from the first "?" in pipelineRoot onwards is treated as the
// query string. It is split off before the paths are appended, and is added
// back to the end of the resulting URI only when preserveQueryString is true;
// otherwise it is discarded. Trailing slashes on the root are trimmed, and the
// paths are combined with path.Join.
func GenerateOutputURI(pipelineRoot string, paths []string, preserveQueryString bool) string {
	query := ""
	// Split on the first "?" only: a query component may itself contain
	// further "?" characters, and those must stay with the query instead of
	// leaving the whole query embedded in the middle of the generated path.
	if i := strings.Index(pipelineRoot, "?"); i >= 0 {
		if preserveQueryString {
			query = pipelineRoot[i:]
		}
		pipelineRoot = pipelineRoot[:i]
	}
	// we cannot path.Join(root, paths...), because root contains a scheme
	// like gs:// and path.Join cleans up the scheme to gs:/
	return fmt.Sprintf("%s/%s%s", strings.TrimRight(pipelineRoot, "/"), path.Join(paths...), query)
}

// GetPipeline returns the current pipeline represented by the specified
// pipeline name and run ID.
func (c *Client) GetPipeline(ctx context.Context, pipelineName, pipelineRunID, namespace, runResource, pipelineRoot string) (*Pipeline, error) {
Expand All @@ -266,7 +286,7 @@ func (c *Client) GetPipeline(ctx context.Context, pipelineName, pipelineRunID, n
keyNamespace: stringValue(namespace),
keyResourceName: stringValue(runResource),
// pipeline root of this run
keyPipelineRoot: stringValue(strings.TrimRight(pipelineRoot, "/") + "/" + path.Join(pipelineName, pipelineRunID)),
keyPipelineRoot: stringValue(GenerateOutputURI(pipelineRoot, []string{pipelineName, pipelineRunID}, true)),
}
pipelineRunContext, err := c.getOrInsertContext(ctx, pipelineRunID, pipelineRunContextType, runMetadata)
if err != nil {
Expand Down
54 changes: 54 additions & 0 deletions v2/metadata/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,60 @@ func Test_GetPipelineConcurrently(t *testing.T) {
wg.Wait()
}

// Test_GenerateOutputURI checks that GenerateOutputURI appends the given path
// segments to the pipeline root, and that a query string on the root is kept
// or dropped according to preserveQueryString.
func Test_GenerateOutputURI(t *testing.T) {
	// Shared fixtures: the path segments appended to the root, the bare
	// pipeline root, and the query string that some cases attach to it.
	const (
		pipelineName      = "my-pipeline-name"
		runID             = "my-run-id"
		pipelineRoot      = "minio://mlpipeline/v2/artifacts"
		pipelineRootQuery = "?query=string&another=query"
	)
	tests := []struct {
		name                string
		queryString         string
		paths               []string
		preserveQueryString bool
		want                string
	}{
		{
			name:                "plain pipeline root without preserveQueryString",
			queryString:         "",
			paths:               []string{pipelineName, runID},
			preserveQueryString: false,
			want:                fmt.Sprintf("%s/%s/%s", pipelineRoot, pipelineName, runID),
		},
		{
			name:                "plain pipeline root with preserveQueryString",
			queryString:         "",
			paths:               []string{pipelineName, runID},
			preserveQueryString: true,
			want:                fmt.Sprintf("%s/%s/%s", pipelineRoot, pipelineName, runID),
		},
		{
			name:                "pipeline root with query string without preserveQueryString",
			queryString:         pipelineRootQuery,
			paths:               []string{pipelineName, runID},
			preserveQueryString: false,
			want:                fmt.Sprintf("%s/%s/%s", pipelineRoot, pipelineName, runID),
		},
		{
			name:                "pipeline root with query string with preserveQueryString",
			queryString:         pipelineRootQuery,
			paths:               []string{pipelineName, runID},
			preserveQueryString: true,
			want:                fmt.Sprintf("%s/%s/%s%s", pipelineRoot, pipelineName, runID, pipelineRootQuery),
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			got := metadata.GenerateOutputURI(fmt.Sprintf("%s%s", pipelineRoot, tt.queryString), tt.paths, tt.preserveQueryString)
			// cmp.Diff(x, y) prefixes x-only lines with "-" and y-only lines
			// with "+", so want must be passed first for the output to match
			// the "(-want, +got)" label below.
			if diff := cmp.Diff(tt.want, got); diff != "" {
				t.Errorf("GenerateOutputURI() = %v, want %v\nDiff (-want, +got)\n%s", got, tt.want, diff)
			}
		})
	}
}

func newLocalClientOrFatal(t *testing.T) *metadata.Client {
t.Helper()
client, err := metadata.NewClient("localhost", "8080")
Expand Down

0 comments on commit b976ce5

Please sign in to comment.