Prometheus integration #122

Merged
merged 4 commits into from
Jun 7, 2018
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ data/

*.tgz
/fission-workflows-bundle
/fission-workflows-bundle-osx
/fission-workflows-bundle-windows
/wfcli
/wfcli-osx
/wfcli-windows
__pycache__/
*.pyc

Expand Down
3 changes: 2 additions & 1 deletion Docs/README.md
Expand Up @@ -7,4 +7,5 @@
- [Functions](./functions.md)
- [Data](data.md)
- [Roadmap](./roadmap.md)
- [Deployment Administration](./admin.md)
- [Deployment Administration](./admin.md)
- [Instrumentation and Logging](./instrumentation.md)
56 changes: 56 additions & 0 deletions Docs/instrumentation.md
@@ -0,0 +1,56 @@
# Instrumentation

Fission Workflows provides (or aims to provide) several forms of instrumentation to help you gain insight into
the engine.

It offers the following main features:
- Metrics: both low-level and high-level metrics are exposed using Prometheus.
- Logging: structured logging is provided by the Go library logrus.
- (future) A predefined Grafana dashboard that visualizes the Prometheus metrics.

## Prometheus

When enabled with the `--metrics` flag, the workflow engine collects various metrics and exposes them over HTTP.
By default, the exposed metrics are available at `${WORKFLOWS_IP}:8080/metrics`.

These metrics are scraped at a regular interval by a Prometheus instance, which is not included in Fission.
To set up Prometheus quickly:
```bash
helm install --namespace monitoring --name prometheus stable/prometheus
```

To enable Prometheus to discover Fission Workflows, specific annotations need to be present on the pod.
For now you need to manually add annotations to the Fission Workflows deployment in the `fission-function` namespace.
```bash
NS=fission-function
kubectl -n ${NS} edit $(kubectl get po -n ${NS} -o name | grep workflow)
```

And add the following to the metadata.
```yaml
annotations:
  prometheus.io/path: /metrics
  prometheus.io/port: "8080"
  prometheus.io/scrape: "true"
```

(In the near future, this step will be done by Fission automatically.)
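
How a scraper interprets the annotations above can be sketched as follows. This is an illustration of the `prometheus.io/*` convention, which to our knowledge is implemented through relabeling rules in the Prometheus scrape configuration rather than in Prometheus itself; `scrapeTarget` is a hypothetical helper, the pod IP is made up, and requiring an explicit port is a simplification of this sketch.

```go
package main

import (
	"fmt"
	"net"
)

// scrapeTarget returns the URL that a scraper following the
// prometheus.io/* annotation convention would poll for a pod, and false
// if the pod has not opted in to scraping.
func scrapeTarget(podIP string, annotations map[string]string) (string, bool) {
	if annotations["prometheus.io/scrape"] != "true" {
		return "", false
	}
	path := annotations["prometheus.io/path"]
	if path == "" {
		path = "/metrics" // conventional default path
	}
	port := annotations["prometheus.io/port"]
	if port == "" {
		return "", false // sketch-only rule: require an explicit port
	}
	return "http://" + net.JoinHostPort(podIP, port) + path, true
}

func main() {
	url, ok := scrapeTarget("10.0.0.12", map[string]string{
		"prometheus.io/path":   "/metrics",
		"prometheus.io/port":   "8080",
		"prometheus.io/scrape": "true",
	})
	fmt.Println(url, ok)
}
```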

Now, to access the metrics in the Prometheus dashboard, you just need to expose the prometheus-server in the `monitoring`
namespace or access its clusterIP from within the cluster.

### Prometheus NATS exporter
Given that NATS Streaming plays an important role in the workflow system, it is also useful to collect the NATS metrics
in Prometheus. The NATS deployments do not expose these metrics directly, but the
[Prometheus NATS exporter](https://github.com/nats-io/prometheus-nats-exporter) can be installed as a separate module.

## Grafana
A common way to visualize the metrics collected with Prometheus is to use Grafana to create and share graphs and
other types of visualizations.

```bash
helm install --namespace monitoring --name grafana stable/grafana
```

In the future, we will provide a pre-built Grafana dashboard with useful graphs to provide you insight into the
system, without needing to build dashboards yourself.
24 changes: 18 additions & 6 deletions Docs/wip/instrumentation.md
Expand Up @@ -3,12 +3,24 @@
This document contains a high-level description of the instrumentation of the system.
The instrumentation here is considered to encompass tracing, logging and metrics.

## Logging

TODO

Common fields:
Terminology:
- ctrl: name of the controller if applicable
- wf: id of the workflow if applicable
- wfi: id of the workflow invocation if applicable
- component: name of component (controller, api, apiserver, fnenv)
- component: name of component (controller, api, apiserver, fnenv)

## Logging

## Metrics

Metrics are collected using the Prometheus time-series and monitoring system.
The following metrics are collected and available under `:8080/metrics` in the Prometheus data format.

Metrics:
- Active invocations (Gauge)
- Cache size (Gauge)

- Completed invocations (Counter)
- Failed invocations (Counter)
- Successful invocations (Counter)
- Aborted invocations (Counter)
4 changes: 2 additions & 2 deletions INSTALL.md
Expand Up @@ -38,8 +38,8 @@ helm repo update

# Install Fission
# This assumes that you do not have a Fission deployment yet, and are installing on a standard Minikube deployment.
# Otherwise see http://fission.io/docs/0.4.0/install/ for more detailed instructions
helm install --wait -n fission-all --namespace fission --set serviceType=NodePort --set analytics=false fission-charts/fission-all --version 0.4.1
# Otherwise see http://fission.io/docs/0.7.2/install/ for more detailed instructions
helm install --wait -n fission-all --namespace fission --set serviceType=NodePort --set analytics=false fission-charts/fission-all --version 0.7.2

# Install Fission Workflows
helm install --wait -n fission-workflows fission-charts/fission-workflows --version 0.3.0
Expand Down
4 changes: 3 additions & 1 deletion build/runtime-env/Dockerfile
Expand Up @@ -10,6 +10,7 @@ FROM scratch
COPY --from=workflows-bundle /fission-workflows-bundle /fission-workflows-bundle

EXPOSE 8888
EXPOSE 8080

ENV FNENV_FISSION_CONTROLLER http://controller.fission
ENV FNENV_FISSION_EXECUTOR http://executor.fission
Expand All @@ -25,4 +26,5 @@ ENTRYPOINT ["/fission-workflows-bundle", \
"--api-http", \
"--api-workflow-invocation", \
"--api-workflow", \
"--api-admin"]
"--api-admin", \
"--metrics"]
3 changes: 2 additions & 1 deletion charts/fission-workflows/templates/deployment.yaml
Expand Up @@ -25,6 +25,7 @@ spec:
"--api-workflow-invocation",
"--api-workflow",
"--api-admin",
"--metrics",
]
env:
- name: ES_NATS_URL
Expand Down Expand Up @@ -90,4 +91,4 @@ spec:
builder:
image: "{{ .Values.buildEnvImage }}:{{.Values.tag}}"
command: "defaultBuild"
allowedFunctionsPerContainer: infinite
allowedFunctionsPerContainer: infinite
121 changes: 82 additions & 39 deletions cmd/fission-workflows-bundle/bundle/bundle.go
Expand Up @@ -13,6 +13,7 @@ import (
wfictr "github.com/fission/fission-workflows/pkg/controller/invocation"
wfctr "github.com/fission/fission-workflows/pkg/controller/workflow"
"github.com/fission/fission-workflows/pkg/fes"
"github.com/fission/fission-workflows/pkg/fes/backend/mem"
"github.com/fission/fission-workflows/pkg/fes/backend/nats"
"github.com/fission/fission-workflows/pkg/fnenv"
"github.com/fission/fission-workflows/pkg/fnenv/fission"
Expand All @@ -28,7 +29,9 @@ import (
controllerc "github.com/fission/fission/controller/client"
executor "github.com/fission/fission/executor/client"
"github.com/gorilla/handlers"
"github.com/grpc-ecosystem/go-grpc-prometheus"
grpcruntime "github.com/grpc-ecosystem/grpc-gateway/runtime"
"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
)
Expand All @@ -49,6 +52,7 @@ type Options struct {
WorkflowAPI bool
HTTPGateway bool
InvocationAPI bool
Metrics bool
}

type FissionOptions struct {
Expand All @@ -64,7 +68,10 @@ func Run(ctx context.Context, opts *Options) error {
var es fes.Backend
var esPub pubsub.Publisher

grpcServer := grpc.NewServer()
grpcServer := grpc.NewServer(
grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor),
)
defer grpcServer.GracefulStop()

// Event Stores
Expand All @@ -77,6 +84,11 @@ func Run(ctx context.Context, opts *Options) error {
natsEs := setupNatsEventStoreClient(opts.Nats.URL, opts.Nats.Cluster, opts.Nats.Client)
es = natsEs
esPub = natsEs
} else {
log.Warn("No event store provided; using the development, in-memory event store")
backend := mem.NewBackend()
es = backend
esPub = backend
}

// Caches
Expand All @@ -88,9 +100,13 @@ func Run(ctx context.Context, opts *Options) error {
resolvers := map[string]fnenv.RuntimeResolver{}
runtimes := map[string]fnenv.Runtime{}

log.Infof("Using Task Runtime: Workflow")
reflectiveRuntime := workflows.NewRuntime(invocationAPI, wfiCache())
runtimes[workflows.Name] = reflectiveRuntime
if opts.InternalRuntime || opts.Fission != nil {
log.Infof("Using Task Runtime: Workflow")
reflectiveRuntime := workflows.NewRuntime(invocationAPI, wfiCache())
runtimes[workflows.Name] = reflectiveRuntime
} else {
log.Info("No function runtimes specified.")
}
if opts.InternalRuntime {
log.Infof("Using Task Runtime: Internal")
runtimes["internal"] = setupInternalFunctionRuntime()
Expand Down Expand Up @@ -125,9 +141,18 @@ func Run(ctx context.Context, opts *Options) error {

// Http servers
if opts.Fission != nil {
proxyMux := http.NewServeMux()
runFissionEnvironmentProxy(proxyMux, es, wfiCache(), wfCache(), resolvers)
proxySrv := &http.Server{Addr: fissionProxyAddress}
proxySrv.Handler = handlers.LoggingHandler(os.Stdout, proxyMux)

if opts.Metrics {
setupMetricsEndpoint(proxyMux)
}

go proxySrv.ListenAndServe()
defer proxySrv.Shutdown(ctx)
runFissionEnvironmentProxy(proxySrv, es, wfiCache(), wfCache(), resolvers)
log.Info("Serving HTTP Fission Proxy at: ", proxySrv.Addr)
}

if opts.AdminAPI {
Expand All @@ -143,6 +168,9 @@ func Run(ctx context.Context, opts *Options) error {
}

if opts.AdminAPI || opts.WorkflowAPI || opts.InvocationAPI {
log.Info("Instrumenting gRPC server with Prometheus metrics")
grpc_prometheus.Register(grpcServer)

lis, err := net.Listen("tcp", gRPCAddress)
if err != nil {
log.Fatalf("failed to listen: %v", err)
Expand All @@ -152,20 +180,41 @@ func Run(ctx context.Context, opts *Options) error {
go grpcServer.Serve(lis)
}

if opts.HTTPGateway {
HTTPGatewaySrv := &http.Server{Addr: apiGatewayAddress}
defer HTTPGatewaySrv.Shutdown(ctx)
var admin, wf, wfi string
if opts.AdminAPI {
admin = gRPCAddress
}
if opts.WorkflowAPI {
wf = gRPCAddress
if opts.HTTPGateway || opts.Metrics {
grpcMux := grpcruntime.NewServeMux()
httpMux := http.NewServeMux()

if opts.HTTPGateway {

var admin, wf, wfi string
if opts.AdminAPI {
admin = gRPCAddress
}
if opts.WorkflowAPI {
wf = gRPCAddress
}
if opts.InvocationAPI {
wfi = gRPCAddress
}
serveHTTPGateway(ctx, grpcMux, admin, wf, wfi)
}
if opts.InvocationAPI {
wfi = gRPCAddress

// Metrics
if opts.Metrics {
setupMetricsEndpoint(httpMux)
log.Infof("Set up prometheus collector: %v/metrics", apiGatewayAddress)
}
serveHTTPGateway(ctx, HTTPGatewaySrv, admin, wf, wfi)

apiSrv := &http.Server{Addr: apiGatewayAddress}
httpMux.Handle("/", grpcMux)
apiSrv.Handler = httpMux
go func() {
err := apiSrv.ListenAndServe()
log.WithField("err", err).Info("HTTP Gateway exited")
}()
defer apiSrv.Shutdown(ctx)

log.Info("Serving HTTP API gateway at: ", apiSrv.Addr)
}

log.Info("Bundle set up.")
Expand Down Expand Up @@ -249,7 +298,7 @@ func setupWorkflowInvocationCache(ctx context.Context, invocationEventPub pubsub
return aggregates.NewWorkflowInvocation("")
}

return fes.NewSubscribedCache(ctx, fes.NewMapCache(), wi, invokeSub)
return fes.NewSubscribedCache(ctx, fes.NewNamedMapCache("invocation"), wi, invokeSub)
}

func setupWorkflowCache(ctx context.Context, workflowEventPub pubsub.Publisher) *fes.SubscribedCache {
Expand All @@ -260,7 +309,7 @@ func setupWorkflowCache(ctx context.Context, workflowEventPub pubsub.Publisher)
wb := func() fes.Aggregator {
return aggregates.NewWorkflow("")
}
return fes.NewSubscribedCache(ctx, fes.NewMapCache(), wb, wfSub)
return fes.NewSubscribedCache(ctx, fes.NewNamedMapCache("workflow"), wb, wfSub)
}

func serveAdminAPI(s *grpc.Server) {
Expand All @@ -285,54 +334,44 @@ func serveInvocationAPI(s *grpc.Server, es fes.Backend, wfiCache fes.CacheReader
log.Infof("Serving workflow invocation gRPC API at %s.", gRPCAddress)
}

func serveHTTPGateway(ctx context.Context, gwSrv *http.Server, adminAPIAddr string, workflowAPIAddr string, invocationAPIAddr string) {
mux := grpcruntime.NewServeMux()
grpcOpts := []grpc.DialOption{grpc.WithInsecure()}
func serveHTTPGateway(ctx context.Context, mux *grpcruntime.ServeMux, adminAPIAddr string, workflowAPIAddr string,
invocationAPIAddr string) {
opts := []grpc.DialOption{grpc.WithInsecure()}
if adminAPIAddr != "" {
err := apiserver.RegisterAdminAPIHandlerFromEndpoint(ctx, mux, adminAPIAddr, grpcOpts)
err := apiserver.RegisterAdminAPIHandlerFromEndpoint(ctx, mux, adminAPIAddr, opts)
if err != nil {
panic(err)
}
log.Info("Registered Admin API HTTP Endpoint")
}

if workflowAPIAddr != "" {
err := apiserver.RegisterWorkflowAPIHandlerFromEndpoint(ctx, mux, workflowAPIAddr, grpcOpts)
err := apiserver.RegisterWorkflowAPIHandlerFromEndpoint(ctx, mux, workflowAPIAddr, opts)
if err != nil {
panic(err)
}
log.Info("Registered Workflow API HTTP Endpoint")
}

if invocationAPIAddr != "" {
err := apiserver.RegisterWorkflowInvocationAPIHandlerFromEndpoint(ctx, mux, invocationAPIAddr, grpcOpts)
err := apiserver.RegisterWorkflowInvocationAPIHandlerFromEndpoint(ctx, mux, invocationAPIAddr, opts)
if err != nil {
panic(err)
}
log.Info("Registered Workflow Invocation API HTTP Endpoint")
}

gwSrv.Handler = mux
go func() {
err := gwSrv.ListenAndServe()
log.WithField("err", err).Info("HTTP Gateway exited")
}()

log.Info("Serving HTTP API gateway at: ", gwSrv.Addr)
}

func runFissionEnvironmentProxy(proxySrv *http.Server, es fes.Backend, wfiCache fes.CacheReader,
func runFissionEnvironmentProxy(proxyMux *http.ServeMux, es fes.Backend, wfiCache fes.CacheReader,
wfCache fes.CacheReader, resolvers map[string]fnenv.RuntimeResolver) {

workflowParser := fnenv.NewMetaResolver(resolvers)
workflowAPI := api.NewWorkflowAPI(es, workflowParser)
wfServer := apiserver.NewWorkflow(workflowAPI, wfCache)
wfiAPI := api.NewInvocationAPI(es)
wfiServer := apiserver.NewInvocation(wfiAPI, wfiCache)
proxyMux := http.NewServeMux()
fissionProxyServer := fission.NewFissionProxyServer(wfiServer, wfServer)
fissionProxyServer.RegisterServer(proxyMux)

proxySrv.Handler = handlers.LoggingHandler(os.Stdout, proxyMux)
go proxySrv.ListenAndServe()
log.Info("Serving HTTP Fission Proxy at: ", proxySrv.Addr)
}

func setupInvocationController(invocationCache fes.CacheReader, wfCache fes.CacheReader, es fes.Backend,
Expand All @@ -356,3 +395,7 @@ func runController(ctx context.Context, ctrls ...controller.Controller) {
ctrl := controller.NewMetaController(ctrls...)
go ctrl.Run(ctx)
}

func setupMetricsEndpoint(apiMux *http.ServeMux) {
apiMux.Handle("/metrics", promhttp.Handler())
}