Skip to content

Commit

Permalink
Merge pull request #528 from openinfradev/argo
Browse files Browse the repository at this point in the history
feature. add api IsPausedWorkflow
  • Loading branch information
ktkfree authored Jul 2, 2024
2 parents e0e691f + 68ababc commit 610554b
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 0 deletions.
4 changes: 4 additions & 0 deletions pkg/argo-client/client-mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ func (c *ArgoClientMockImpl) GetWorkflow(ctx context.Context, namespace string,
return nil, nil
}

func (c *ArgoClientMockImpl) IsPausedWorkflow(ctx context.Context, namespace string, workflowName string) (bool, error) {
return false, nil
}

func (c *ArgoClientMockImpl) GetWorkflowLog(ctx context.Context, namespace string, container string, workflowName string) (string, error) {
return "", nil
}
Expand Down
55 changes: 55 additions & 0 deletions pkg/argo-client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
type ArgoClient interface {
GetWorkflowTemplates(ctx context.Context, namespace string) (*GetWorkflowTemplatesResponse, error)
GetWorkflow(ctx context.Context, namespace string, workflowName string) (*Workflow, error)
IsPausedWorkflow(ctx context.Context, namespace string, workflowName string) (bool, error)
GetWorkflowLog(ctx context.Context, namespace string, container string, workflowName string) (logs string, err error)
GetWorkflows(ctx context.Context, namespace string) (*GetWorkflowsResponse, error)
SumbitWorkflowFromWftpl(ctx context.Context, wftplName string, opts SubmitOptions) (string, error)
Expand Down Expand Up @@ -111,6 +112,60 @@ func (c *ArgoClientImpl) GetWorkflow(ctx context.Context, namespace string, work
return &workflowRes, nil
}

func (c *ArgoClientImpl) IsPausedWorkflow(ctx context.Context, namespace string, workflowName string) (bool, error) {
res, err := c.client.Get(fmt.Sprintf("%s/api/v1/workflows/%s/%s", c.url, namespace, workflowName))
if err != nil {
return false, err
}
if res == nil {
return false, fmt.Errorf("Failed to call argo workflow.")
}
if res.StatusCode != 200 {
return false, fmt.Errorf("Invalid http status. return code: %d", res.StatusCode)
}

defer func() {
if err := res.Body.Close(); err != nil {
log.Error(ctx, "error closing http body")
}
}()

body, err := io.ReadAll(res.Body)
if err != nil {
return false, err
}

var workflowRes interface{}
if err := json.Unmarshal(body, &workflowRes); err != nil {
log.Error(ctx, "an error was unexpected while parsing response from api /workflow template.")
return false, err
}

dataMap, ok := workflowRes.(map[string]interface{})
if !ok {
fmt.Println("Invalid JSON structure")
return false, err
}

status, ok := dataMap["status"].(map[string]interface{})
if !ok {
return false, err
}
nodes, ok := status["nodes"].(map[string]interface{})
if !ok {
return false, err
}

for _, node := range nodes {
nodeJson, ok := node.(map[string]interface{})
if ok && nodeJson["displayName"] == "suspend" && nodeJson["phase"] == "Running" {
return true, err
}
}

return false, nil
}

func (c *ArgoClientImpl) GetWorkflowLog(ctx context.Context, namespace string, container string, workflowName string) (logs string, err error) {
log.Info(ctx, fmt.Sprintf("%s/api/v1/workflows/%s/%s/log?logOptions.container=%s", c.url, namespace, workflowName, container))
res, err := c.client.Get(fmt.Sprintf("%s/api/v1/workflows/%s/%s/log?logOptions.container=%s", c.url, namespace, workflowName, container))
Expand Down
15 changes: 15 additions & 0 deletions pkg/argo-client/mock/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 610554b

Please sign in to comment.