From 68ababc5ed5e180b6104192fe656509eb1f859fc Mon Sep 17 00:00:00 2001 From: "taekyu.kang" Date: Tue, 2 Jul 2024 14:49:03 +0900 Subject: [PATCH] feature. add api IsPausedWorkflow --- pkg/argo-client/client-mock.go | 4 +++ pkg/argo-client/client.go | 55 ++++++++++++++++++++++++++++++++++ pkg/argo-client/mock/client.go | 15 ++++++++++ 3 files changed, 74 insertions(+) diff --git a/pkg/argo-client/client-mock.go b/pkg/argo-client/client-mock.go index 8a3412fd..739adbd3 100644 --- a/pkg/argo-client/client-mock.go +++ b/pkg/argo-client/client-mock.go @@ -32,6 +32,10 @@ func (c *ArgoClientMockImpl) GetWorkflow(ctx context.Context, namespace string, return nil, nil } +func (c *ArgoClientMockImpl) IsPausedWorkflow(ctx context.Context, namespace string, workflowName string) (bool, error) { + return false, nil +} + func (c *ArgoClientMockImpl) GetWorkflowLog(ctx context.Context, namespace string, container string, workflowName string) (string, error) { return "", nil } diff --git a/pkg/argo-client/client.go b/pkg/argo-client/client.go index 4adabc6b..9a16e456 100644 --- a/pkg/argo-client/client.go +++ b/pkg/argo-client/client.go @@ -15,6 +15,7 @@ import ( type ArgoClient interface { GetWorkflowTemplates(ctx context.Context, namespace string) (*GetWorkflowTemplatesResponse, error) GetWorkflow(ctx context.Context, namespace string, workflowName string) (*Workflow, error) + IsPausedWorkflow(ctx context.Context, namespace string, workflowName string) (bool, error) GetWorkflowLog(ctx context.Context, namespace string, container string, workflowName string) (logs string, err error) GetWorkflows(ctx context.Context, namespace string) (*GetWorkflowsResponse, error) SumbitWorkflowFromWftpl(ctx context.Context, wftplName string, opts SubmitOptions) (string, error) @@ -111,6 +112,60 @@ func (c *ArgoClientImpl) GetWorkflow(ctx context.Context, namespace string, work return &workflowRes, nil } +func (c *ArgoClientImpl) IsPausedWorkflow(ctx context.Context, namespace string, workflowName string) (bool, error) { + res, err := c.client.Get(fmt.Sprintf("%s/api/v1/workflows/%s/%s", c.url, namespace, workflowName)) + if err != nil { + return false, err + } + if res == nil { + return false, fmt.Errorf("Failed to call argo workflow.") + } + if res.StatusCode != 200 { + return false, fmt.Errorf("Invalid http status. return code: %d", res.StatusCode) + } + + defer func() { + if err := res.Body.Close(); err != nil { + log.Error(ctx, "error closing http body") + } + }() + + body, err := io.ReadAll(res.Body) + if err != nil { + return false, err + } + + var workflowRes interface{} + if err := json.Unmarshal(body, &workflowRes); err != nil { + log.Error(ctx, "an error was unexpected while parsing response from api /workflow template.") + return false, err + } + + dataMap, ok := workflowRes.(map[string]interface{}) + if !ok { + fmt.Println("Invalid JSON structure") + return false, err + } + + status, ok := dataMap["status"].(map[string]interface{}) + if !ok { + return false, err + } + nodes, ok := status["nodes"].(map[string]interface{}) + if !ok { + return false, err + } + + for _, node := range nodes { + nodeJson, ok := node.(map[string]interface{}) + if ok && nodeJson["displayName"] == "suspend" && nodeJson["phase"] == "Running" { + return true, err + } + } + + return false, nil +} + func (c *ArgoClientImpl) GetWorkflowLog(ctx context.Context, namespace string, container string, workflowName string) (logs string, err error) { log.Info(ctx, fmt.Sprintf("%s/api/v1/workflows/%s/%s/log?logOptions.container=%s", c.url, namespace, workflowName, container)) res, err := c.client.Get(fmt.Sprintf("%s/api/v1/workflows/%s/%s/log?logOptions.container=%s", c.url, namespace, workflowName, container)) diff --git a/pkg/argo-client/mock/client.go b/pkg/argo-client/mock/client.go index 8d9f23ce..0575de14 100644 --- a/pkg/argo-client/mock/client.go +++ b/pkg/argo-client/mock/client.go @@ -95,6 +95,21 @@ func (mr *MockArgoClientMockRecorder) GetWorkflows(ctx, namespace interface{}) * return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetWorkflows", reflect.TypeOf((*MockArgoClient)(nil).GetWorkflows), ctx, namespace) } +// IsPausedWorkflow mocks base method. +func (m *MockArgoClient) IsPausedWorkflow(ctx context.Context, namespace, workflowName string) (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IsPausedWorkflow", ctx, namespace, workflowName) + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// IsPausedWorkflow indicates an expected call of IsPausedWorkflow. +func (mr *MockArgoClientMockRecorder) IsPausedWorkflow(ctx, namespace, workflowName interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsPausedWorkflow", reflect.TypeOf((*MockArgoClient)(nil).IsPausedWorkflow), ctx, namespace, workflowName) +} + // ResumeWorkflow mocks base method. func (m *MockArgoClient) ResumeWorkflow(ctx context.Context, namespace, workflowName string) (*argowf.Workflow, error) { m.ctrl.T.Helper()