Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(backend): preserve querystring in pipeline root (fixes #10318) #10319

Merged
merged 4 commits into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 1 addition & 9 deletions backend/src/v2/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ import (
"context"
"encoding/json"
"fmt"
"path"
"strconv"
"strings"
"time"

"github.com/golang/glog"
Expand Down Expand Up @@ -1062,7 +1060,7 @@ func provisionOutputs(pipelineRoot, taskName string, outputsSpec *pipelinespec.C
outputs.Artifacts[name] = &pipelinespec.ArtifactList{
Artifacts: []*pipelinespec.RuntimeArtifact{
{
Uri: generateOutputURI(pipelineRoot, name, taskName),
Uri: metadata.AppendToPipelineRoot(pipelineRoot, []string{taskName, name}),
Type: artifact.GetArtifactType(),
Metadata: artifact.GetMetadata(),
},
Expand All @@ -1078,12 +1076,6 @@ func provisionOutputs(pipelineRoot, taskName string, outputsSpec *pipelinespec.C
return outputs
}

func generateOutputURI(root, artifactName string, taskName string) string {
// we cannot path.Join(root, taskName, artifactName), because root
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since now you new function is similar to the old generateOutputURI function, it make sense to bring these comments to the changes you have at line 1065 or just update the generateOutputURI function.

// contains scheme like gs:// and path.Join cleans up scheme to gs:/
return fmt.Sprintf("%s/%s", strings.TrimRight(root, "/"), path.Join(taskName, artifactName))
}

var accessModeMap = map[string]k8score.PersistentVolumeAccessMode{
"ReadWriteOnce": k8score.ReadWriteOnce,
"ReadOnlyMany": k8score.ReadOnlyMany,
Expand Down
22 changes: 21 additions & 1 deletion backend/src/v2/metadata/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,26 @@ func (e *Execution) FingerPrint() string {
return e.execution.GetCustomProperties()[keyCacheFingerPrint].GetStringValue()
}

// AppendToPipelineRoot appends the specified paths to the pipeline root.
// It preserves the query part of the pipeline root by splitting it off and
// appending it back to the full URI.
func AppendToPipelineRoot(pipelineRoot string, paths []string) string {
// split the query part off the root, if it exists.
// we will append it back later to the full URI.
querySplit := strings.Split(pipelineRoot, "?")
query := ""
if len(querySplit) > 1 {
pipelineRoot = querySplit[0]
query = "?" + querySplit[1]
} else if len(querySplit) > 2 {
// this should never happen, but just in case.
glog.Warningf("Unexpected pipeline root: %v", pipelineRoot)
}
// we cannot path.Join(root, taskName, artifactName), because root
// contains scheme like gs:// and path.Join cleans up scheme to gs:/
return fmt.Sprintf("%s/%s%s", strings.TrimRight(pipelineRoot, "/"), path.Join(paths...), query)
}

// GetPipeline returns the current pipeline represented by the specified
// pipeline name and run ID.
func (c *Client) GetPipeline(ctx context.Context, pipelineName, runID, namespace, runResource, pipelineRoot string) (*Pipeline, error) {
Expand All @@ -272,7 +292,7 @@ func (c *Client) GetPipeline(ctx context.Context, pipelineName, runID, namespace
keyNamespace: stringValue(namespace),
keyResourceName: stringValue(runResource),
// pipeline root of this run
keyPipelineRoot: stringValue(strings.TrimRight(pipelineRoot, "/") + "/" + path.Join(pipelineName, runID)),
keyPipelineRoot: stringValue(AppendToPipelineRoot(pipelineRoot, []string{pipelineName, runID})),
}
runContext, err := c.getOrInsertContext(ctx, runID, pipelineRunContextType, metadata)
glog.Infof("Pipeline Run Context: %+v", runContext)
Expand Down
23 changes: 22 additions & 1 deletion backend/src/v2/metadata/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func Test_GetPipeline_Twice(t *testing.T) {
// The second call to GetPipeline won't fail because it avoid inserting to MLMD again.
samePipeline, err := client.GetPipeline(ctx, "get-pipeline-test", runId, namespace, runResource, pipelineRoot)
fatalIf(err)
if (pipeline.GetCtxID() != samePipeline.GetCtxID()) {
if pipeline.GetCtxID() != samePipeline.GetCtxID() {
t.Errorf("Expect pipeline context ID %d, actual is %d", pipeline.GetCtxID(), samePipeline.GetCtxID())
}
}
Expand Down Expand Up @@ -214,6 +214,27 @@ func Test_GetPipelineConcurrently(t *testing.T) {
wg.Wait()
}

func Test_appendToPipelineRoot(t *testing.T) {
// Const define the artifact name
const (
pipelineName = "my-pipeline-name"
runID = "my-run-id"
pipelineRoot = "minio://mlpipeline/v2/artifacts"
pipelineRootQuery = "?query=string&another=query"
)
// Test output uri generation with plain pipeline root, i.e. without any query strings
plainResult := metadata.AppendToPipelineRoot(pipelineRoot, []string{pipelineName, runID})
if plainResult != fmt.Sprintf("%s/%s/%s", pipelineRoot, pipelineName, runID) {
t.Errorf("Expected %s, got %s", fmt.Sprintf("%s/%s/%s", pipelineRoot, pipelineName, runID), plainResult)
}

// Make sure query strings in the pipeline root are correctly preserved (added to the end)
queryResult := metadata.AppendToPipelineRoot(fmt.Sprintf("%s%s", pipelineRoot, pipelineRootQuery), []string{pipelineName, runID})
if queryResult != fmt.Sprintf("%s/%s/%s%s", pipelineRoot, pipelineName, runID, pipelineRootQuery) {
t.Errorf("Expected %s, got %s", fmt.Sprintf("%s/%s/%s%s", pipelineRoot, pipelineName, runID, pipelineRootQuery), queryResult)
}
}

func Test_DAG(t *testing.T) {
t.Skip("Temporarily disable the test that requires cluster connection.")

Expand Down