Skip to content

Commit

Permalink
Add fnenv.Notifier interface + restructured fnenv package (#106)
Browse files Browse the repository at this point in the history
As stepping stone to the optimized function execution functionality (pre-warming of imminent function executions), this adds fnenv.Notifier which defines the interface for notifying the function runtime environments. Additionally this PR moves interfaces related to the fnenv to the fnenv package to make it more self contained. Specifically it adds the following:

- Added initial interface for the fnenv.Notifier (which will allow the workflow engine to notify environments implementing the interface of future invocations)
- Re-added the disappeared gofmt and govet checks to the travis config
- Renamed a couple of variables to idiomatic go
- Renamed fnenv/test to fnenv/mock to better indicate the purpose of the package.
- Renamed a couple of files in fnenv to reflect actual contents
- Moved the fnenv interfaces (runtime, resolver) from the api/function to the fnenv package
- Added documentation throughout the fnenv package and subpackages.
- Replaced map with syncmap implementation in Fission Proxy due to potential concurrent read/write issue
- Moved status shortcuts to the status-struct rather than adding them to the flag field. This eliminates the need for the awkward `wi.Status.Status.Completed()` in favor of `wi.Status.Completed()`
  • Loading branch information
erwinvaneyk authored Feb 12, 2018
1 parent 59e4bc5 commit d3c45d4
Show file tree
Hide file tree
Showing 32 changed files with 479 additions and 371 deletions.
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ install:
before_script:
# Build
- cd ${TRAVIS_BUILD_DIR}
- hack/verify-govet.sh
- hack/verify-gofmt.sh
- glide install
- build/build-linux.sh

Expand Down
37 changes: 19 additions & 18 deletions cmd/fission-workflows-bundle/bundle/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/fission/fission-workflows/pkg/controller/expr"
"github.com/fission/fission-workflows/pkg/fes"
"github.com/fission/fission-workflows/pkg/fes/eventstore/nats"
"github.com/fission/fission-workflows/pkg/fnenv"
"github.com/fission/fission-workflows/pkg/fnenv/fission"
"github.com/fission/fission-workflows/pkg/fnenv/native"
"github.com/fission/fission-workflows/pkg/fnenv/native/builtin"
Expand All @@ -38,9 +39,9 @@ import (
)

const (
GRPC_ADDRESS = ":5555"
API_GATEWAY_ADDRESS = ":8080"
FISSION_PROXY_ADDRESS = ":8888"
gRPCAddress = ":5555"
apiGatewayAddress = ":8080"
fissionProxyAddress = ":8888"
)

type Options struct {
Expand Down Expand Up @@ -96,8 +97,8 @@ func Run(ctx context.Context, opts *Options) error {
wfCache := getWorkflowCache(ctx, esPub)

// Resolvers and runtimes
resolvers := map[string]function.Resolver{}
runtimes := map[string]function.Runtime{}
resolvers := map[string]fnenv.Resolver{}
runtimes := map[string]fnenv.Runtime{}
if opts.InternalRuntime {
log.WithField("config", nil).Infof("Using Function Runtime: Internal")
runtimes["internal"] = setupInternalFunctionRuntime()
Expand Down Expand Up @@ -131,7 +132,7 @@ func Run(ctx context.Context, opts *Options) error {

// Http servers
if opts.Fission != nil {
proxySrv := &http.Server{Addr: FISSION_PROXY_ADDRESS}
proxySrv := &http.Server{Addr: fissionProxyAddress}
defer proxySrv.Shutdown(ctx)
runFissionEnvironmentProxy(proxySrv, es, wfiCache(), wfCache(), resolvers)
}
Expand All @@ -149,7 +150,7 @@ func Run(ctx context.Context, opts *Options) error {
}

if opts.ApiAdmin || opts.ApiWorkflow || opts.ApiWorkflowInvocation {
lis, err := net.Listen("tcp", GRPC_ADDRESS)
lis, err := net.Listen("tcp", gRPCAddress)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
Expand All @@ -159,17 +160,17 @@ func Run(ctx context.Context, opts *Options) error {
}

if opts.ApiHttp {
apiSrv := &http.Server{Addr: API_GATEWAY_ADDRESS}
apiSrv := &http.Server{Addr: apiGatewayAddress}
defer apiSrv.Shutdown(ctx)
var admin, wf, wfi string
if opts.ApiAdmin {
admin = GRPC_ADDRESS
admin = gRPCAddress
}
if opts.ApiWorkflow {
wf = GRPC_ADDRESS
wf = gRPCAddress
}
if opts.ApiWorkflowInvocation {
wfi = GRPC_ADDRESS
wfi = gRPCAddress
}
runHttpGateway(ctx, apiSrv, admin, wf, wfi)
}
Expand Down Expand Up @@ -273,23 +274,23 @@ func setupWorkflowCache(ctx context.Context, workflowEventPub pubsub.Publisher)
func runAdminApiServer(s *grpc.Server) {
adminServer := &apiserver.GrpcAdminApiServer{}
apiserver.RegisterAdminAPIServer(s, adminServer)
log.Infof("Serving admin gRPC API at %s.", GRPC_ADDRESS)
log.Infof("Serving admin gRPC API at %s.", gRPCAddress)
}

func runWorkflowApiServer(s *grpc.Server, es fes.EventStore, resolvers map[string]function.Resolver, wfCache fes.CacheReader) {
func runWorkflowApiServer(s *grpc.Server, es fes.EventStore, resolvers map[string]fnenv.Resolver, wfCache fes.CacheReader) {
workflowParser := parse.NewResolver(resolvers)
workflowValidator := parse.NewValidator()
workflowApi := workflow.NewApi(es, workflowParser)
workflowServer := apiserver.NewGrpcWorkflowApiServer(workflowApi, workflowValidator, wfCache)
apiserver.RegisterWorkflowAPIServer(s, workflowServer)
log.Infof("Serving workflow gRPC API at %s.", GRPC_ADDRESS)
log.Infof("Serving workflow gRPC API at %s.", gRPCAddress)
}

func runWorkflowInvocationApiServer(s *grpc.Server, es fes.EventStore, wfiCache fes.CacheReader) {
invocationApi := invocation.NewApi(es)
invocationServer := apiserver.NewGrpcInvocationApiServer(invocationApi, wfiCache)
apiserver.RegisterWorkflowInvocationAPIServer(s, invocationServer)
log.Infof("Serving workflow invocation gRPC API at %s.", GRPC_ADDRESS)
log.Infof("Serving workflow invocation gRPC API at %s.", gRPCAddress)
}

func runHttpGateway(ctx context.Context, gwSrv *http.Server, adminApiAddr string, wfApiAddr string, wfiApiAddr string) {
Expand Down Expand Up @@ -326,7 +327,7 @@ func runHttpGateway(ctx context.Context, gwSrv *http.Server, adminApiAddr string
}

func runFissionEnvironmentProxy(proxySrv *http.Server, es fes.EventStore, wfiCache fes.CacheReader,
wfCache fes.CacheReader, resolvers map[string]function.Resolver) {
wfCache fes.CacheReader, resolvers map[string]fnenv.Resolver) {

workflowParser := parse.NewResolver(resolvers)
workflowValidator := parse.NewValidator()
Expand All @@ -344,15 +345,15 @@ func runFissionEnvironmentProxy(proxySrv *http.Server, es fes.EventStore, wfiCac
}

func setupInvocationController(invocationCache fes.CacheReader, wfCache fes.CacheReader, es fes.EventStore,
fnRuntimes map[string]function.Runtime) *controller.InvocationController {
fnRuntimes map[string]fnenv.Runtime) *controller.InvocationController {
functionApi := function.NewApi(fnRuntimes, es)
invocationApi := invocation.NewApi(es)
s := &scheduler.WorkflowScheduler{}
ep := expr.NewJavascriptExpressionParser(typedvalues.DefaultParserFormatter)
return controller.NewInvocationController(invocationCache, wfCache, s, functionApi, invocationApi, ep)
}

func setupWorkflowController(wfCache fes.CacheReader, es fes.EventStore, fnResolvers map[string]function.Resolver) *controller.WorkflowController {
func setupWorkflowController(wfCache fes.CacheReader, es fes.EventStore, fnResolvers map[string]fnenv.Resolver) *controller.WorkflowController {
workflowApi := workflow.NewApi(es, parse.NewResolver(fnResolvers))
return controller.NewWorkflowController(wfCache, workflowApi)
}
Expand Down
10 changes: 5 additions & 5 deletions cmd/fission-workflows-bundle/bundle/bundle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestMain(m *testing.M) {
// Tests the submission of a workflow
func TestWorkflowCreate(t *testing.T) {
ctx := context.Background()
conn, err := grpc.Dial(GRPC_ADDRESS, grpc.WithInsecure())
conn, err := grpc.Dial(gRPCAddress, grpc.WithInsecure())
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -80,7 +80,7 @@ func TestWorkflowCreate(t *testing.T) {

func TestWorkflowInvocation(t *testing.T) {
ctx := context.Background()
conn, err := grpc.Dial(GRPC_ADDRESS, grpc.WithInsecure())
conn, err := grpc.Dial(gRPCAddress, grpc.WithInsecure())
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -145,7 +145,7 @@ func TestWorkflowInvocation(t *testing.T) {
for ti := range tick.C {
invoc, err := wi.Get(ctx, &apiserver.WorkflowInvocationIdentifier{Id: wiId})
assert.NoError(t, err)
if invoc.Status.Status.Finished() || ti.After(deadline) {
if invoc.Status.Finished() || ti.After(deadline) {
invocation = invoc
tick.Stop()
break
Expand All @@ -154,12 +154,12 @@ func TestWorkflowInvocation(t *testing.T) {

assert.Equal(t, wiSpec, invocation.Spec)
assert.Equal(t, etv, invocation.Status.Output)
assert.True(t, invocation.Status.Status.Successful())
assert.True(t, invocation.Status.Successful())
}

func TestDynamicWorkflowInvocation(t *testing.T) {
ctx := context.Background()
conn, err := grpc.Dial(GRPC_ADDRESS, grpc.WithInsecure())
conn, err := grpc.Dial(gRPCAddress, grpc.WithInsecure())
if err != nil {
panic(err)
}
Expand Down
12 changes: 8 additions & 4 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import:
- package: gopkg.in/sourcemap.v1
version: v2.0.0
- package: gopkg.in/yaml.v2
# Due to the old version of the kubernetes client we need to fix the openapi versions to avoid incompatible interfaces.
# Due to the old version of the kubernetes client we need to fix the openapi versions to avoid incompatible interfaces.
- package: github.com/go-openapi/errors
version: 49fe8b3a0e0d32a617d8d50c67f856ad6e45b28b
- package: github.com/go-openapi/analysis
Expand All @@ -49,6 +49,9 @@ import:
version: 315567415dfd74b651f7a62cabfc82a57ed7b9ad
- package: github.com/go-openapi/validate
version: 2c997e169d705fe1e4235c9c1d62a7e704a84290
- package: golang.org/x/sync
subpackages:
- syncmap
testImport:
- package: github.com/stretchr/testify
version: ^1.1.4
Expand Down
5 changes: 3 additions & 2 deletions pkg/api/function/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package function

import (
"github.com/fission/fission-workflows/pkg/fes"
"github.com/fission/fission-workflows/pkg/fnenv"
"github.com/fission/fission-workflows/pkg/types"
"github.com/fission/fission-workflows/pkg/types/aggregates"
"github.com/fission/fission-workflows/pkg/types/events"
Expand All @@ -12,11 +13,11 @@ import (

// Api that servers mainly as a function.Runtime wrapper that deals with the higher-level logic workflow-related logic.
type Api struct {
runtime map[string]Runtime // TODO support AsyncRuntime
runtime map[string]fnenv.Runtime // TODO support AsyncRuntime
es fes.EventStore
}

func NewApi(runtime map[string]Runtime, esClient fes.EventStore) *Api {
func NewApi(runtime map[string]fnenv.Runtime, esClient fes.EventStore) *Api {
return &Api{
runtime: runtime,
es: esClient,
Expand Down
41 changes: 0 additions & 41 deletions pkg/api/function/runtime.go

This file was deleted.

9 changes: 4 additions & 5 deletions pkg/api/workflow/parse/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@ package parse
import (
"fmt"
"strings"

"sync"

"github.com/fission/fission-workflows/pkg/api/function"
"github.com/fission/fission-workflows/pkg/fnenv"
"github.com/fission/fission-workflows/pkg/types"
"github.com/fission/fission-workflows/pkg/types/typedvalues"
"github.com/sirupsen/logrus"
Expand All @@ -23,10 +22,10 @@ import (
// for scheduling (overhead vs. load)
//
type Resolver struct {
clients map[string]function.Resolver
clients map[string]fnenv.Resolver
}

func NewResolver(client map[string]function.Resolver) *Resolver {
func NewResolver(client map[string]fnenv.Resolver) *Resolver {
return &Resolver{client}
}

Expand Down Expand Up @@ -138,7 +137,7 @@ func (ps *Resolver) resolveTask(task *types.Task) (*types.TaskTypeDef, error) {
case result := <-resolved:
return result, nil
default:
return nil, fmt.Errorf("failed to resolve function '%s' using clients '%v'", t, ps.clients)
return nil, fmt.Errorf("failed to resolve function '%s' using clients '%v': %v", t, ps.clients, lastErr)
}
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/api/workflow/parse/resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

"errors"

"github.com/fission/fission-workflows/pkg/api/function"
"github.com/fission/fission-workflows/pkg/fnenv"
"github.com/fission/fission-workflows/pkg/types"
"github.com/fission/fission-workflows/pkg/types/typedvalues"
"github.com/stretchr/testify/assert"
Expand All @@ -17,7 +17,7 @@ func TestResolve(t *testing.T) {

fooClient := "foo"

clients := map[string]function.Resolver{
clients := map[string]fnenv.Resolver{
fooClient: uppercaseResolver,
"failing": failingResolver,
}
Expand All @@ -44,7 +44,7 @@ func TestResolveForced(t *testing.T) {

fooClient := "foo"

clients := map[string]function.Resolver{
clients := map[string]fnenv.Resolver{
"bar": uppercaseResolver,
fooClient: uppercaseResolver,
"failing": failingResolver,
Expand Down Expand Up @@ -72,7 +72,7 @@ func TestResolveInputs(t *testing.T) {

fooClient := "foo"

clients := map[string]function.Resolver{
clients := map[string]fnenv.Resolver{
fooClient: uppercaseResolver,
fooClient: uppercaseResolver,
}
Expand Down Expand Up @@ -112,7 +112,7 @@ func TestResolveInputs(t *testing.T) {

func TestResolveNotFound(t *testing.T) {

clients := map[string]function.Resolver{
clients := map[string]fnenv.Resolver{
"bar": uppercaseResolver,
"failing": failingResolver,
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/apiserver/invocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (gi *grpcInvocationApiServer) InvokeSync(ctx context.Context, spec *types.W
if err != nil {
logrus.Warnf("Failed to get workflow invocation from cache: %v", err)
}
if wi != nil && wi.GetStatus() != nil && wi.GetStatus().Status.Finished() {
if wi != nil && wi.GetStatus() != nil && wi.GetStatus().Finished() {
result = wi.WorkflowInvocation
break
}
Expand Down
Loading

0 comments on commit d3c45d4

Please sign in to comment.