From 38986a19017b945efcbfd346ac9aa9bca90a8a13 Mon Sep 17 00:00:00 2001 From: erwinvaneyk Date: Fri, 18 May 2018 22:35:59 +0200 Subject: [PATCH] Added initial prometheus metric collection --- .gitignore | 4 + Docs/wip/instrumentation.md | 24 +++-- INSTALL.md | 4 +- .../templates/deployment.yaml | 5 +- cmd/fission-workflows-bundle/bundle/bundle.go | 101 ++++++++++++------ cmd/fission-workflows-bundle/main.go | 5 + glide.lock | 32 +++++- glide.yaml | 2 + pkg/fes/backend/nats/client.go | 37 ++++++- pkg/fes/backend/nats/nats.go | 11 +- pkg/fes/caches.go | 17 ++- pkg/fnenv/native/native.go | 44 +++++++- 12 files changed, 236 insertions(+), 50 deletions(-) diff --git a/.gitignore b/.gitignore index b37427ad..988802a3 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,10 @@ data/ *.tgz /fission-workflows-bundle +/fission-workflows-bundle-osx +/fission-workflows-bundle-windows /wfcli +/wfcli-osx +/wfcli-windows __pycache__/ *.pyc \ No newline at end of file diff --git a/Docs/wip/instrumentation.md b/Docs/wip/instrumentation.md index b2c2f95e..16b60a66 100644 --- a/Docs/wip/instrumentation.md +++ b/Docs/wip/instrumentation.md @@ -3,12 +3,24 @@ This document contains description and high-level documentation on the instrumentation of the system. The instrumentation here is considered to encompass tracing, logging and metrics. -## Logging - -TODO - -Common fields: +Terminology: - ctrl: name of the controller if applicable - wf: id of the workflow if applicable - wfi: id of the workflow invocation if applicable -- component: name of component (controller, api, apiserver, fnenv) \ No newline at end of file +- component: name of component (controller, api, apiserver, fnenv) + +## Logging + +## Metrics + +For metrics the Prometheus time series and monitoring system is used. +The following metrics are collected and available under `:8080/metrics` in the Prometheus data format. + +Metrics: +- Active invocations (Gauge) +- Cache size (Gauge) + +- Completed invocations (Counter) + - Failed invocations (Counter) + - Successful invocations (Counter) + - Aborted invocations (Counter) diff --git a/INSTALL.md b/INSTALL.md index 06497c1f..9ff4e526 100644 --- a/INSTALL.md +++ b/INSTALL.md @@ -38,8 +38,8 @@ helm repo update # Install Fission # This assumes that you do not have a Fission deployment yet, and are installing on a standard Minikube deployment. -# Otherwise see http://fission.io/docs/0.4.0/install/ for more detailed instructions -helm install --wait -n fission-all --namespace fission --set serviceType=NodePort --set analytics=false fission-charts/fission-all --version 0.4.1 +# Otherwise see http://fission.io/docs/0.7.2/install/ for more detailed instructions +helm install --wait -n fission-all --namespace fission --set serviceType=NodePort --set analytics=false fission-charts/fission-all --version 0.7.2 # Install Fission Workflows helm install --wait -n fission-workflows fission-charts/fission-workflows --version 0.3.0 diff --git a/charts/fission-workflows/templates/deployment.yaml b/charts/fission-workflows/templates/deployment.yaml index 52c7a672..bccaa1fe 100644 --- a/charts/fission-workflows/templates/deployment.yaml +++ b/charts/fission-workflows/templates/deployment.yaml @@ -1,6 +1,6 @@ # Workflow Apiserver is an optional component that allows users to query the workflows API through the Fission apiserver. {{ if .Values.apiserver }} -apiVersion: extensions/v1beta1 +apiVersion: app/v1 kind: Deployment metadata: name: workflows-apiserver @@ -25,6 +25,7 @@ spec: "--api-workflow-invocation", "--api-workflow", "--api-admin", + "--metrics", ] env: - name: ES_NATS_URL @@ -90,4 +91,4 @@ spec: builder: image: "{{ .Values.buildEnvImage }}:{{.Values.tag}}" command: "defaultBuild" - allowedFunctionsPerContainer: infinite + allowedFunctionsPerContainer: infinite \ No newline at end of file diff --git a/cmd/fission-workflows-bundle/bundle/bundle.go b/cmd/fission-workflows-bundle/bundle/bundle.go index 2c17d5d8..9174abb1 100644 --- a/cmd/fission-workflows-bundle/bundle/bundle.go +++ b/cmd/fission-workflows-bundle/bundle/bundle.go @@ -16,6 +16,7 @@ import ( wfictr "github.com/fission/fission-workflows/pkg/controller/invocation" wfctr "github.com/fission/fission-workflows/pkg/controller/workflow" "github.com/fission/fission-workflows/pkg/fes" + "github.com/fission/fission-workflows/pkg/fes/backend/mem" "github.com/fission/fission-workflows/pkg/fes/backend/nats" "github.com/fission/fission-workflows/pkg/fnenv" "github.com/fission/fission-workflows/pkg/fnenv/fission" @@ -32,6 +33,7 @@ import ( executor "github.com/fission/fission/executor/client" "github.com/gorilla/handlers" grpcruntime "github.com/grpc-ecosystem/grpc-gateway/runtime" + "github.com/prometheus/client_golang/prometheus/promhttp" log "github.com/sirupsen/logrus" "google.golang.org/grpc" ) @@ -52,6 +54,7 @@ type Options struct { ApiWorkflow bool ApiHttp bool ApiWorkflowInvocation bool + Metrics bool } type FissionOptions struct { @@ -80,6 +83,11 @@ func Run(ctx context.Context, opts *Options) error { natsEs := setupNatsEventStoreClient(opts.Nats.Url, opts.Nats.Cluster, opts.Nats.Client) es = natsEs esPub = natsEs + } else { + log.Warn("No event store provided; using the development, in-memory event store") + backend := mem.NewBackend() + es = backend + esPub = backend } // Caches @@ -91,9 +99,13 @@ func Run(ctx context.Context, opts *Options) error { resolvers := map[string]fnenv.RuntimeResolver{} runtimes := map[string]fnenv.Runtime{} - log.Infof("Using Function Runtime: Workflow") - reflectiveRuntime := workflows.NewRuntime(invocationApi, wfiCache()) - runtimes[workflows.Name] = reflectiveRuntime + if opts.InternalRuntime || opts.Fission != nil { + log.Infof("Using Function Runtime: Workflow") + reflectiveRuntime := workflows.NewRuntime(invocationApi, wfiCache()) + runtimes[workflows.Name] = reflectiveRuntime + } else { + log.Info("No function runtimes specified.") + } if opts.InternalRuntime { log.Infof("Using Function Runtime: Internal") runtimes["internal"] = setupInternalFunctionRuntime() @@ -128,9 +140,18 @@ func Run(ctx context.Context, opts *Options) error { // Http servers if opts.Fission != nil { + proxyMux := http.NewServeMux() + runFissionEnvironmentProxy(proxyMux, es, wfiCache(), wfCache(), resolvers) proxySrv := &http.Server{Addr: fissionProxyAddress} + proxySrv.Handler = handlers.LoggingHandler(os.Stdout, proxyMux) + + if opts.Metrics { + setupMetricsEndpoint(proxyMux) + } + + go proxySrv.ListenAndServe() defer proxySrv.Shutdown(ctx) - runFissionEnvironmentProxy(proxySrv, es, wfiCache(), wfCache(), resolvers) + log.Info("Serving HTTP Fission Proxy at: ", proxySrv.Addr) } if opts.ApiAdmin { @@ -155,20 +176,41 @@ func Run(ctx context.Context, opts *Options) error { go grpcServer.Serve(lis) } - if opts.ApiHttp { - apiSrv := &http.Server{Addr: apiGatewayAddress} - defer apiSrv.Shutdown(ctx) - var admin, wf, wfi string - if opts.ApiAdmin { - admin = gRPCAddress - } - if opts.ApiWorkflow { - wf = gRPCAddress + if opts.ApiHttp || opts.Metrics { + grpcMux := grpcruntime.NewServeMux() + httpMux := http.NewServeMux() + + if opts.ApiHttp { + + var admin, wf, wfi string + if opts.ApiAdmin { + admin = gRPCAddress + } + if opts.ApiWorkflow { + wf = gRPCAddress + } + if opts.ApiWorkflowInvocation { + wfi = gRPCAddress + } + runHttpGateway(ctx, grpcMux, admin, wf, wfi) } - if opts.ApiWorkflowInvocation { - wfi = gRPCAddress + + // Metrics + if opts.Metrics { + setupMetricsEndpoint(httpMux) + log.Infof("Set up prometheus collector: %v/metrics", apiGatewayAddress) } - runHttpGateway(ctx, apiSrv, admin, wf, wfi) + + apiSrv := &http.Server{Addr: apiGatewayAddress} + httpMux.Handle("/", grpcMux) + apiSrv.Handler = httpMux + go func() { + err := apiSrv.ListenAndServe() + log.WithField("err", err).Info("HTTP Gateway exited") + }() + defer apiSrv.Shutdown(ctx) + + log.Info("Serving HTTP API gateway at: ", apiSrv.Addr) } log.Info("Bundle set up.") @@ -288,14 +330,16 @@ func runWorkflowInvocationApiServer(s *grpc.Server, es fes.Backend, wfiCache fes log.Infof("Serving workflow invocation gRPC API at %s.", gRPCAddress) } -func runHttpGateway(ctx context.Context, gwSrv *http.Server, adminApiAddr string, wfApiAddr string, wfiApiAddr string) { - mux := grpcruntime.NewServeMux() +func runHttpGateway(ctx context.Context, gwSrv *grpcruntime.ServeMux, adminApiAddr string, wfApiAddr string, + wfiApiAddr string) { + mux := gwSrv grpcOpts := []grpc.DialOption{grpc.WithInsecure()} if adminApiAddr != "" { err := apiserver.RegisterAdminAPIHandlerFromEndpoint(ctx, mux, adminApiAddr, grpcOpts) if err != nil { panic(err) } + log.Info("Registered Workflow API HTTP Endpoint") } if wfApiAddr != "" { @@ -303,6 +347,7 @@ func runHttpGateway(ctx context.Context, gwSrv *http.Server, adminApiAddr string if err != nil { panic(err) } + log.Info("Registered Admin API HTTP Endpoint") } if wfiApiAddr != "" { @@ -310,18 +355,11 @@ func runHttpGateway(ctx context.Context, gwSrv *http.Server, adminApiAddr string if err != nil { panic(err) } + log.Info("Registered Workflow Invocation API HTTP Endpoint") } - - gwSrv.Handler = mux - go func() { - err := gwSrv.ListenAndServe() - log.WithField("err", err).Info("HTTP Gateway exited") - }() - - log.Info("Serving HTTP API gateway at: ", gwSrv.Addr) } -func runFissionEnvironmentProxy(proxySrv *http.Server, es fes.Backend, wfiCache fes.CacheReader, +func runFissionEnvironmentProxy(proxyMux *http.ServeMux, es fes.Backend, wfiCache fes.CacheReader, wfCache fes.CacheReader, resolvers map[string]fnenv.RuntimeResolver) { workflowParser := fnenv.NewMetaResolver(resolvers) @@ -329,13 +367,8 @@ func runFissionEnvironmentProxy(proxySrv *http.Server, es fes.Backend, wfiCache wfServer := apiserver.NewGrpcWorkflowApiServer(workflowApi, wfCache) wfiApi := invocation.NewApi(es) wfiServer := apiserver.NewGrpcInvocationApiServer(wfiApi, wfiCache) - proxyMux := http.NewServeMux() fissionProxyServer := fission.NewFissionProxyServer(wfiServer, wfServer) fissionProxyServer.RegisterServer(proxyMux) - - proxySrv.Handler = handlers.LoggingHandler(os.Stdout, proxyMux) - go proxySrv.ListenAndServe() - log.Info("Serving HTTP Fission Proxy at: ", proxySrv.Addr) } func setupInvocationController(invocationCache fes.CacheReader, wfCache fes.CacheReader, es fes.Backend, @@ -359,3 +392,7 @@ func runController(ctx context.Context, ctrls ...controller.Controller) { ctrl := controller.NewMetaController(ctrls...) go ctrl.Run(ctx) } + +func setupMetricsEndpoint(apiMux *http.ServeMux) { + apiMux.Handle("/metrics", promhttp.Handler()) +} diff --git a/cmd/fission-workflows-bundle/main.go b/cmd/fission-workflows-bundle/main.go index c38e3ad9..bd537c82 100644 --- a/cmd/fission-workflows-bundle/main.go +++ b/cmd/fission-workflows-bundle/main.go @@ -30,6 +30,7 @@ func main() { ApiWorkflow: c.Bool("api") || c.Bool("api-workflow"), ApiWorkflowInvocation: c.Bool("api") || c.Bool("api-workflow-invocation"), ApiHttp: c.Bool("api") || c.Bool("api-http"), + Metrics: c.Bool("metrics") || c.Bool("metrics"), }) } cliApp.Run(os.Args) @@ -163,6 +164,10 @@ func createCli() *cli.App { Name: "api-admin", Usage: "Serve the admin gRPC api", }, + cli.BoolFlag{ + Name: "metrics", + Usage: "Serve prometheus metrics", + }, } return cliApp diff --git a/glide.lock b/glide.lock index 9a475d55..97f86a14 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: 0362a0749f44ec163b038bd647152e07ac72d504d18710cfe59c48269625bad1 -updated: 2018-05-17T12:12:42.008628+02:00 +hash: 14ab2375263e351e6e6ee2e7047ae89ad3d0f5e65a929eb81f677856510fdf05 +updated: 2018-05-18T22:37:04.909133+02:00 imports: - name: cloud.google.com/go version: 3b1ae45394a234c385be014e9a488f2bb6eef821 @@ -13,6 +13,10 @@ imports: - autorest/adal - autorest/azure - autorest/date +- name: github.com/beorn7/perks + version: 3a771d992973f24aa725d07868b467d1ddfceafb + subpackages: + - quantile - name: github.com/davecgh/go-spew version: 346938d642f2ec3594ed81d874461961cd0faa76 subpackages: @@ -120,6 +124,11 @@ imports: - buffer - jlexer - jwriter +- name: github.com/matttproud/golang_protobuf_extensions + version: c12348ce28de40eed0136aa2b644d0ee0650e56c + repo: https://github.com/matttproud/golang_protobuf_extensions + subpackages: + - pbutil - name: github.com/nats-io/go-nats version: d66cb54e6b7bdd93f0b28afc8450d84c780dfb68 subpackages: @@ -138,6 +147,25 @@ imports: version: 289cccf02c178dc782430d534e3c1f5b72af807f - name: github.com/pkg/errors version: 645ef00459ed84a119197bfb8d8205042c6df63d +- name: github.com/prometheus/client_golang + version: c5b7fccd204277076155f10851dad72b76a49317 + subpackages: + - prometheus + - prometheus/promhttp +- name: github.com/prometheus/client_model + version: 99fa1f4be8e564e8a6b613da7fa6f46c9edafc6c + subpackages: + - go +- name: github.com/prometheus/common + version: 61f87aac8082fa8c3c5655c7608d7478d46ac2ad + subpackages: + - expfmt + - internal/bitbucket.org/ww/goautoneg + - model +- name: github.com/prometheus/procfs + version: e645f4e5aaa8506fc71d6edbc5c4ff02c04c46f2 + subpackages: + - xfs - name: github.com/PuerkitoBio/purell version: 8a290539e2e8629dbc4e6bad948158f790ec31f4 - name: github.com/PuerkitoBio/urlesc diff --git a/glide.yaml b/glide.yaml index bda40981..5b71d10f 100644 --- a/glide.yaml +++ b/glide.yaml @@ -65,6 +65,8 @@ import: version: v1.0 - package: github.com/pkg/errors version: ^0.8.0 +- package: github.com/prometheus/client_golang + version: 0.8.0 testImport: - package: github.com/stretchr/testify version: 1.1.4 diff --git a/pkg/fes/backend/nats/client.go b/pkg/fes/backend/nats/client.go index 6573fb00..3eeab54b 100644 --- a/pkg/fes/backend/nats/client.go +++ b/pkg/fes/backend/nats/client.go @@ -9,6 +9,7 @@ import ( "github.com/golang/protobuf/proto" nats "github.com/nats-io/go-nats" "github.com/nats-io/go-nats-streaming" + "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" ) @@ -16,6 +17,27 @@ const ( defaultClient = "fes" ) +var ( + subsActive = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "fes", + Subsystem: "nats", + Name: "subs_active", + Help: "Number of active subscriptions to NATS subjects.", + }) + + eventsAppended = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "fes", + Subsystem: "nats", + Name: "events_appended_total", + Help: "Count of appended events (excluding any internal events).", + }) +) + +func init() { + prometheus.MustRegister(subsActive) + prometheus.MustRegister(eventsAppended) +} + type EventStore struct { pubsub.Publisher conn *WildcardConn @@ -92,11 +114,17 @@ func (es *EventStore) Watch(aggregate fes.Aggregate) error { logrus.Infof("Backend client watches:' %s'", subject) es.sub[aggregate] = sub + subsActive.Inc() return nil } func (es *EventStore) Close() error { - return es.conn.Close() + err := es.conn.Close() + if err != nil { + return err + } + subsActive.Dec() + return nil } func (es *EventStore) Append(event *fes.Event) error { @@ -116,7 +144,12 @@ func (es *EventStore) Append(event *fes.Event) error { "nats.subject": subject, }).Infof("Appending event: %v", event.Type) - return es.conn.Publish(subject, data) + err = es.conn.Publish(subject, data) + if err != nil { + return err + } + eventsAppended.Inc() + return nil } func (es *EventStore) Get(aggregate *fes.Aggregate) ([]*fes.Event, error) { diff --git a/pkg/fes/backend/nats/nats.go b/pkg/fes/backend/nats/nats.go index 15aa2f5c..d21c9edf 100644 --- a/pkg/fes/backend/nats/nats.go +++ b/pkg/fes/backend/nats/nats.go @@ -113,7 +113,11 @@ func (cn *Conn) MsgSeqRange(subject string, seqStart uint64, seqEnd uint64) ([]* if err != nil { return nil, err } - defer sub.Close() + subsActive.Inc() + defer func() { + sub.Close() + subsActive.Dec() + }() for { select { @@ -138,6 +142,7 @@ func NewWildcardConn(conn stan.Conn) *WildcardConn { func (wc *WildcardConn) Subscribe(wildcardSubject string, cb stan.MsgHandler, opts ...stan.SubscriptionOption) (stan.Subscription, error) { if !hasWildcard(wildcardSubject) { + subsActive.Inc() return wc.Conn.Subscribe(wildcardSubject, cb, opts...) } @@ -183,10 +188,11 @@ func (wc *WildcardConn) Subscribe(wildcardSubject string, cb stan.MsgHandler, op case deleted: // Delete the current listener of the subject of the event if _, ok := ws.sources[subject]; ok { - err := ws.sources[subject].Close() + err := ws.sources[subject].Unsubscribe() if err != nil { logrus.Errorf("Failed to close (sub)listener: %v", err) } + subsActive.Dec() } default: panic(fmt.Sprintf("Unknown eventType: %v", subjectEvent)) @@ -287,6 +293,7 @@ func (ws *WildcardSub) Unsubscribe() error { err := ws.activitySub.Unsubscribe() for id, source := range ws.sources { err = source.Unsubscribe() + subsActive.Dec() delete(ws.sources, id) } return err diff --git a/pkg/fes/caches.go b/pkg/fes/caches.go index e2104337..774810b6 100644 --- a/pkg/fes/caches.go +++ b/pkg/fes/caches.go @@ -8,6 +8,7 @@ import ( "github.com/fission/fission-workflows/pkg/util/pubsub" "github.com/golang/protobuf/ptypes" + "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" ) @@ -20,8 +21,21 @@ const ( var ( ErrNotFound = errors.New("could not find entity") + + cacheCount = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "fes", + Subsystem: "cache", + Name: "current_cache_counts", + Help: "The current number of entries in the caches", + }) + + // TODO add metrics: cache size, access latencies, update latencies (from event creation -> notification) ) +func init() { + prometheus.MustRegister(cacheCount) +} + // MapCache provides a simple non-preempting map-based CacheReaderWriter implementation. type MapCache struct { contents map[string]map[string]Aggregator // Map: AggregateType -> AggregateId -> entity @@ -93,8 +107,8 @@ func (rc *MapCache) Put(entity Aggregator) error { if _, ok := rc.contents[ref.Type]; !ok { rc.contents[ref.Type] = map[string]Aggregator{} } - rc.contents[ref.Type][ref.Id] = entity + cacheCount.Inc() return nil } @@ -102,6 +116,7 @@ func (rc *MapCache) Invalidate(ref *Aggregate) { rc.lock.Lock() defer rc.lock.Unlock() delete(rc.contents[ref.Type], ref.Id) + cacheCount.Dec() } func (rc *MapCache) List() []Aggregate { diff --git a/pkg/fnenv/native/native.go b/pkg/fnenv/native/native.go index 1ff0709d..ad9b18ad 100644 --- a/pkg/fnenv/native/native.go +++ b/pkg/fnenv/native/native.go @@ -4,10 +4,13 @@ package native import ( "fmt" "runtime/debug" + "time" "github.com/fission/fission-workflows/pkg/types" "github.com/fission/fission-workflows/pkg/types/validate" "github.com/golang/protobuf/ptypes" + + "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" ) @@ -15,6 +18,40 @@ const ( Name = "native" ) +var ( + fnActive = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "fnenv", + Subsystem: "native", + Name: "functions_active", + Help: "Number of function executions that are currently active", + }) + + fnCount = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "fnenv", + Subsystem: "native", + Name: "functions_execution_total", + Help: "Total number of function executions", + }) + + fnResolved = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "fnenv", + Subsystem: "native", + Name: "functions_resolved_total", + Help: "Total number of function resolved", + }) + + fnExecTime = prometheus.NewSummary(prometheus.SummaryOpts{ + Namespace: "fnenv", + Subsystem: "native", + Name: "function_execution_time_milliseconds", + Help: "Execution time summary of the internal functions", + }) +) + +func init() { + prometheus.MustRegister(fnActive, fnResolved, fnExecTime, fnCount) +} + // An InternalFunction is a function that will be executed in the same process as the invoker. type InternalFunction interface { Invoke(spec *types.TaskInvocationSpec) (*types.TypedValue, error) @@ -49,13 +86,17 @@ func (fe *FunctionEnv) Invoke(spec *types.TaskInvocationSpec) (*types.TaskInvoca return nil, err } + timeStart := time.Now() + defer fnExecTime.Observe(float64(time.Since(timeStart))) fnId := spec.FnRef.ID fn, ok := fe.fns[fnId] if !ok { return nil, fmt.Errorf("could not resolve internal function '%s'", fnId) } - + fnActive.Inc() out, err := fn.Invoke(spec) + fnActive.Dec() + fnCount.Inc() if err != nil { log.WithFields(log.Fields{ "fnId": fnId, @@ -78,6 +119,7 @@ func (fe *FunctionEnv) Invoke(spec *types.TaskInvocationSpec) (*types.TaskInvoca } func (fe *FunctionEnv) Resolve(fnName string) (string, error) { + fnResolved.Inc() _, ok := fe.fns[fnName] if !ok { return "", fmt.Errorf("could not resolve internal function '%s'", fnName)