Skip to content

Commit

Permalink
Added initial prometheus metric collection
Browse files Browse the repository at this point in the history
  • Loading branch information
erwinvaneyk committed May 18, 2018
1 parent e8d83ed commit 0a1f6a9
Show file tree
Hide file tree
Showing 12 changed files with 235 additions and 49 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ data/

*.tgz
/fission-workflows-bundle
/fission-workflows-bundle-osx
/fission-workflows-bundle-windows
/wfcli
/wfcli-osx
/wfcli-windows
__pycache__/
*.pyc
24 changes: 18 additions & 6 deletions Docs/wip/instrumentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,24 @@
This document contains a description and high-level documentation of the instrumentation of the system.
The instrumentation here is considered to encompass tracing, logging and metrics.

## Logging

TODO

Common fields:
Terminology:
- ctrl: name of the controller if applicable
- wf: id of the workflow if applicable
- wfi: id of the workflow invocation if applicable
- component: name of component (controller, api, apiserver, fnenv)
- component: name of component (controller, api, apiserver, fnenv)

## Logging

## Metrics

For metrics, the Prometheus time-series and monitoring system is used.
The following metrics are collected and available under `:8080/metrics` in the Prometheus data format.

Metrics:
- Active invocations (Gauge)
- Cache size (Gauge)

- Completed invocations (Counter)
- Failed invocations (Counter)
- Successful invocations (Counter)
- Aborted invocations (Counter)
4 changes: 2 additions & 2 deletions INSTALL.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ helm repo update

# Install Fission
# This assumes that you do not have a Fission deployment yet, and are installing on a standard Minikube deployment.
# Otherwise see http://fission.io/docs/0.4.0/install/ for more detailed instructions
helm install --wait -n fission-all --namespace fission --set serviceType=NodePort --set analytics=false fission-charts/fission-all --version 0.4.1
# Otherwise see http://fission.io/docs/0.7.2/install/ for more detailed instructions
helm install --wait -n fission-all --namespace fission --set serviceType=NodePort --set analytics=false fission-charts/fission-all --version 0.7.2

# Install Fission Workflows
helm install --wait -n fission-workflows fission-charts/fission-workflows --version 0.3.0
Expand Down
3 changes: 2 additions & 1 deletion charts/fission-workflows/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ spec:
"--api-workflow-invocation",
"--api-workflow",
"--api-admin",
"--metrics",
]
env:
- name: ES_NATS_URL
Expand Down Expand Up @@ -90,4 +91,4 @@ spec:
builder:
image: "{{ .Values.buildEnvImage }}:{{.Values.tag}}"
command: "defaultBuild"
allowedFunctionsPerContainer: infinite
allowedFunctionsPerContainer: infinite
101 changes: 69 additions & 32 deletions cmd/fission-workflows-bundle/bundle/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
wfictr "github.com/fission/fission-workflows/pkg/controller/invocation"
wfctr "github.com/fission/fission-workflows/pkg/controller/workflow"
"github.com/fission/fission-workflows/pkg/fes"
"github.com/fission/fission-workflows/pkg/fes/backend/mem"
"github.com/fission/fission-workflows/pkg/fes/backend/nats"
"github.com/fission/fission-workflows/pkg/fnenv"
"github.com/fission/fission-workflows/pkg/fnenv/fission"
Expand All @@ -32,6 +33,7 @@ import (
executor "github.com/fission/fission/executor/client"
"github.com/gorilla/handlers"
grpcruntime "github.com/grpc-ecosystem/grpc-gateway/runtime"
"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
)
Expand All @@ -52,6 +54,7 @@ type Options struct {
ApiWorkflow bool
ApiHttp bool
ApiWorkflowInvocation bool
Metrics bool
}

type FissionOptions struct {
Expand Down Expand Up @@ -80,6 +83,11 @@ func Run(ctx context.Context, opts *Options) error {
natsEs := setupNatsEventStoreClient(opts.Nats.Url, opts.Nats.Cluster, opts.Nats.Client)
es = natsEs
esPub = natsEs
} else {
log.Warn("No event store provided; using the development, in-memory event store")
backend := mem.NewBackend()
es = backend
esPub = backend
}

// Caches
Expand All @@ -91,9 +99,13 @@ func Run(ctx context.Context, opts *Options) error {
resolvers := map[string]fnenv.RuntimeResolver{}
runtimes := map[string]fnenv.Runtime{}

log.Infof("Using Function Runtime: Workflow")
reflectiveRuntime := workflows.NewRuntime(invocationApi, wfiCache())
runtimes[workflows.Name] = reflectiveRuntime
if opts.InternalRuntime || opts.Fission != nil {
log.Infof("Using Function Runtime: Workflow")
reflectiveRuntime := workflows.NewRuntime(invocationApi, wfiCache())
runtimes[workflows.Name] = reflectiveRuntime
} else {
log.Info("No function runtimes specified.")
}
if opts.InternalRuntime {
log.Infof("Using Function Runtime: Internal")
runtimes["internal"] = setupInternalFunctionRuntime()
Expand Down Expand Up @@ -128,9 +140,18 @@ func Run(ctx context.Context, opts *Options) error {

// Http servers
if opts.Fission != nil {
proxyMux := http.NewServeMux()
runFissionEnvironmentProxy(proxyMux, es, wfiCache(), wfCache(), resolvers)
proxySrv := &http.Server{Addr: fissionProxyAddress}
proxySrv.Handler = handlers.LoggingHandler(os.Stdout, proxyMux)

if opts.Metrics {
setupMetricsEndpoint(proxyMux)
}

go proxySrv.ListenAndServe()
defer proxySrv.Shutdown(ctx)
runFissionEnvironmentProxy(proxySrv, es, wfiCache(), wfCache(), resolvers)
log.Info("Serving HTTP Fission Proxy at: ", proxySrv.Addr)
}

if opts.ApiAdmin {
Expand All @@ -155,20 +176,41 @@ func Run(ctx context.Context, opts *Options) error {
go grpcServer.Serve(lis)
}

if opts.ApiHttp {
apiSrv := &http.Server{Addr: apiGatewayAddress}
defer apiSrv.Shutdown(ctx)
var admin, wf, wfi string
if opts.ApiAdmin {
admin = gRPCAddress
}
if opts.ApiWorkflow {
wf = gRPCAddress
if opts.ApiHttp || opts.Metrics {
grpcMux := grpcruntime.NewServeMux()
httpMux := http.NewServeMux()

if opts.ApiHttp {

var admin, wf, wfi string
if opts.ApiAdmin {
admin = gRPCAddress
}
if opts.ApiWorkflow {
wf = gRPCAddress
}
if opts.ApiWorkflowInvocation {
wfi = gRPCAddress
}
runHttpGateway(ctx, grpcMux, admin, wf, wfi)
}
if opts.ApiWorkflowInvocation {
wfi = gRPCAddress

// Metrics
if opts.Metrics {
setupMetricsEndpoint(httpMux)
log.Infof("Set up prometheus collector: %v/metrics", apiGatewayAddress)
}
runHttpGateway(ctx, apiSrv, admin, wf, wfi)

apiSrv := &http.Server{Addr: apiGatewayAddress}
httpMux.Handle("/", grpcMux)
apiSrv.Handler = httpMux
go func() {
err := apiSrv.ListenAndServe()
log.WithField("err", err).Info("HTTP Gateway exited")
}()
defer apiSrv.Shutdown(ctx)

log.Info("Serving HTTP API gateway at: ", apiSrv.Addr)
}

log.Info("Bundle set up.")
Expand Down Expand Up @@ -288,54 +330,45 @@ func runWorkflowInvocationApiServer(s *grpc.Server, es fes.Backend, wfiCache fes
log.Infof("Serving workflow invocation gRPC API at %s.", gRPCAddress)
}

func runHttpGateway(ctx context.Context, gwSrv *http.Server, adminApiAddr string, wfApiAddr string, wfiApiAddr string) {
mux := grpcruntime.NewServeMux()
func runHttpGateway(ctx context.Context, gwSrv *grpcruntime.ServeMux, adminApiAddr string, wfApiAddr string,
wfiApiAddr string) {
mux := gwSrv
grpcOpts := []grpc.DialOption{grpc.WithInsecure()}
if adminApiAddr != "" {
err := apiserver.RegisterAdminAPIHandlerFromEndpoint(ctx, mux, adminApiAddr, grpcOpts)
if err != nil {
panic(err)
}
log.Info("Registered Workflow API HTTP Endpoint")
}

if wfApiAddr != "" {
err := apiserver.RegisterWorkflowAPIHandlerFromEndpoint(ctx, mux, wfApiAddr, grpcOpts)
if err != nil {
panic(err)
}
log.Info("Registered Admin API HTTP Endpoint")
}

if wfiApiAddr != "" {
err := apiserver.RegisterWorkflowInvocationAPIHandlerFromEndpoint(ctx, mux, wfiApiAddr, grpcOpts)
if err != nil {
panic(err)
}
log.Info("Registered Workflow Invocation API HTTP Endpoint")
}

gwSrv.Handler = mux
go func() {
err := gwSrv.ListenAndServe()
log.WithField("err", err).Info("HTTP Gateway exited")
}()

log.Info("Serving HTTP API gateway at: ", gwSrv.Addr)
}

func runFissionEnvironmentProxy(proxySrv *http.Server, es fes.Backend, wfiCache fes.CacheReader,
func runFissionEnvironmentProxy(proxyMux *http.ServeMux, es fes.Backend, wfiCache fes.CacheReader,
wfCache fes.CacheReader, resolvers map[string]fnenv.RuntimeResolver) {

workflowParser := fnenv.NewMetaResolver(resolvers)
workflowApi := workflow.NewApi(es, workflowParser)
wfServer := apiserver.NewGrpcWorkflowApiServer(workflowApi, wfCache)
wfiApi := invocation.NewApi(es)
wfiServer := apiserver.NewGrpcInvocationApiServer(wfiApi, wfiCache)
proxyMux := http.NewServeMux()
fissionProxyServer := fission.NewFissionProxyServer(wfiServer, wfServer)
fissionProxyServer.RegisterServer(proxyMux)

proxySrv.Handler = handlers.LoggingHandler(os.Stdout, proxyMux)
go proxySrv.ListenAndServe()
log.Info("Serving HTTP Fission Proxy at: ", proxySrv.Addr)
}

func setupInvocationController(invocationCache fes.CacheReader, wfCache fes.CacheReader, es fes.Backend,
Expand All @@ -359,3 +392,7 @@ func runController(ctx context.Context, ctrls ...controller.Controller) {
ctrl := controller.NewMetaController(ctrls...)
go ctrl.Run(ctx)
}

// setupMetricsEndpoint registers the Prometheus metrics handler on apiMux
// under the "/metrics" path.
func setupMetricsEndpoint(apiMux *http.ServeMux) {
	metricsHandler := promhttp.Handler()
	apiMux.Handle("/metrics", metricsHandler)
}
5 changes: 5 additions & 0 deletions cmd/fission-workflows-bundle/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func main() {
ApiWorkflow: c.Bool("api") || c.Bool("api-workflow"),
ApiWorkflowInvocation: c.Bool("api") || c.Bool("api-workflow-invocation"),
ApiHttp: c.Bool("api") || c.Bool("api-http"),
Metrics: c.Bool("metrics") || c.Bool("metrics"),
})
}
cliApp.Run(os.Args)
Expand Down Expand Up @@ -163,6 +164,10 @@ func createCli() *cli.App {
Name: "api-admin",
Usage: "Serve the admin gRPC api",
},
cli.BoolFlag{
Name: "metrics",
Usage: "Serve prometheus metrics",
},
}

return cliApp
Expand Down
32 changes: 30 additions & 2 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ import:
version: v1.0
- package: github.com/pkg/errors
version: ^0.8.0
- package: github.com/prometheus/client_golang
version: 0.8.0
testImport:
- package: github.com/stretchr/testify
version: 1.1.4
Expand Down
Loading

0 comments on commit 0a1f6a9

Please sign in to comment.