Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add fnenv.Notifier interface + restructuring of fnenv package #106

Merged
merged 1 commit into from
Feb 12, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ install:
before_script:
# Build
- cd ${TRAVIS_BUILD_DIR}
- hack/verify-govet.sh
- hack/verify-gofmt.sh
- glide install
- build/build-linux.sh

Expand Down
37 changes: 19 additions & 18 deletions cmd/fission-workflows-bundle/bundle/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/fission/fission-workflows/pkg/controller/expr"
"github.com/fission/fission-workflows/pkg/fes"
"github.com/fission/fission-workflows/pkg/fes/eventstore/nats"
"github.com/fission/fission-workflows/pkg/fnenv"
"github.com/fission/fission-workflows/pkg/fnenv/fission"
"github.com/fission/fission-workflows/pkg/fnenv/native"
"github.com/fission/fission-workflows/pkg/fnenv/native/builtin"
Expand All @@ -38,9 +39,9 @@ import (
)

const (
GRPC_ADDRESS = ":5555"
API_GATEWAY_ADDRESS = ":8080"
FISSION_PROXY_ADDRESS = ":8888"
gRPCAddress = ":5555"
apiGatewayAddress = ":8080"
fissionProxyAddress = ":8888"
)

type Options struct {
Expand Down Expand Up @@ -96,8 +97,8 @@ func Run(ctx context.Context, opts *Options) error {
wfCache := getWorkflowCache(ctx, esPub)

// Resolvers and runtimes
resolvers := map[string]function.Resolver{}
runtimes := map[string]function.Runtime{}
resolvers := map[string]fnenv.Resolver{}
runtimes := map[string]fnenv.Runtime{}
if opts.InternalRuntime {
log.WithField("config", nil).Infof("Using Function Runtime: Internal")
runtimes["internal"] = setupInternalFunctionRuntime()
Expand Down Expand Up @@ -131,7 +132,7 @@ func Run(ctx context.Context, opts *Options) error {

// Http servers
if opts.Fission != nil {
proxySrv := &http.Server{Addr: FISSION_PROXY_ADDRESS}
proxySrv := &http.Server{Addr: fissionProxyAddress}
defer proxySrv.Shutdown(ctx)
runFissionEnvironmentProxy(proxySrv, es, wfiCache(), wfCache(), resolvers)
}
Expand All @@ -149,7 +150,7 @@ func Run(ctx context.Context, opts *Options) error {
}

if opts.ApiAdmin || opts.ApiWorkflow || opts.ApiWorkflowInvocation {
lis, err := net.Listen("tcp", GRPC_ADDRESS)
lis, err := net.Listen("tcp", gRPCAddress)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
Expand All @@ -159,17 +160,17 @@ func Run(ctx context.Context, opts *Options) error {
}

if opts.ApiHttp {
apiSrv := &http.Server{Addr: API_GATEWAY_ADDRESS}
apiSrv := &http.Server{Addr: apiGatewayAddress}
defer apiSrv.Shutdown(ctx)
var admin, wf, wfi string
if opts.ApiAdmin {
admin = GRPC_ADDRESS
admin = gRPCAddress
}
if opts.ApiWorkflow {
wf = GRPC_ADDRESS
wf = gRPCAddress
}
if opts.ApiWorkflowInvocation {
wfi = GRPC_ADDRESS
wfi = gRPCAddress
}
runHttpGateway(ctx, apiSrv, admin, wf, wfi)
}
Expand Down Expand Up @@ -273,23 +274,23 @@ func setupWorkflowCache(ctx context.Context, workflowEventPub pubsub.Publisher)
func runAdminApiServer(s *grpc.Server) {
adminServer := &apiserver.GrpcAdminApiServer{}
apiserver.RegisterAdminAPIServer(s, adminServer)
log.Infof("Serving admin gRPC API at %s.", GRPC_ADDRESS)
log.Infof("Serving admin gRPC API at %s.", gRPCAddress)
}

func runWorkflowApiServer(s *grpc.Server, es fes.EventStore, resolvers map[string]function.Resolver, wfCache fes.CacheReader) {
func runWorkflowApiServer(s *grpc.Server, es fes.EventStore, resolvers map[string]fnenv.Resolver, wfCache fes.CacheReader) {
workflowParser := parse.NewResolver(resolvers)
workflowValidator := parse.NewValidator()
workflowApi := workflow.NewApi(es, workflowParser)
workflowServer := apiserver.NewGrpcWorkflowApiServer(workflowApi, workflowValidator, wfCache)
apiserver.RegisterWorkflowAPIServer(s, workflowServer)
log.Infof("Serving workflow gRPC API at %s.", GRPC_ADDRESS)
log.Infof("Serving workflow gRPC API at %s.", gRPCAddress)
}

func runWorkflowInvocationApiServer(s *grpc.Server, es fes.EventStore, wfiCache fes.CacheReader) {
invocationApi := invocation.NewApi(es)
invocationServer := apiserver.NewGrpcInvocationApiServer(invocationApi, wfiCache)
apiserver.RegisterWorkflowInvocationAPIServer(s, invocationServer)
log.Infof("Serving workflow invocation gRPC API at %s.", GRPC_ADDRESS)
log.Infof("Serving workflow invocation gRPC API at %s.", gRPCAddress)
}

func runHttpGateway(ctx context.Context, gwSrv *http.Server, adminApiAddr string, wfApiAddr string, wfiApiAddr string) {
Expand Down Expand Up @@ -326,7 +327,7 @@ func runHttpGateway(ctx context.Context, gwSrv *http.Server, adminApiAddr string
}

func runFissionEnvironmentProxy(proxySrv *http.Server, es fes.EventStore, wfiCache fes.CacheReader,
wfCache fes.CacheReader, resolvers map[string]function.Resolver) {
wfCache fes.CacheReader, resolvers map[string]fnenv.Resolver) {

workflowParser := parse.NewResolver(resolvers)
workflowValidator := parse.NewValidator()
Expand All @@ -344,15 +345,15 @@ func runFissionEnvironmentProxy(proxySrv *http.Server, es fes.EventStore, wfiCac
}

func setupInvocationController(invocationCache fes.CacheReader, wfCache fes.CacheReader, es fes.EventStore,
fnRuntimes map[string]function.Runtime) *controller.InvocationController {
fnRuntimes map[string]fnenv.Runtime) *controller.InvocationController {
functionApi := function.NewApi(fnRuntimes, es)
invocationApi := invocation.NewApi(es)
s := &scheduler.WorkflowScheduler{}
ep := expr.NewJavascriptExpressionParser(typedvalues.DefaultParserFormatter)
return controller.NewInvocationController(invocationCache, wfCache, s, functionApi, invocationApi, ep)
}

func setupWorkflowController(wfCache fes.CacheReader, es fes.EventStore, fnResolvers map[string]function.Resolver) *controller.WorkflowController {
func setupWorkflowController(wfCache fes.CacheReader, es fes.EventStore, fnResolvers map[string]fnenv.Resolver) *controller.WorkflowController {
workflowApi := workflow.NewApi(es, parse.NewResolver(fnResolvers))
return controller.NewWorkflowController(wfCache, workflowApi)
}
Expand Down
10 changes: 5 additions & 5 deletions cmd/fission-workflows-bundle/bundle/bundle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestMain(m *testing.M) {
// Tests the submission of a workflow
func TestWorkflowCreate(t *testing.T) {
ctx := context.Background()
conn, err := grpc.Dial(GRPC_ADDRESS, grpc.WithInsecure())
conn, err := grpc.Dial(gRPCAddress, grpc.WithInsecure())
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -80,7 +80,7 @@ func TestWorkflowCreate(t *testing.T) {

func TestWorkflowInvocation(t *testing.T) {
ctx := context.Background()
conn, err := grpc.Dial(GRPC_ADDRESS, grpc.WithInsecure())
conn, err := grpc.Dial(gRPCAddress, grpc.WithInsecure())
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -145,7 +145,7 @@ func TestWorkflowInvocation(t *testing.T) {
for ti := range tick.C {
invoc, err := wi.Get(ctx, &apiserver.WorkflowInvocationIdentifier{Id: wiId})
assert.NoError(t, err)
if invoc.Status.Status.Finished() || ti.After(deadline) {
if invoc.Status.Finished() || ti.After(deadline) {
invocation = invoc
tick.Stop()
break
Expand All @@ -154,12 +154,12 @@ func TestWorkflowInvocation(t *testing.T) {

assert.Equal(t, wiSpec, invocation.Spec)
assert.Equal(t, etv, invocation.Status.Output)
assert.True(t, invocation.Status.Status.Successful())
assert.True(t, invocation.Status.Successful())
}

func TestDynamicWorkflowInvocation(t *testing.T) {
ctx := context.Background()
conn, err := grpc.Dial(GRPC_ADDRESS, grpc.WithInsecure())
conn, err := grpc.Dial(gRPCAddress, grpc.WithInsecure())
if err != nil {
panic(err)
}
Expand Down
12 changes: 8 additions & 4 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import:
- package: gopkg.in/sourcemap.v1
version: v2.0.0
- package: gopkg.in/yaml.v2
# Due to the old version of the kubernetes client we need to fix the openapi versions to avoid incompatible interfaces.
# Due to the old version of the kubernetes client we need to fix the openapi versions to avoid incompatible interfaces.
- package: github.com/go-openapi/errors
version: 49fe8b3a0e0d32a617d8d50c67f856ad6e45b28b
- package: github.com/go-openapi/analysis
Expand All @@ -49,6 +49,9 @@ import:
version: 315567415dfd74b651f7a62cabfc82a57ed7b9ad
- package: github.com/go-openapi/validate
version: 2c997e169d705fe1e4235c9c1d62a7e704a84290
- package: golang.org/x/sync
subpackages:
- syncmap
testImport:
- package: github.com/stretchr/testify
version: ^1.1.4
Expand Down
5 changes: 3 additions & 2 deletions pkg/api/function/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package function

import (
"github.com/fission/fission-workflows/pkg/fes"
"github.com/fission/fission-workflows/pkg/fnenv"
"github.com/fission/fission-workflows/pkg/types"
"github.com/fission/fission-workflows/pkg/types/aggregates"
"github.com/fission/fission-workflows/pkg/types/events"
Expand All @@ -12,11 +13,11 @@ import (

// Api that servers mainly as a function.Runtime wrapper that deals with the higher-level logic workflow-related logic.
type Api struct {
runtime map[string]Runtime // TODO support AsyncRuntime
runtime map[string]fnenv.Runtime // TODO support AsyncRuntime
es fes.EventStore
}

func NewApi(runtime map[string]Runtime, esClient fes.EventStore) *Api {
func NewApi(runtime map[string]fnenv.Runtime, esClient fes.EventStore) *Api {
return &Api{
runtime: runtime,
es: esClient,
Expand Down
41 changes: 0 additions & 41 deletions pkg/api/function/runtime.go

This file was deleted.

9 changes: 4 additions & 5 deletions pkg/api/workflow/parse/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@ package parse
import (
"fmt"
"strings"

"sync"

"github.com/fission/fission-workflows/pkg/api/function"
"github.com/fission/fission-workflows/pkg/fnenv"
"github.com/fission/fission-workflows/pkg/types"
"github.com/fission/fission-workflows/pkg/types/typedvalues"
"github.com/sirupsen/logrus"
Expand All @@ -23,10 +22,10 @@ import (
// for scheduling (overhead vs. load)
//
type Resolver struct {
clients map[string]function.Resolver
clients map[string]fnenv.Resolver
}

func NewResolver(client map[string]function.Resolver) *Resolver {
func NewResolver(client map[string]fnenv.Resolver) *Resolver {
return &Resolver{client}
}

Expand Down Expand Up @@ -138,7 +137,7 @@ func (ps *Resolver) resolveTask(task *types.Task) (*types.TaskTypeDef, error) {
case result := <-resolved:
return result, nil
default:
return nil, fmt.Errorf("failed to resolve function '%s' using clients '%v'", t, ps.clients)
return nil, fmt.Errorf("failed to resolve function '%s' using clients '%v': %v", t, ps.clients, lastErr)
}
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/api/workflow/parse/resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

"errors"

"github.com/fission/fission-workflows/pkg/api/function"
"github.com/fission/fission-workflows/pkg/fnenv"
"github.com/fission/fission-workflows/pkg/types"
"github.com/fission/fission-workflows/pkg/types/typedvalues"
"github.com/stretchr/testify/assert"
Expand All @@ -17,7 +17,7 @@ func TestResolve(t *testing.T) {

fooClient := "foo"

clients := map[string]function.Resolver{
clients := map[string]fnenv.Resolver{
fooClient: uppercaseResolver,
"failing": failingResolver,
}
Expand All @@ -44,7 +44,7 @@ func TestResolveForced(t *testing.T) {

fooClient := "foo"

clients := map[string]function.Resolver{
clients := map[string]fnenv.Resolver{
"bar": uppercaseResolver,
fooClient: uppercaseResolver,
"failing": failingResolver,
Expand Down Expand Up @@ -72,7 +72,7 @@ func TestResolveInputs(t *testing.T) {

fooClient := "foo"

clients := map[string]function.Resolver{
clients := map[string]fnenv.Resolver{
fooClient: uppercaseResolver,
fooClient: uppercaseResolver,
}
Expand Down Expand Up @@ -112,7 +112,7 @@ func TestResolveInputs(t *testing.T) {

func TestResolveNotFound(t *testing.T) {

clients := map[string]function.Resolver{
clients := map[string]fnenv.Resolver{
"bar": uppercaseResolver,
"failing": failingResolver,
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/apiserver/invocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (gi *grpcInvocationApiServer) InvokeSync(ctx context.Context, spec *types.W
if err != nil {
logrus.Warnf("Failed to get workflow invocation from cache: %v", err)
}
if wi != nil && wi.GetStatus() != nil && wi.GetStatus().Status.Finished() {
if wi != nil && wi.GetStatus() != nil && wi.GetStatus().Finished() {
result = wi.WorkflowInvocation
break
}
Expand Down
Loading