Skip to content

Commit

Permalink
Removed Swagger Golang client in the wfcli tool (#131)
Browse files Browse the repository at this point in the history
Before, we used the HTTP client generated with go-swagger in the wfcli. However it has been a nuisance more than a benefit, namely it...

- ...was bloated code (see the code removed vs added in this PR) without adding much additional functionality.
- ...had bloated, and cumbersome dependencies. The go-openapi packages are unversioned and have frequent API-breaking changes.
- ...prevented the reuse of functionality from the server-side (such as validation, event checking), as the generated client also had it is own custom models.

The client that replaces this is a simple http client that simply uses the existing models and does not depend on the go-openapi runtime
  • Loading branch information
erwinvaneyk authored Mar 22, 2018
1 parent 4ff1b3f commit 02259b5
Show file tree
Hide file tree
Showing 99 changed files with 330 additions and 7,501 deletions.
22 changes: 12 additions & 10 deletions cmd/wfcli/admin.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package main

import (
"github.com/fission/fission-workflows/cmd/wfcli/swagger-client/client/admin_api"
"github.com/go-openapi/strfmt"
"context"
"net/http"

"github.com/fission/fission-workflows/pkg/apiserver/httpclient"
"github.com/urfave/cli"
)

Expand All @@ -16,10 +18,10 @@ var cmdAdmin = cli.Command{
Name: "halt",
Usage: "Stop the workflow engine from evaluating anything",
Action: func(c *cli.Context) error {
u := parseUrl(c.GlobalString("url"))
client := createTransportClient(u)
adminApi := admin_api.New(client, strfmt.Default)
_, err := adminApi.Halt(admin_api.NewHaltParams())
ctx := context.TODO()
url := parseUrl(c.GlobalString("url"))
admin := httpclient.NewAdminApi(url.String(), http.Client{})
err := admin.Halt(ctx)
if err != nil {
panic(err)
}
Expand All @@ -30,10 +32,10 @@ var cmdAdmin = cli.Command{
Name: "resume",
Usage: "Resume the workflow engine evaluations",
Action: func(c *cli.Context) error {
u := parseUrl(c.GlobalString("url"))
client := createTransportClient(u)
adminApi := admin_api.New(client, strfmt.Default)
_, err := adminApi.Resume(admin_api.NewResumeParams())
ctx := context.TODO()
url := parseUrl(c.GlobalString("url"))
admin := httpclient.NewAdminApi(url.String(), http.Client{})
err := admin.Resume(ctx)
if err != nil {
panic(err)
}
Expand Down
110 changes: 53 additions & 57 deletions cmd/wfcli/invocation.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
package main

import (
"context"
"fmt"
"io"
"net/http"
"os"
"sort"
"time"

"github.com/fission/fission-workflows/cmd/wfcli/swagger-client/client/workflow_api"
"github.com/fission/fission-workflows/cmd/wfcli/swagger-client/client/workflow_invocation_api"
"github.com/fission/fission-workflows/cmd/wfcli/swagger-client/models"
"github.com/fission/fission-workflows/pkg/apiserver/httpclient"
"github.com/fission/fission-workflows/pkg/parse/yaml"
"github.com/fission/fission-workflows/pkg/types"
"github.com/go-openapi/strfmt"
"github.com/urfave/cli"
)

Expand All @@ -32,21 +31,23 @@ var cmdInvocation = cli.Command{
},
},
Action: func(c *cli.Context) error {
u := parseUrl(c.GlobalString("url"))
client := createTransportClient(u)
wfiApi := workflow_invocation_api.New(client, strfmt.Default)
//u := parseUrl(c.GlobalString("url"))
//client := createTransportClient(u)
//wfiApi := workflow_invocation_api.New(client, strfmt.Default)
ctx := context.TODO()
url := parseUrl(c.GlobalString("url"))
wfiApi := httpclient.NewInvocationApi(url.String(), http.Client{})
switch c.NArg() {
case 0:
since := c.Duration("history")
invocationsList(os.Stdout, wfiApi, time.Now().Add(-since))
case 1:
// Get Workflow invocation
wfiId := c.Args().Get(0)
resp, err := wfiApi.WfiGet(workflow_invocation_api.NewWfiGetParams().WithID(wfiId))
wfi, err := wfiApi.Get(ctx, wfiId)
if err != nil {
panic(err)
}
wfi := resp.Payload
b, err := yaml.Marshal(wfi)
if err != nil {
panic(err)
Expand All @@ -57,11 +58,10 @@ var cmdInvocation = cli.Command{
default:
wfiId := c.Args().Get(0)
taskId := c.Args().Get(1)
resp, err := wfiApi.WfiGet(workflow_invocation_api.NewWfiGetParams().WithID(wfiId))
wfi, err := wfiApi.Get(ctx, wfiId)
if err != nil {
panic(err)
}
wfi := resp.Payload
ti, ok := wfi.Status.Tasks[taskId]
if !ok {
fmt.Println("Task invocation not found.")
Expand All @@ -82,14 +82,16 @@ var cmdInvocation = cli.Command{
Usage: "cancel <workflow-invocation-id>",
Action: func(c *cli.Context) error {
wfiId := c.Args().Get(0)
u := parseUrl(c.GlobalString("url"))
client := createTransportClient(u)
wfiApi := workflow_invocation_api.New(client, strfmt.Default)
resp, err := wfiApi.Cancel(workflow_invocation_api.NewCancelParams().WithID(wfiId))
//u := parseUrl(c.GlobalString("url"))
//client := createTransportClient(u)
//wfiApi := workflow_invocation_api.New(client, strfmt.Default)
ctx := context.TODO()
url := parseUrl(c.GlobalString("url"))
wfiApi := httpclient.NewInvocationApi(url.String(), http.Client{})
err := wfiApi.Cancel(ctx, wfiId)
if err != nil {
panic(err)
}
fmt.Println(resp)
return nil
},
},
Expand All @@ -109,31 +111,32 @@ var cmdInvocation = cli.Command{
},
Action: func(c *cli.Context) error {
wfId := c.Args().Get(0)
u := parseUrl(c.GlobalString("url"))
client := createTransportClient(u)
wfiApi := workflow_invocation_api.New(client, strfmt.Default)
body := &models.WorkflowInvocationSpec{
WorkflowID: wfId,
Inputs: map[string]models.TypedValue{},
//u := parseUrl(c.GlobalString("url"))
//client := createTransportClient(u)
//wfiApi := workflow_invocation_api.New(client, strfmt.Default)
ctx := context.TODO()
url := parseUrl(c.GlobalString("url"))
wfiApi := httpclient.NewInvocationApi(url.String(), http.Client{})
spec := &types.WorkflowInvocationSpec{
WorkflowId: wfId,
Inputs: map[string]*types.TypedValue{},
}
if c.Bool("sync") {
params := workflow_invocation_api.NewInvokeSyncParams().WithBody(body)
resp, err := wfiApi.InvokeSync(params)
resp, err := wfiApi.InvokeSync(ctx, spec)
if err != nil {
panic(err)
}
bs, err := yaml.Marshal(resp.Payload)
bs, err := yaml.Marshal(resp)
if err != nil {
panic(err)
}
fmt.Println(string(bs))
} else {
params := workflow_invocation_api.NewInvokeParams().WithBody(body)
resp, err := wfiApi.Invoke(params)
resp, err := wfiApi.Invoke(ctx, spec)
if err != nil {
panic(err)
}
fmt.Println(resp.Payload.ID)
fmt.Println(resp.Id)
}
return nil
},
Expand All @@ -147,28 +150,30 @@ var cmdInvocation = cli.Command{
return nil
}
wfiId := c.Args().Get(0)
u := parseUrl(c.GlobalString("url"))
client := createTransportClient(u)
wfApi := workflow_api.New(client, strfmt.Default)
wfiApi := workflow_invocation_api.New(client, strfmt.Default)

wfiResp, err := wfiApi.WfiGet(workflow_invocation_api.NewWfiGetParams().WithID(wfiId))
//u := parseUrl(c.GlobalString("url"))
//client := createTransportClient(u)
//wfApi := workflow_api.New(client, strfmt.Default)
//wfiApi := workflow_invocation_api.New(client, strfmt.Default)
ctx := context.TODO()
url := parseUrl(c.GlobalString("url"))
wfiApi := httpclient.NewInvocationApi(url.String(), http.Client{})
wfApi := httpclient.NewWorkflowApi(url.String(), http.Client{})

wfi, err := wfiApi.Get(ctx, wfiId)
if err != nil {
panic(err)
}
wfi := wfiResp.Payload

wfResp, err := wfApi.WfGet(workflow_api.NewWfGetParams().WithID(wfi.Spec.WorkflowID))
wf, err := wfApi.Get(ctx, wfi.Spec.WorkflowId)
if err != nil {
panic(err)
}
wf := wfResp.Payload

wfiUpdated := wfi.Status.UpdatedAt.String()
wfiCreated := wfi.Metadata.CreatedAt.String()
table(os.Stdout, nil, [][]string{
{"ID", wfi.Metadata.ID},
{"WORKFLOW_ID", wfi.Spec.WorkflowID},
{"ID", wfi.Metadata.Id},
{"WORKFLOW_ID", wfi.Spec.WorkflowId},
{"CREATED", wfiCreated},
{"UPDATED", wfiUpdated},
{"STATUS", string(wfi.Status.Status)},
Expand All @@ -177,9 +182,9 @@ var cmdInvocation = cli.Command{

var rows [][]string
rows = collectStatus(wf.Spec.Tasks, wfi.Status.Tasks, rows)
dynamicTaskSpecs := map[string]models.TaskSpec{}
dynamicTaskSpecs := map[string]*types.TaskSpec{}
for k, v := range wfi.Status.DynamicTasks {
dynamicTaskSpecs[k] = *v.Spec
dynamicTaskSpecs[k] = v.Spec
}
rows = collectStatus(dynamicTaskSpecs, wfi.Status.Tasks, rows)

Expand All @@ -190,38 +195,35 @@ var cmdInvocation = cli.Command{
},
}

func invocationsList(out io.Writer, wfiApi *workflow_invocation_api.Client, since time.Time) {
func invocationsList(out io.Writer, wfiApi *httpclient.InvocationApi, since time.Time) {
// List workflows invocations
resp, err := wfiApi.WfiList(workflow_invocation_api.NewWfiListParams())
ctx := context.TODO()
wis, err := wfiApi.List(ctx)
if err != nil {
panic(err)
}
wis := resp.Payload
sort.Strings(wis.Invocations)
var rows [][]string
for _, wfiId := range wis.Invocations {
resp, err := wfiApi.WfiGet(workflow_invocation_api.NewWfiGetParams().WithID(wfiId))
wi, err := wfiApi.Get(ctx, wfiId)
if err != nil {
panic(err)
}
wi := resp.Payload
updated := wi.Status.UpdatedAt.String()
created := wi.Metadata.CreatedAt.String()

// TODO add filter params to endpoint instead
if isCompleted(wi.Status.Status) && since.Before(time.Time(wi.Status.UpdatedAt)) {
continue
}
// TODO filter old invocations and system invocations

rows = append(rows, []string{wfiId, wi.Spec.WorkflowID, string(wi.Status.Status),
rows = append(rows, []string{wfiId, wi.Spec.WorkflowId, string(wi.Status.Status),
created, updated})
}

table(out, []string{"ID", "WORKFLOW", "STATUS", "CREATED", "UPDATED"}, rows)

}

func collectStatus(tasks map[string]models.TaskSpec, taskStatus map[string]models.TaskInvocation,
func collectStatus(tasks map[string]*types.TaskSpec, taskStatus map[string]*types.TaskInvocation,
rows [][]string) [][]string {
var ids []string
for id := range tasks {
Expand All @@ -245,9 +247,3 @@ func collectStatus(tasks map[string]models.TaskSpec, taskStatus map[string]model
}
return rows
}

func isCompleted(status models.WorkflowInvocationStatusStatus) bool {
return status == models.WorkflowInvocationStatusStatusSUCCEEDED ||
status == models.WorkflowInvocationStatusStatusABORTED ||
status == models.WorkflowInvocationStatusStatusFAILED
}
7 changes: 3 additions & 4 deletions cmd/wfcli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"strings"
"text/tabwriter"

httptransport "github.com/go-openapi/runtime/client"
"github.com/urfave/cli"
)

Expand Down Expand Up @@ -61,9 +60,9 @@ func table(writer io.Writer, headings []string, rows [][]string) {
}
}

func createTransportClient(baseUrl *url.URL) *httptransport.Runtime {
return httptransport.New(baseUrl.Host, "/proxy/workflows-apiserver/", []string{baseUrl.Scheme})
}
//func createTransportClient(baseUrl *url.URL) *httptransport.Runtime {
// return httptransport.New(baseUrl.Host, "/proxy/workflows-apiserver/", []string{baseUrl.Scheme})
//}

func fail(msg ...interface{}) {
for _, line := range msg {
Expand Down
15 changes: 8 additions & 7 deletions cmd/wfcli/status.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package main

import (
"context"
"fmt"
"net/http"

"github.com/fission/fission-workflows/cmd/wfcli/swagger-client/client/admin_api"
"github.com/go-openapi/strfmt"
"github.com/fission/fission-workflows/pkg/apiserver/httpclient"
"github.com/urfave/cli"
)

Expand All @@ -13,15 +14,15 @@ var cmdStatus = cli.Command{
Aliases: []string{"s"},
Usage: "Check cluster status",
Action: func(c *cli.Context) error {
u := parseUrl(c.GlobalString("url"))
client := createTransportClient(u)
adminApi := admin_api.New(client, strfmt.Default)
ctx := context.TODO()
url := parseUrl(c.GlobalString("url"))
admin := httpclient.NewAdminApi(url.String(), http.Client{})

resp, err := adminApi.Status(admin_api.NewStatusParams())
resp, err := admin.Status(ctx)
if err != nil {
panic(err)
}
fmt.Printf(resp.Payload.Status)
fmt.Printf(resp.Status)

return nil
},
Expand Down
Loading

0 comments on commit 02259b5

Please sign in to comment.