Skip to content

Commit

Permalink
Merge pull request #177 from fission/http-invocation-context
Browse files Browse the repository at this point in the history
Propagate HTTP invocation context
  • Loading branch information
erwinvaneyk authored Jul 25, 2018
2 parents cb6ff9d + 804e7de commit 26e6ac4
Show file tree
Hide file tree
Showing 10 changed files with 487 additions and 137 deletions.
15 changes: 10 additions & 5 deletions pkg/api/invocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"github.com/golang/protobuf/ptypes"
)

const ErrInvocationCanceled = "workflow invocation was canceled"

// Invocation contains the API functionality for controlling (workflow) invocations.
// This includes starting, stopping, and completing invocations.
type Invocation struct {
Expand Down Expand Up @@ -60,16 +62,19 @@ func (ia *Invocation) Cancel(invocationID string) error {
return validate.NewError("invocationID", errors.New("id should not be empty"))
}

err := ia.es.Append(&fes.Event{
data, err := proto.Marshal(&types.Error{
Message: ErrInvocationCanceled,
})
if err != nil {
data = []byte{}
}
return ia.es.Append(&fes.Event{
Type: events.Invocation_INVOCATION_CANCELED.String(),
Aggregate: aggregates.NewWorkflowInvocationAggregate(invocationID),
Timestamp: ptypes.TimestampNow(),
Data: data,
Hints: &fes.EventHints{Completed: true},
})
if err != nil {
return err
}
return nil
}

// Complete forces the completion of an invocation. This function - used by the controller - is the only way
Expand Down
120 changes: 70 additions & 50 deletions pkg/apiserver/apiserver.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 9 additions & 1 deletion pkg/apiserver/apiserver.pb.gw.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion pkg/apiserver/apiserver.proto
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ service WorkflowInvocationAPI {
};
}

rpc List (google.protobuf.Empty) returns (WorkflowInvocationList) {
rpc List (InvocationListQuery) returns (WorkflowInvocationList) {
option (google.api.http) = {
get: "/invocation"
};
Expand All @@ -110,6 +110,10 @@ service WorkflowInvocationAPI {
}
}

message InvocationListQuery {
repeated string workflows = 1;
}

message WorkflowInvocationIdentifier {
string id = 1;
}
Expand Down
31 changes: 27 additions & 4 deletions pkg/apiserver/invocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/fission/fission-workflows/pkg/types/aggregates"
"github.com/fission/fission-workflows/pkg/types/validate"
"github.com/golang/protobuf/ptypes/empty"
"github.com/sirupsen/logrus"
"golang.org/x/net/context"
)

Expand Down Expand Up @@ -67,15 +68,37 @@ func (gi *Invocation) Get(ctx context.Context, invocationID *WorkflowInvocationI
return wi.WorkflowInvocation, nil
}

func (gi *Invocation) List(context.Context, *empty.Empty) (*WorkflowInvocationList, error) {
func (gi *Invocation) List(ctx context.Context, query *InvocationListQuery) (*WorkflowInvocationList, error) {
var invocations []string
as := gi.wfiCache.List()
for _, a := range as {
if a.Type != aggregates.TypeWorkflowInvocation {
for _, aggregate := range as {
if aggregate.Type != aggregates.TypeWorkflowInvocation {
return nil, toErrorStatus(errors.New("invalid type in invocation cache"))
}

invocations = append(invocations, a.Id)
if len(query.Workflows) > 0 {
// TODO make more efficient (by moving list queries to cache)
entity, err := gi.wfiCache.GetAggregate(aggregate)
if err != nil {
logrus.Error("List: failed to fetch %v from cache: %v", aggregate, err)
continue
}
wfi := entity.(*aggregates.WorkflowInvocation)
if !contains(query.Workflows, wfi.GetSpec().GetWorkflowId()) {
continue
}
}

invocations = append(invocations, aggregate.Id)
}
return &WorkflowInvocationList{invocations}, nil
}

func contains(haystack []string, needle string) bool {
for i := 0; i < len(haystack); i++ {
if haystack[i] == needle {
return true
}
}
return false
}
Loading

0 comments on commit 26e6ac4

Please sign in to comment.