Skip to content

Commit

Permalink
Add specialization handler to fission proxy
Browse files Browse the repository at this point in the history
  • Loading branch information
erwinvaneyk committed Sep 13, 2017
1 parent e6bdafa commit eaa177d
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 10 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
vendor/
bazel-*
fission-workflow-bundle
1 change: 1 addition & 0 deletions build/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
fission-workflow-bundle
12 changes: 9 additions & 3 deletions cmd/fission-workflow-bundle/bundle/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func Run(ctx context.Context, opts *Options) error {
if opts.Fission != nil {
proxySrv := http.Server{Addr: FISSION_PROXY_ADDRESS}
defer proxySrv.Shutdown(ctx)
runFissionEnvironmentProxy(proxySrv, es, wfiCache())
runFissionEnvironmentProxy(proxySrv, es, wfiCache(), wfCache(), resolvers)
}

if opts.ApiAdmin != nil {
Expand Down Expand Up @@ -271,11 +271,17 @@ func runHttpGateway(ctx context.Context, gwSrv http.Server) {
log.Info("Serving HTTP API gateway at: ", gwSrv.Addr)
}

func runFissionEnvironmentProxy(proxySrv http.Server, es fes.EventStore, wfiCache fes.CacheReader) {
func runFissionEnvironmentProxy(proxySrv http.Server, es fes.EventStore, wfiCache fes.CacheReader,
wfCache fes.CacheReader, resolvers map[string]function.Resolver) {

workflowParser := parse.NewResolver(resolvers)
workflowValidator := parse.NewValidator()
workflowApi := workflow.NewApi(es, workflowParser)
wfServer := apiserver.NewGrpcWorkflowApiServer(workflowApi, workflowValidator, wfCache)
wfiApi := invocation.NewApi(es, wfiCache)
wfiServer := apiserver.NewGrpcInvocationApiServer(wfiApi)
proxyMux := http.NewServeMux()
fissionProxyServer := fission.NewFissionProxyServer(wfiServer)
fissionProxyServer := fission.NewFissionProxyServer(wfiServer, wfServer)
fissionProxyServer.RegisterServer(proxyMux)

proxySrv.Handler = handlers.LoggingHandler(os.Stdout, proxyMux)
Expand Down
8 changes: 4 additions & 4 deletions cmd/fission-workflow-bundle/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,15 @@ func createCli() *cli.App {
EnvVar: "FNENV_FISSION_POOLMGR",
},
cli.StringFlag{
Name: "fission-controller",
Usage: "Address of the controller for resolving functions",
Value: "http://controller.fission",
Name: "fission-controller",
Usage: "Address of the controller for resolving functions",
Value: "http://controller.fission",
EnvVar: "FNENV_FISSION_CONTROLLER",
},

// Components
cli.BoolFlag{
Name: "internal",
Name: "internal",
Usage: "Use internal function runtime",
},
cli.BoolFlag{
Expand Down
56 changes: 54 additions & 2 deletions pkg/fnenv/fission/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,29 @@ import (

"io/ioutil"

"os"

"github.com/fission/fission-workflow/pkg/apiserver"
"github.com/fission/fission-workflow/pkg/types"
"github.com/gogo/protobuf/jsonpb"
"github.com/sirupsen/logrus"
)

const userFunc = "/userfunc/user"

// Proxy between Fission and Workflow to ensure that workflowInvocations comply with Fission function interface
type Proxy struct {
invocationServer apiserver.WorkflowInvocationAPIServer
workflowServer apiserver.WorkflowAPIServer
}

func NewFissionProxyServer(srv apiserver.WorkflowInvocationAPIServer) *Proxy {
return &Proxy{srv}
func NewFissionProxyServer(wfiSrv apiserver.WorkflowInvocationAPIServer, wfSrv apiserver.WorkflowAPIServer) *Proxy {
return &Proxy{wfiSrv, wfSrv}
}

func (fp *Proxy) RegisterServer(mux *http.ServeMux) {
mux.HandleFunc("/", fp.handleRequest)
mux.HandleFunc("/specialize", fp.handleSpecialize)
}

func (fp *Proxy) handleRequest(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -64,3 +72,47 @@ func (fp *Proxy) handleRequest(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
w.Write(resp)
}

func (fp *Proxy) handleSpecialize(w http.ResponseWriter, r *http.Request) {
_, err := os.Stat(userFunc)
if err != nil {
if os.IsNotExist(err) {
w.WriteHeader(http.StatusNotFound)
w.Write([]byte(userFunc + ": not found"))
return
} else {
panic(err)
}
}

rdr, err := os.Open(userFunc)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte("Failed to read executable."))
return
}

wfSpec := &types.WorkflowSpec{}
err = jsonpb.Unmarshal(rdr, wfSpec)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte("Failed to parse workflow."))
return
}

ctx := context.Background()
wfId, err := fp.workflowServer.Create(ctx, wfSpec)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte("Failed to store workflow."))
return
}

err = os.Remove(userFunc)
if err != nil {
logrus.Warnf("Failed to remove userFunc: %v", err)
}

w.WriteHeader(http.StatusOK)
w.Write([]byte(wfId.Id))
}

0 comments on commit eaa177d

Please sign in to comment.